rabbit-worker/main.go

319 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
)
// Message описывает формат команды из очереди.
//
// Поддерживаемые действия:
// - create: вставка новой строки
// - update: обновление строки по id
// - delete: удаление строки по id
//
// Если тело сообщения не JSON, оно трактуется как create,
// а весь текст — это содержимое записи.
type Message struct {
Action string `json:"action"`
ID *int64 `json:"id,omitempty"`
Text string `json:"text,omitempty"`
Source string `json:"source,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
// getenv возвращает значение переменной окружения
// или значение по умолчанию, если переменная не задана.
func getenv(key, def string) string {
val := os.Getenv(key)
if val == "" {
return def
}
return val
}
// getenvInt пытается распарсить целое значение из переменной окружения.
// Если переменной нет или парсинг не удался — возвращает default.
func getenvInt(key string, def int) int {
val := os.Getenv(key)
if val == "" {
return def
}
parsed, err := strconv.Atoi(val)
if err != nil {
return def
}
return parsed
}
// buildAMQPURL строит строку подключения к RabbitMQ.
// Сначала пытаемся взять AMQP_URL, затем собираем из RABBIT_*.
// Возвращает пустую строку, если недостаточно данных.
func buildAMQPURL() string {
if url := os.Getenv("AMQP_URL"); url != "" {
return url
}
host := getenv("RABBIT_HOST", "")
if host == "" {
return ""
}
port := getenv("RABBIT_PORT", "5672")
user := getenv("RABBIT_USER", "")
pass := getenv("RABBIT_PASSWORD", "")
vhost := getenv("RABBIT_VHOST", "/")
if user == "" || pass == "" {
return ""
}
return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost)
}
// buildPgConnString формирует строку подключения к Postgres.
// Предпочтительно использовать DATABASE_URL, иначе PG* переменные.
func buildPgConnString() (string, error) {
if url := os.Getenv("DATABASE_URL"); url != "" {
return url, nil
}
host := getenv("PGHOST", "")
if host == "" {
return "", errors.New("missing PGHOST or DATABASE_URL")
}
port := getenv("PGPORT", "5432")
user := getenv("PGUSER", "")
pass := getenv("PGPASSWORD", "")
db := getenv("PGDATABASE", "postgres")
ssl := getenv("PGSSLMODE", "require")
if user == "" || pass == "" {
return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL")
}
return fmt.Sprintf(
"postgresql://%s:%s@%s:%s/%s?sslmode=%s",
user,
pass,
host,
port,
db,
ssl,
), nil
}
// ensureTable гарантирует наличие таблицы (минимальный демо-скелет).
// Это удобно для старта, чтобы воркер не падал на пустой БД.
func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error {
query := fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())",
table,
)
_, err := pool.Exec(ctx, query)
return err
}
// parseMessage разбирает сообщение из очереди.
// Если JSON не распарсился — считаем, что это create с текстом в теле.
func parseMessage(body []byte) Message {
msg := Message{Action: "create"}
if err := json.Unmarshal(body, &msg); err != nil {
msg.Text = strings.TrimSpace(string(body))
if msg.Text == "" {
msg.Text = "(empty)"
}
return msg
}
if msg.Action == "" {
msg.Action = "create"
}
return msg
}
// isPermanentError определяет ошибки, при которых сообщение стоит отбрасывать
// (например, некорректный формат).
func isPermanentError(err error) bool {
return errors.Is(err, errInvalidMessage)
}
// errInvalidMessage — признак некорректной команды (не повторяем обработку).
var errInvalidMessage = errors.New("invalid message")
// handleMessage выполняет CRUD операцию в Postgres.
// Таймаут нужен, чтобы зависшие запросы не блокировали воркер.
func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
switch strings.ToLower(msg.Action) {
case "create":
text := msg.Text
if text == "" {
text = "(empty)"
}
query := fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table)
_, err := pool.Exec(ctx, query, text)
return err
case "update":
if msg.ID == nil {
return errInvalidMessage
}
query := fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table)
_, err := pool.Exec(ctx, query, msg.Text, *msg.ID)
return err
case "delete":
if msg.ID == nil {
return errInvalidMessage
}
query := fmt.Sprintf("DELETE FROM %s WHERE id=$1", table)
_, err := pool.Exec(ctx, query, *msg.ID)
return err
default:
return errInvalidMessage
}
}
// main инициализирует соединения, подписывается на очереди и
// обрабатывает сообщения в бесконечном цикле до SIGINT/SIGTERM.
func main() {
amqpURL := buildAMQPURL()
if amqpURL == "" {
log.Fatal("missing AMQP_URL or RABBIT_* envs")
}
pgConn, err := buildPgConnString()
if err != nil {
log.Fatal(err)
}
// Можно слушать несколько очередей, разделенных запятой.
queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",")
for i := range queues {
queues[i] = strings.TrimSpace(queues[i])
}
queueDurable := getenv("RABBIT_DURABLE", "true") == "true"
// Prefetch ограничивает количество непроцессed сообщений.
prefetch := getenvInt("RABBIT_PREFETCH", 1)
// Повторная постановка сообщения в очередь при ошибке.
requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true"
// Таблица в Postgres для операций воркера.
table := getenv("PG_TABLE", "nubes_test_table")
// Грейсфул-шатдаун по сигналам ОС.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
pool, err := pgxpool.New(ctx, pgConn)
if err != nil {
log.Fatalf("pg pool: %v", err)
}
defer pool.Close()
if err := pool.Ping(ctx); err != nil {
log.Fatalf("pg ping: %v", err)
}
if err := ensureTable(ctx, pool, table); err != nil {
log.Fatalf("ensure table: %v", err)
}
conn, err := amqp.Dial(amqpURL)
if err != nil {
log.Fatalf("amqp dial: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("amqp channel: %v", err)
}
defer ch.Close()
// QoS: ограничиваем количество сообщений, которые можно взять без ack.
if err := ch.Qos(prefetch, 0, false); err != nil {
log.Fatalf("amqp qos: %v", err)
}
// Объявляем очереди, чтобы воркер мог стартовать даже в пустой Rabbit.
for _, q := range queues {
if q == "" {
continue
}
_, err := ch.QueueDeclare(
q,
queueDurable,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("queue declare %s: %v", q, err)
}
}
// Канал, в который складываем сообщения из всех очередей.
msgs := make(chan amqp.Delivery)
for _, q := range queues {
if q == "" {
continue
}
delivery, err := ch.Consume(
q,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("consume %s: %v", q, err)
}
// Отдельная горутина на каждую очередь.
go func(d <-chan amqp.Delivery) {
for m := range d {
msgs <- m
}
}(delivery)
}
log.Printf("worker started: queues=%v table=%s", queues, table)
for {
select {
case <-ctx.Done():
log.Println("shutdown")
return
case m := <-msgs:
msg := parseMessage(m.Body)
if err := handleMessage(ctx, pool, table, msg); err != nil {
if isPermanentError(err) {
// Некорректное сообщение — отбрасываем без повторной попытки.
_ = m.Nack(false, false)
log.Printf("reject message: %v", err)
continue
}
// Временная ошибка — можно вернуть сообщение в очередь.
_ = m.Nack(false, requeueOnError)
log.Printf("error handling message: %v", err)
continue
}
// Успешная обработка — подтверждаем сообщение.
_ = m.Ack(false)
}
}
}