diff --git a/Dockerfile b/Dockerfile index a33ffe8..34eda5f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,4 @@ +# Сборочный этап: собираем статический бинарник. FROM golang:1.22-alpine AS build WORKDIR /src @@ -9,6 +10,7 @@ COPY . . RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/worker ./ +# Рантайм-этап: минимальный образ с ca-certificates. FROM alpine:3.20 RUN apk add --no-cache ca-certificates diff --git a/README.md b/README.md index 86a8ef7..3013b07 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Consumes CRUD messages from RabbitMQ and writes to Postgres. +Комментарий: HTTP-эндпоинт нужен только для health-check, остальная логика работает через RabbitMQ. + ## Environment variables RabbitMQ: @@ -31,6 +33,8 @@ Fields: If the payload is not JSON, it is treated as `create` with body as text. +Примечание: сообщения без `id` для update/delete считаются некорректными и отбрасываются. + ## Build ```bash diff --git a/main.go b/main.go index 38d5e59..7cefc9c 100644 --- a/main.go +++ b/main.go @@ -58,6 +58,7 @@ func (s *workerStatus) snapshot() map[string]string { return result } +// getenv читает переменную окружения с дефолтом. func getenv(key, def string) string { val := os.Getenv(key) if val == "" { @@ -66,6 +67,7 @@ func getenv(key, def string) string { return val } +// getenvInt читает int-переменную окружения с дефолтом. func getenvInt(key string, def int) int { val := os.Getenv(key) if val == "" { @@ -78,6 +80,7 @@ func getenvInt(key string, def int) int { return parsed } +// buildAMQPURL собирает URL для RabbitMQ. func buildAMQPURL() (string, error) { if url := os.Getenv("AMQP_URL"); url != "" { return url, nil @@ -96,6 +99,7 @@ func buildAMQPURL() (string, error) { return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost), nil } +// buildPgConnString собирает строку подключения к Postgres. func buildPgConnString() (string, error) { if url := os.Getenv("DATABASE_URL"); url != "" { return url, nil @@ -126,6 +130,7 @@ func buildPgConnString() (string, error) { ), nil } +// ensureTable гарантирует наличие таблицы для демо. func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error { query := fmt.Sprintf( "CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())", @@ -135,6 +140,7 @@ func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error { return err } +// parseMessage принимает JSON и fallback-ит в текст. func parseMessage(body []byte) Message { msg := Message{Action: "create"} if err := json.Unmarshal(body, &msg); err != nil { @@ -152,6 +158,7 @@ func parseMessage(body []byte) Message { var errInvalidMessage = errors.New("invalid message") +// handleMessage выполняет CRUD над таблицей. func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -184,6 +191,7 @@ func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Me } } +// startHTTPServer поднимает /healthz и базовый ответ на /. func startHTTPServer(port string, status *workerStatus) { http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { payload, _ := json.Marshal(status.snapshot()) @@ -203,6 +211,7 @@ func startHTTPServer(port string, status *workerStatus) { } } +// runWorker подключается к Rabbit и обрабатывает сообщения. func runWorker(ctx context.Context, status *workerStatus) error { amqpURL, err := buildAMQPURL() if err != nil { @@ -214,6 +223,7 @@ func runWorker(ctx context.Context, status *workerStatus) error { return err } + // Список очередей поддерживает несколько значений через запятую. queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",") for i := range queues { queues[i] = strings.TrimSpace(queues[i]) @@ -261,6 +271,7 @@ func runWorker(ctx context.Context, status *workerStatus) error { } } + // Сливаем несколько очередей в один канал сообщений. msgs := make(chan amqp.Delivery) for _, q := range queues { if q == "" { @@ -309,6 +320,7 @@ func main() { go startHTTPServer(port, status) + // Экспоненциальный backoff при ошибках подключения. backoff := 2 * time.Second for { if ctx.Err() != nil {