package main import ( "context" "encoding/json" "errors" "fmt" "log" "net/http" "os" "os/signal" "strconv" "strings" "sync" "syscall" "time" "github.com/jackc/pgx/v5/pgxpool" amqp "github.com/rabbitmq/amqp091-go" ) // Воркер держит HTTP /healthz и параллельно слушает RabbitMQ, // записывая события в Postgres. Даже при ошибках контейнер остается живым. type Message struct { Action string `json:"action"` ID *int64 `json:"id,omitempty"` Text string `json:"text,omitempty"` Source string `json:"source,omitempty"` RequestID string `json:"request_id,omitempty"` } type workerStatus struct { mu sync.Mutex state string lastError string lastSuccess time.Time } func (s *workerStatus) set(state, err string) { s.mu.Lock() defer s.mu.Unlock() s.state = state s.lastError = err if err == "" { s.lastSuccess = time.Now() } } func (s *workerStatus) snapshot() map[string]string { s.mu.Lock() defer s.mu.Unlock() result := map[string]string{ "state": s.state, "error": s.lastError, "timestamp": s.lastSuccess.Format(time.RFC3339), } return result } // getenv читает переменную окружения с дефолтом. func getenv(key, def string) string { val := os.Getenv(key) if val == "" { return def } return val } // getenvInt читает int-переменную окружения с дефолтом. func getenvInt(key string, def int) int { val := os.Getenv(key) if val == "" { return def } parsed, err := strconv.Atoi(val) if err != nil { return def } return parsed } // buildAMQPURL собирает URL для RabbitMQ. func buildAMQPURL() (string, error) { if url := os.Getenv("AMQP_URL"); url != "" { return url, nil } host := getenv("RABBIT_HOST", "") port := getenv("RABBIT_PORT", "5672") user := getenv("RABBIT_USER", "") pass := getenv("RABBIT_PASSWORD", "") vhost := getenv("RABBIT_VHOST", "/") if host == "" || user == "" || pass == "" { return "", errors.New("missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL") } return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost), nil } // buildPgConnString собирает строку подключения к Postgres. func buildPgConnString() (string, error) { if url := os.Getenv("DATABASE_URL"); url != "" { return url, nil } host := getenv("PGHOST", "") if host == "" { return "", errors.New("missing PGHOST or DATABASE_URL") } port := getenv("PGPORT", "5432") user := getenv("PGUSER", "") pass := getenv("PGPASSWORD", "") db := getenv("PGDATABASE", "postgres") ssl := getenv("PGSSLMODE", "require") if user == "" || pass == "" { return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL") } return fmt.Sprintf( "postgresql://%s:%s@%s:%s/%s?sslmode=%s", user, pass, host, port, db, ssl, ), nil } // ensureTable гарантирует наличие таблицы для демо. func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error { query := fmt.Sprintf( "CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())", table, ) _, err := pool.Exec(ctx, query) return err } // parseMessage принимает JSON и fallback-ит в текст. func parseMessage(body []byte) Message { msg := Message{Action: "create"} if err := json.Unmarshal(body, &msg); err != nil { msg.Text = strings.TrimSpace(string(body)) if msg.Text == "" { msg.Text = "(empty)" } return msg } if msg.Action == "" { msg.Action = "create" } return msg } var errInvalidMessage = errors.New("invalid message") // handleMessage выполняет CRUD над таблицей. func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() switch strings.ToLower(msg.Action) { case "create": text := msg.Text if text == "" { text = "(empty)" } query := fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table) _, err := pool.Exec(ctx, query, text) return err case "update": if msg.ID == nil { return errInvalidMessage } query := fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table) _, err := pool.Exec(ctx, query, msg.Text, *msg.ID) return err case "delete": if msg.ID == nil { return errInvalidMessage } query := fmt.Sprintf("DELETE FROM %s WHERE id=$1", table) _, err := pool.Exec(ctx, query, *msg.ID) return err default: return errInvalidMessage } } // startHTTPServer поднимает /healthz и базовый ответ на /. func startHTTPServer(port string, status *workerStatus) { http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { payload, _ := json.Marshal(status.snapshot()) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write(payload) }) http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("rabbit-worker ready")) }) log.Printf("listening on :%s", port) if err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil); err != nil { log.Fatal(err) } } // runWorker подключается к Rabbit и обрабатывает сообщения. func runWorker(ctx context.Context, status *workerStatus) error { amqpURL, err := buildAMQPURL() if err != nil { return err } pgConn, err := buildPgConnString() if err != nil { return err } // Список очередей поддерживает несколько значений через запятую. queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",") for i := range queues { queues[i] = strings.TrimSpace(queues[i]) } queueDurable := getenv("RABBIT_DURABLE", "true") == "true" prefetch := getenvInt("RABBIT_PREFETCH", 1) requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true" table := getenv("PG_TABLE", "nubes_test_table") pool, err := pgxpool.New(ctx, pgConn) if err != nil { return fmt.Errorf("pg pool: %w", err) } defer pool.Close() if err := pool.Ping(ctx); err != nil { return fmt.Errorf("pg ping: %w", err) } if err := ensureTable(ctx, pool, table); err != nil { return fmt.Errorf("ensure table: %w", err) } conn, err := amqp.Dial(amqpURL) if err != nil { return fmt.Errorf("amqp dial: %w", err) } defer conn.Close() ch, err := conn.Channel() if err != nil { return fmt.Errorf("amqp channel: %w", err) } defer ch.Close() if err := ch.Qos(prefetch, 0, false); err != nil { return fmt.Errorf("amqp qos: %w", err) } for _, q := range queues { if q == "" { continue } if _, err := ch.QueueDeclare(q, queueDurable, false, false, false, nil); err != nil { return fmt.Errorf("queue declare %s: %w", q, err) } } // Сливаем несколько очередей в один канал сообщений. msgs := make(chan amqp.Delivery) for _, q := range queues { if q == "" { continue } delivery, err := ch.Consume(q, "", false, false, false, false, nil) if err != nil { return fmt.Errorf("consume %s: %w", q, err) } go func(d <-chan amqp.Delivery) { for m := range d { msgs <- m } }(delivery) } status.set("running", "") for { select { case <-ctx.Done(): return nil case m := <-msgs: msg := parseMessage(m.Body) if err := handleMessage(ctx, pool, table, msg); err != nil { if errors.Is(err, errInvalidMessage) { _ = m.Nack(false, false) log.Printf("reject message: %v", err) continue } _ = m.Nack(false, requeueOnError) log.Printf("error handling message: %v", err) continue } _ = m.Ack(false) } } } func main() { port := getenv("PORT", "8080") status := &workerStatus{state: "starting"} ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() go startHTTPServer(port, status) // Экспоненциальный backoff при ошибках подключения. backoff := 2 * time.Second for { if ctx.Err() != nil { return } if err := runWorker(ctx, status); err != nil { status.set("degraded", err.Error()) log.Printf("worker error: %v", err) time.Sleep(backoff) if backoff < 30*time.Second { backoff *= 2 } continue } status.set("stopped", "") return } }