rabbit-worker/main.go

342 lines
8.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	amqp "github.com/rabbitmq/amqp091-go"
)
// The worker serves HTTP /healthz while concurrently consuming RabbitMQ
// messages and writing the events to Postgres. The container stays alive even
// when errors occur.

// Message is the JSON payload decoded from a queue delivery body.
type Message struct {
// Action selects the operation; handleMessage recognizes "create", "update"
// and "delete" (compared case-insensitively).
Action string `json:"action"`
// ID is the target row id; handleMessage requires it for "update" and
// "delete". It is a pointer so that an absent id can be told apart from 0.
ID *int64 `json:"id,omitempty"`
// Text is the value written to the test_data column.
Text string `json:"text,omitempty"`
// Source and RequestID are decoded but not read by the code visible in this
// file.
Source string `json:"source,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
// workerStatus is the mutex-guarded health state shared between the worker
// loop and the /healthz handler.
type workerStatus struct {
	mu          sync.Mutex
	state       string    // current lifecycle state, e.g. "starting" or "running"
	lastError   string    // text of the most recent error; empty after a success
	lastSuccess time.Time // time of the most recent set call with an empty error
}

// set records the current state together with the latest error text. An empty
// err counts as a success and stamps lastSuccess with the current time.
func (s *workerStatus) set(state, err string) {
	s.mu.Lock()
	s.state, s.lastError = state, err
	if len(err) == 0 {
		s.lastSuccess = time.Now()
	}
	s.mu.Unlock()
}

// snapshot returns a copy of the status as a string map with the keys "state",
// "error" and "timestamp" (lastSuccess formatted as RFC 3339).
func (s *workerStatus) snapshot() map[string]string {
	// Copy the fields under the lock; formatting happens on the copies.
	s.mu.Lock()
	state, lastErr, stamp := s.state, s.lastError, s.lastSuccess
	s.mu.Unlock()

	return map[string]string{
		"state":     state,
		"error":     lastErr,
		"timestamp": stamp.Format(time.RFC3339),
	}
}
// getenv returns the value of the environment variable key, or def when the
// variable is unset or empty.
func getenv(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}
// getenvInt returns the environment variable key parsed as a decimal int, or
// def when the variable is unset, empty, or not a valid integer.
func getenvInt(key string, def int) int {
	// strconv.Atoi rejects the empty string, so an unset or empty variable
	// takes the same fallback path as a malformed one.
	n, err := strconv.Atoi(os.Getenv(key))
	if err != nil {
		return def
	}
	return n
}
// buildAMQPURL returns the RabbitMQ connection URI.
//
// AMQP_URL, when set, is returned verbatim. Otherwise the URI is assembled
// from RABBIT_HOST, RABBIT_PORT (default "5672"), RABBIT_USER,
// RABBIT_PASSWORD and RABBIT_VHOST (default "/"). An error is returned when
// the host, user or password is missing.
func buildAMQPURL() (string, error) {
	if raw := os.Getenv("AMQP_URL"); raw != "" {
		return raw, nil
	}

	host := getenv("RABBIT_HOST", "")
	port := getenv("RABBIT_PORT", "5672")
	user := getenv("RABBIT_USER", "")
	pass := getenv("RABBIT_PASSWORD", "")
	vhost := getenv("RABBIT_VHOST", "/")
	if host == "" || user == "" || pass == "" {
		return "", errors.New("missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL")
	}

	// Percent-encode the credentials: characters such as '@', ':' or '/' in a
	// password would otherwise be read as URI delimiters.
	creds := url.UserPassword(user, pass).String()

	// A single leading slash only separates the authority from the vhost
	// name, so "/", "/name" and "name" are all accepted. The name itself is
	// escaped so it cannot break out of the path segment.
	vhostName := url.PathEscape(strings.TrimPrefix(vhost, "/"))

	return fmt.Sprintf("amqp://%s@%s:%s/%s", creds, host, port, vhostName), nil
}
// buildPgConnString returns the Postgres connection URL.
//
// DATABASE_URL, when set, is returned verbatim. Otherwise the URL is assembled
// from PGHOST, PGPORT (default "5432"), PGUSER, PGPASSWORD, PGDATABASE
// (default "postgres") and PGSSLMODE (default "require"). An error is returned
// when the host, user or password is missing.
func buildPgConnString() (string, error) {
	if raw := os.Getenv("DATABASE_URL"); raw != "" {
		return raw, nil
	}

	host := getenv("PGHOST", "")
	if host == "" {
		return "", errors.New("missing PGHOST or DATABASE_URL")
	}
	port := getenv("PGPORT", "5432")
	user := getenv("PGUSER", "")
	pass := getenv("PGPASSWORD", "")
	db := getenv("PGDATABASE", "postgres")
	ssl := getenv("PGSSLMODE", "require")
	if user == "" || pass == "" {
		return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL")
	}

	// Every component taken from the environment is percent-encoded for its
	// position in the URL; a raw '@', ':', '/', '?' or '&' in a password or
	// database name would otherwise change how the URL is parsed.
	return fmt.Sprintf(
		"postgresql://%s@%s:%s/%s?sslmode=%s",
		url.UserPassword(user, pass).String(),
		host,
		port,
		url.PathEscape(db),
		url.QueryEscape(ssl),
	), nil
}
// ensureTable creates the demo table if it does not exist yet.
//
// table is interpolated into the statement verbatim, so it must come from
// trusted configuration.
func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error {
	const ddl = "CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())"

	if _, err := pool.Exec(ctx, fmt.Sprintf(ddl, table)); err != nil {
		return err
	}
	return nil
}
// parseMessage decodes a delivery body into a Message.
//
// A body that decodes cleanly as JSON yields the decoded Message, with Action
// defaulting to "create" when it is absent or empty. Any body that fails to
// decode is treated as plain text: the result is a "create" message whose Text
// is the whitespace-trimmed body, or "(empty)" when nothing remains.
func parseMessage(body []byte) Message {
	var decoded Message
	if err := json.Unmarshal(body, &decoded); err != nil {
		// json.Unmarshal may already have filled some fields (for example
		// Action and ID) before reporting a type error on a later field.
		// Build the fallback from scratch so that a half-decoded "update" or
		// "delete" can never be executed with the raw body as its text.
		text := strings.TrimSpace(string(body))
		if text == "" {
			text = "(empty)"
		}
		return Message{Action: "create", Text: text}
	}

	if decoded.Action == "" {
		decoded.Action = "create"
	}
	return decoded
}
// errInvalidMessage is returned for a message that can never be processed: an
// unknown action, or an update/delete without an id.
var errInvalidMessage = errors.New("invalid message")

// handleMessage applies a single create/update/delete message to table.
//
// The statement runs under a 5-second timeout derived from ctx. It returns
// errInvalidMessage for an unrecognized action or a missing id, and otherwise
// whatever error the database call reports. table is interpolated into the
// statement verbatim, so it must come from trusted configuration.
func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Pick the statement and its arguments first, then execute it in one place.
	var (
		query string
		args  []any
	)
	switch strings.ToLower(msg.Action) {
	case "create":
		text := msg.Text
		if text == "" {
			text = "(empty)"
		}
		query = fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table)
		args = []any{text}
	case "update":
		if msg.ID == nil {
			return errInvalidMessage
		}
		query = fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table)
		args = []any{msg.Text, *msg.ID}
	case "delete":
		if msg.ID == nil {
			return errInvalidMessage
		}
		query = fmt.Sprintf("DELETE FROM %s WHERE id=$1", table)
		args = []any{*msg.ID}
	default:
		return errInvalidMessage
	}

	_, err := pool.Exec(ctx, query, args...)
	return err
}
// startHTTPServer serves the health endpoints on the given port. It blocks
// until the server stops and then terminates the process via log.Fatal.
//
// Routes:
//   - /healthz returns the status snapshot as JSON. The response code is 200
//     regardless of the worker state; the state is carried in the body.
//   - / (and any other path) returns a plain-text banner.
func startHTTPServer(port string, status *workerStatus) {
	// A private mux keeps these routes off the process-wide
	// http.DefaultServeMux, where a second registration of the same pattern
	// would panic.
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		payload, err := json.Marshal(status.snapshot())
		if err != nil {
			http.Error(w, "status unavailable", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		// A failed write means the client went away; there is nothing to recover.
		_, _ = w.Write(payload)
	})
	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("rabbit-worker ready"))
	})

	// Explicit timeouts bound how long a slow or stalled client can hold a
	// connection; the zero-value server has none.
	srv := &http.Server{
		Addr:              ":" + port,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	log.Printf("listening on :%s", port)
	if err := srv.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}
// runWorker connects to Postgres and RabbitMQ and processes deliveries until
// ctx is cancelled or the AMQP channel is closed.
//
// Settings read from the environment:
//   - RABBIT_QUEUES: comma-separated queue names (default "crud_queue")
//   - RABBIT_DURABLE: "true" declares the queues durable (default "true")
//   - RABBIT_PREFETCH: consumer prefetch count (default 1)
//   - REQUEUE_ON_ERROR: "true" requeues a delivery whose handling failed
//     (default "true"); invalid messages are never requeued
//   - PG_TABLE: target table (default "nubes_test_table")
//
// It returns nil when ctx is cancelled. Any setup failure, and the loss of the
// AMQP channel or connection, is returned as an error so the caller can
// reconnect.
func runWorker(ctx context.Context, status *workerStatus) error {
	amqpURL, err := buildAMQPURL()
	if err != nil {
		return err
	}
	pgConn, err := buildPgConnString()
	if err != nil {
		return err
	}

	// The queue list accepts several comma-separated names; blank entries are
	// dropped once here so the loops below need no further filtering.
	var queues []string
	for _, name := range strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",") {
		if name = strings.TrimSpace(name); name != "" {
			queues = append(queues, name)
		}
	}
	if len(queues) == 0 {
		// Without this check the worker would report "running" while
		// consuming nothing.
		return errors.New("no queues configured in RABBIT_QUEUES")
	}

	queueDurable := getenv("RABBIT_DURABLE", "true") == "true"
	prefetch := getenvInt("RABBIT_PREFETCH", 1)
	requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true"
	table := getenv("PG_TABLE", "nubes_test_table")

	pool, err := pgxpool.New(ctx, pgConn)
	if err != nil {
		return fmt.Errorf("pg pool: %w", err)
	}
	defer pool.Close()
	if err := pool.Ping(ctx); err != nil {
		return fmt.Errorf("pg ping: %w", err)
	}
	if err := ensureTable(ctx, pool, table); err != nil {
		return fmt.Errorf("ensure table: %w", err)
	}

	conn, err := amqp.Dial(amqpURL)
	if err != nil {
		return fmt.Errorf("amqp dial: %w", err)
	}
	defer conn.Close()
	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("amqp channel: %w", err)
	}
	defer ch.Close()

	// The channel is closed on connection loss as well as on a channel-level
	// exception. Without this notification the consume loop would block
	// forever on a channel that no longer delivers anything.
	closed := ch.NotifyClose(make(chan *amqp.Error, 1))

	if err := ch.Qos(prefetch, 0, false); err != nil {
		return fmt.Errorf("amqp qos: %w", err)
	}
	for _, q := range queues {
		if _, err := ch.QueueDeclare(q, queueDurable, false, false, false, nil); err != nil {
			return fmt.Errorf("queue declare %s: %w", q, err)
		}
	}

	// Forwarders stop either when their delivery channel is closed or when
	// this function returns. The deferred cancel releases a forwarder that is
	// blocked handing over a delivery nobody will read any more.
	fwdCtx, stopForwarders := context.WithCancel(ctx)
	defer stopForwarders()

	// Merge the per-queue delivery channels into a single stream.
	msgs := make(chan amqp.Delivery)
	for _, q := range queues {
		deliveries, err := ch.Consume(q, "", false, false, false, false, nil)
		if err != nil {
			return fmt.Errorf("consume %s: %w", q, err)
		}
		go func(in <-chan amqp.Delivery) {
			for d := range in {
				select {
				case msgs <- d:
				case <-fwdCtx.Done():
					return
				}
			}
		}(deliveries)
	}

	status.set("running", "")
	for {
		select {
		case <-ctx.Done():
			return nil
		case amqpErr := <-closed:
			// A graceful close delivers no error value.
			if amqpErr == nil {
				return errors.New("amqp channel closed")
			}
			return fmt.Errorf("amqp channel closed: %w", amqpErr)
		case d := <-msgs:
			err := handleMessage(ctx, pool, table, parseMessage(d.Body))
			switch {
			case err == nil:
				if ackErr := d.Ack(false); ackErr != nil {
					log.Printf("ack failed: %v", ackErr)
				}
			case errors.Is(err, errInvalidMessage):
				// An invalid message can never succeed, so it is dropped
				// rather than requeued.
				if nackErr := d.Nack(false, false); nackErr != nil {
					log.Printf("nack failed: %v", nackErr)
				}
				log.Printf("reject message: %v", err)
			default:
				if nackErr := d.Nack(false, requeueOnError); nackErr != nil {
					log.Printf("nack failed: %v", nackErr)
				}
				log.Printf("error handling message: %v", err)
			}
		}
	}
}
// main starts the health HTTP server and runs the worker until SIGINT or
// SIGTERM arrives, restarting it with exponential backoff whenever it fails.
func main() {
	const (
		initialBackoff = 2 * time.Second
		maxBackoff     = 30 * time.Second
		// A run that lasted at least this long is treated as having been
		// healthy, so the next failure starts again from initialBackoff.
		stableRun = time.Minute
	)

	port := getenv("PORT", "8080")
	status := &workerStatus{state: "starting"}
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	go startHTTPServer(port, status)

	// Exponential backoff between failed worker runs.
	backoff := initialBackoff
	for ctx.Err() == nil {
		started := time.Now()
		err := runWorker(ctx, status)
		if err == nil {
			status.set("stopped", "")
			return
		}
		status.set("degraded", err.Error())
		log.Printf("worker error: %v", err)

		if time.Since(started) >= stableRun {
			backoff = initialBackoff
		}

		// Wait out the backoff, but return immediately on a shutdown signal
		// instead of sleeping through it.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}

		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}