commit 664713ea125a1f62351ae2c6313aedfef1eb61fa Author: “Naeel” Date: Mon Feb 23 09:22:09 2026 +0400 Initial Go rabbit worker diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1dda6e5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +bin/ +*.log +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..64971c0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM golang:1.22-alpine AS build + +WORKDIR /src + +COPY go.mod ./ +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/worker ./ + +FROM alpine:3.20 + +RUN apk add --no-cache ca-certificates + +WORKDIR /app +COPY --from=build /out/worker /app/worker + +CMD ["/app/worker"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..86a8ef7 --- /dev/null +++ b/README.md @@ -0,0 +1,40 @@ +# Rabbit Worker (Go) + +Consumes CRUD messages from RabbitMQ and writes to Postgres. + +## Environment variables + +RabbitMQ: +- `AMQP_URL` (preferred), example: `amqp://user:pass@host:5672/vhost` +- or `RABBIT_HOST`, `RABBIT_PORT` (default 5672), `RABBIT_USER`, `RABBIT_PASSWORD`, `RABBIT_VHOST` (default `/`) +- `RABBIT_QUEUES` (comma-separated, default `crud_queue`) +- `RABBIT_DURABLE` (default `true`) +- `RABBIT_PREFETCH` (default `1`) +- `REQUEUE_ON_ERROR` (default `true`) + +Postgres: +- `DATABASE_URL` (preferred) +- or `PGHOST`, `PGPORT` (default 5432), `PGUSER`, `PGPASSWORD`, `PGDATABASE` (default `postgres`), `PGSSLMODE` (default `require`) +- `PG_TABLE` (default `nubes_test_table`) + +## Message format + +JSON payload: +```json +{"action":"create","text":"hello","id":1} +``` + +Fields: +- `action`: `create` | `update` | `delete` +- `id`: required for `update` and `delete` +- `text`: used by `create` and `update` + +If the payload is not JSON, it is treated as `create` with body as text. + +## Build + +```bash +cd /home/naeel/terraform/PROD_STAND/RABBIT/rabbit-worker + +docker build -t /: . +``` diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4780409 --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module gitea-naeel.giteak8s.services.ngcloud.ru/naeel/rabbit-worker + +go 1.22 + +require ( + github.com/jackc/pgx/v5 v5.5.5 + github.com/rabbitmq/amqp091-go v1.10.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..242ae45 --- /dev/null +++ b/go.sum @@ -0,0 +1,32 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..aae87c0 --- /dev/null +++ b/main.go @@ -0,0 +1,318 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + amqp "github.com/rabbitmq/amqp091-go" +) + +// Message описывает формат команды из очереди. +// +// Поддерживаемые действия: +// - create: вставка новой строки +// - update: обновление строки по id +// - delete: удаление строки по id +// +// Если тело сообщения не JSON, оно трактуется как create, +// а весь текст — это содержимое записи. +type Message struct { + Action string `json:"action"` + ID *int64 `json:"id,omitempty"` + Text string `json:"text,omitempty"` + Source string `json:"source,omitempty"` + RequestID string `json:"request_id,omitempty"` +} + +// getenv возвращает значение переменной окружения +// или значение по умолчанию, если переменная не задана. +func getenv(key, def string) string { + val := os.Getenv(key) + if val == "" { + return def + } + return val +} + +// getenvInt пытается распарсить целое значение из переменной окружения. +// Если переменной нет или парсинг не удался — возвращает default. +func getenvInt(key string, def int) int { + val := os.Getenv(key) + if val == "" { + return def + } + parsed, err := strconv.Atoi(val) + if err != nil { + return def + } + return parsed +} + +// buildAMQPURL строит строку подключения к RabbitMQ. +// Сначала пытаемся взять AMQP_URL, затем собираем из RABBIT_*. +// Возвращает пустую строку, если недостаточно данных. +func buildAMQPURL() string { + if url := os.Getenv("AMQP_URL"); url != "" { + return url + } + + host := getenv("RABBIT_HOST", "") + if host == "" { + return "" + } + port := getenv("RABBIT_PORT", "5672") + user := getenv("RABBIT_USER", "") + pass := getenv("RABBIT_PASSWORD", "") + vhost := getenv("RABBIT_VHOST", "/") + + if user == "" || pass == "" { + return "" + } + + return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost) +} + +// buildPgConnString формирует строку подключения к Postgres. +// Предпочтительно использовать DATABASE_URL, иначе PG* переменные. +func buildPgConnString() (string, error) { + if url := os.Getenv("DATABASE_URL"); url != "" { + return url, nil + } + + host := getenv("PGHOST", "") + if host == "" { + return "", errors.New("missing PGHOST or DATABASE_URL") + } + port := getenv("PGPORT", "5432") + user := getenv("PGUSER", "") + pass := getenv("PGPASSWORD", "") + db := getenv("PGDATABASE", "postgres") + ssl := getenv("PGSSLMODE", "require") + + if user == "" || pass == "" { + return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL") + } + + return fmt.Sprintf( + "postgresql://%s:%s@%s:%s/%s?sslmode=%s", + user, + pass, + host, + port, + db, + ssl, + ), nil +} + +// ensureTable гарантирует наличие таблицы (минимальный демо-скелет). +// Это удобно для старта, чтобы воркер не падал на пустой БД. +func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error { + query := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())", + table, + ) + _, err := pool.Exec(ctx, query) + return err +} + +// parseMessage разбирает сообщение из очереди. +// Если JSON не распарсился — считаем, что это create с текстом в теле. +func parseMessage(body []byte) Message { + msg := Message{Action: "create"} + if err := json.Unmarshal(body, &msg); err != nil { + msg.Text = strings.TrimSpace(string(body)) + if msg.Text == "" { + msg.Text = "(empty)" + } + return msg + } + if msg.Action == "" { + msg.Action = "create" + } + return msg +} + +// isPermanentError определяет ошибки, при которых сообщение стоит отбрасывать +// (например, некорректный формат). +func isPermanentError(err error) bool { + return errors.Is(err, errInvalidMessage) +} + +// errInvalidMessage — признак некорректной команды (не повторяем обработку). +var errInvalidMessage = errors.New("invalid message") + +// handleMessage выполняет CRUD операцию в Postgres. +// Таймаут нужен, чтобы зависшие запросы не блокировали воркер. +func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + switch strings.ToLower(msg.Action) { + case "create": + text := msg.Text + if text == "" { + text = "(empty)" + } + query := fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table) + _, err := pool.Exec(ctx, query, text) + return err + case "update": + if msg.ID == nil { + return errInvalidMessage + } + query := fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table) + _, err := pool.Exec(ctx, query, msg.Text, *msg.ID) + return err + case "delete": + if msg.ID == nil { + return errInvalidMessage + } + query := fmt.Sprintf("DELETE FROM %s WHERE id=$1", table) + _, err := pool.Exec(ctx, query, *msg.ID) + return err + default: + return errInvalidMessage + } +} + +// main инициализирует соединения, подписывается на очереди и +// обрабатывает сообщения в бесконечном цикле до SIGINT/SIGTERM. +func main() { + amqpURL := buildAMQPURL() + if amqpURL == "" { + log.Fatal("missing AMQP_URL or RABBIT_* envs") + } + + pgConn, err := buildPgConnString() + if err != nil { + log.Fatal(err) + } + + // Можно слушать несколько очередей, разделенных запятой. + queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",") + for i := range queues { + queues[i] = strings.TrimSpace(queues[i]) + } + queueDurable := getenv("RABBIT_DURABLE", "true") == "true" + // Prefetch ограничивает количество непроцессed сообщений. + prefetch := getenvInt("RABBIT_PREFETCH", 1) + // Повторная постановка сообщения в очередь при ошибке. + requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true" + // Таблица в Postgres для операций воркера. + table := getenv("PG_TABLE", "nubes_test_table") + + // Грейсфул-шатдаун по сигналам ОС. + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + pool, err := pgxpool.New(ctx, pgConn) + if err != nil { + log.Fatalf("pg pool: %v", err) + } + defer pool.Close() + + if err := pool.Ping(ctx); err != nil { + log.Fatalf("pg ping: %v", err) + } + if err := ensureTable(ctx, pool, table); err != nil { + log.Fatalf("ensure table: %v", err) + } + + conn, err := amqp.Dial(amqpURL) + if err != nil { + log.Fatalf("amqp dial: %v", err) + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + log.Fatalf("amqp channel: %v", err) + } + defer ch.Close() + + // QoS: ограничиваем количество сообщений, которые можно взять без ack. + if err := ch.Qos(prefetch, 0, false); err != nil { + log.Fatalf("amqp qos: %v", err) + } + + // Объявляем очереди, чтобы воркер мог стартовать даже в пустой Rabbit. + for _, q := range queues { + if q == "" { + continue + } + _, err := ch.QueueDeclare( + q, + queueDurable, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatalf("queue declare %s: %v", q, err) + } + } + + // Канал, в который складываем сообщения из всех очередей. + msgs := make(chan amqp.Delivery) + for _, q := range queues { + if q == "" { + continue + } + delivery, err := ch.Consume( + q, + "", + false, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatalf("consume %s: %v", q, err) + } + + // Отдельная горутина на каждую очередь. + go func(d <-chan amqp.Delivery) { + for m := range d { + msgs <- m + } + }(delivery) + } + + log.Printf("worker started: queues=%v table=%s", queues, table) + + for { + select { + case <-ctx.Done(): + log.Println("shutdown") + return + case m := <-msgs: + msg := parseMessage(m.Body) + if err := handleMessage(ctx, pool, table, msg); err != nil { + if isPermanentError(err) { + // Некорректное сообщение — отбрасываем без повторной попытки. + _ = m.Nack(false, false) + log.Printf("reject message: %v", err) + continue + } + // Временная ошибка — можно вернуть сообщение в очередь. + _ = m.Nack(false, requeueOnError) + log.Printf("error handling message: %v", err) + continue + } + // Успешная обработка — подтверждаем сообщение. + _ = m.Ack(false) + } + } +}