330 lines
7.3 KiB
Go
330 lines
7.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// Воркер держит HTTP /healthz и параллельно слушает RabbitMQ,
|
|
// записывая события в Postgres. Даже при ошибках контейнер остается живым.
|
|
|
|
type Message struct {
|
|
Action string `json:"action"`
|
|
ID *int64 `json:"id,omitempty"`
|
|
Text string `json:"text,omitempty"`
|
|
Source string `json:"source,omitempty"`
|
|
RequestID string `json:"request_id,omitempty"`
|
|
}
|
|
|
|
type workerStatus struct {
|
|
mu sync.Mutex
|
|
state string
|
|
lastError string
|
|
lastSuccess time.Time
|
|
}
|
|
|
|
func (s *workerStatus) set(state, err string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.state = state
|
|
s.lastError = err
|
|
if err == "" {
|
|
s.lastSuccess = time.Now()
|
|
}
|
|
}
|
|
|
|
func (s *workerStatus) snapshot() map[string]string {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
result := map[string]string{
|
|
"state": s.state,
|
|
"error": s.lastError,
|
|
"timestamp": s.lastSuccess.Format(time.RFC3339),
|
|
}
|
|
return result
|
|
}
|
|
|
|
func getenv(key, def string) string {
|
|
val := os.Getenv(key)
|
|
if val == "" {
|
|
return def
|
|
}
|
|
return val
|
|
}
|
|
|
|
func getenvInt(key string, def int) int {
|
|
val := os.Getenv(key)
|
|
if val == "" {
|
|
return def
|
|
}
|
|
parsed, err := strconv.Atoi(val)
|
|
if err != nil {
|
|
return def
|
|
}
|
|
return parsed
|
|
}
|
|
|
|
func buildAMQPURL() (string, error) {
|
|
if url := os.Getenv("AMQP_URL"); url != "" {
|
|
return url, nil
|
|
}
|
|
|
|
host := getenv("RABBIT_HOST", "")
|
|
port := getenv("RABBIT_PORT", "5672")
|
|
user := getenv("RABBIT_USER", "")
|
|
pass := getenv("RABBIT_PASSWORD", "")
|
|
vhost := getenv("RABBIT_VHOST", "/")
|
|
|
|
if host == "" || user == "" || pass == "" {
|
|
return "", errors.New("missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL")
|
|
}
|
|
|
|
return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost), nil
|
|
}
|
|
|
|
func buildPgConnString() (string, error) {
|
|
if url := os.Getenv("DATABASE_URL"); url != "" {
|
|
return url, nil
|
|
}
|
|
|
|
host := getenv("PGHOST", "")
|
|
if host == "" {
|
|
return "", errors.New("missing PGHOST or DATABASE_URL")
|
|
}
|
|
port := getenv("PGPORT", "5432")
|
|
user := getenv("PGUSER", "")
|
|
pass := getenv("PGPASSWORD", "")
|
|
db := getenv("PGDATABASE", "postgres")
|
|
ssl := getenv("PGSSLMODE", "require")
|
|
|
|
if user == "" || pass == "" {
|
|
return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL")
|
|
}
|
|
|
|
return fmt.Sprintf(
|
|
"postgresql://%s:%s@%s:%s/%s?sslmode=%s",
|
|
user,
|
|
pass,
|
|
host,
|
|
port,
|
|
db,
|
|
ssl,
|
|
), nil
|
|
}
|
|
|
|
func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error {
|
|
query := fmt.Sprintf(
|
|
"CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())",
|
|
table,
|
|
)
|
|
_, err := pool.Exec(ctx, query)
|
|
return err
|
|
}
|
|
|
|
func parseMessage(body []byte) Message {
|
|
msg := Message{Action: "create"}
|
|
if err := json.Unmarshal(body, &msg); err != nil {
|
|
msg.Text = strings.TrimSpace(string(body))
|
|
if msg.Text == "" {
|
|
msg.Text = "(empty)"
|
|
}
|
|
return msg
|
|
}
|
|
if msg.Action == "" {
|
|
msg.Action = "create"
|
|
}
|
|
return msg
|
|
}
|
|
|
|
var errInvalidMessage = errors.New("invalid message")
|
|
|
|
func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
switch strings.ToLower(msg.Action) {
|
|
case "create":
|
|
text := msg.Text
|
|
if text == "" {
|
|
text = "(empty)"
|
|
}
|
|
query := fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table)
|
|
_, err := pool.Exec(ctx, query, text)
|
|
return err
|
|
case "update":
|
|
if msg.ID == nil {
|
|
return errInvalidMessage
|
|
}
|
|
query := fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table)
|
|
_, err := pool.Exec(ctx, query, msg.Text, *msg.ID)
|
|
return err
|
|
case "delete":
|
|
if msg.ID == nil {
|
|
return errInvalidMessage
|
|
}
|
|
query := fmt.Sprintf("DELETE FROM %s WHERE id=$1", table)
|
|
_, err := pool.Exec(ctx, query, *msg.ID)
|
|
return err
|
|
default:
|
|
return errInvalidMessage
|
|
}
|
|
}
|
|
|
|
func startHTTPServer(port string, status *workerStatus) {
|
|
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
|
|
payload, _ := json.Marshal(status.snapshot())
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(payload)
|
|
})
|
|
|
|
http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("rabbit-worker ready"))
|
|
})
|
|
|
|
log.Printf("listening on :%s", port)
|
|
if err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func runWorker(ctx context.Context, status *workerStatus) error {
|
|
amqpURL, err := buildAMQPURL()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pgConn, err := buildPgConnString()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",")
|
|
for i := range queues {
|
|
queues[i] = strings.TrimSpace(queues[i])
|
|
}
|
|
queueDurable := getenv("RABBIT_DURABLE", "true") == "true"
|
|
prefetch := getenvInt("RABBIT_PREFETCH", 1)
|
|
requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true"
|
|
table := getenv("PG_TABLE", "nubes_test_table")
|
|
|
|
pool, err := pgxpool.New(ctx, pgConn)
|
|
if err != nil {
|
|
return fmt.Errorf("pg pool: %w", err)
|
|
}
|
|
defer pool.Close()
|
|
|
|
if err := pool.Ping(ctx); err != nil {
|
|
return fmt.Errorf("pg ping: %w", err)
|
|
}
|
|
if err := ensureTable(ctx, pool, table); err != nil {
|
|
return fmt.Errorf("ensure table: %w", err)
|
|
}
|
|
|
|
conn, err := amqp.Dial(amqpURL)
|
|
if err != nil {
|
|
return fmt.Errorf("amqp dial: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return fmt.Errorf("amqp channel: %w", err)
|
|
}
|
|
defer ch.Close()
|
|
|
|
if err := ch.Qos(prefetch, 0, false); err != nil {
|
|
return fmt.Errorf("amqp qos: %w", err)
|
|
}
|
|
|
|
for _, q := range queues {
|
|
if q == "" {
|
|
continue
|
|
}
|
|
if _, err := ch.QueueDeclare(q, queueDurable, false, false, false, nil); err != nil {
|
|
return fmt.Errorf("queue declare %s: %w", q, err)
|
|
}
|
|
}
|
|
|
|
msgs := make(chan amqp.Delivery)
|
|
for _, q := range queues {
|
|
if q == "" {
|
|
continue
|
|
}
|
|
delivery, err := ch.Consume(q, "", false, false, false, false, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("consume %s: %w", q, err)
|
|
}
|
|
|
|
go func(d <-chan amqp.Delivery) {
|
|
for m := range d {
|
|
msgs <- m
|
|
}
|
|
}(delivery)
|
|
}
|
|
|
|
status.set("running", "")
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case m := <-msgs:
|
|
msg := parseMessage(m.Body)
|
|
if err := handleMessage(ctx, pool, table, msg); err != nil {
|
|
if errors.Is(err, errInvalidMessage) {
|
|
_ = m.Nack(false, false)
|
|
log.Printf("reject message: %v", err)
|
|
continue
|
|
}
|
|
_ = m.Nack(false, requeueOnError)
|
|
log.Printf("error handling message: %v", err)
|
|
continue
|
|
}
|
|
_ = m.Ack(false)
|
|
}
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
port := getenv("PORT", "8080")
|
|
status := &workerStatus{state: "starting"}
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
go startHTTPServer(port, status)
|
|
|
|
backoff := 2 * time.Second
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
if err := runWorker(ctx, status); err != nil {
|
|
status.set("degraded", err.Error())
|
|
log.Printf("worker error: %v", err)
|
|
time.Sleep(backoff)
|
|
if backoff < 30*time.Second {
|
|
backoff *= 2
|
|
}
|
|
continue
|
|
}
|
|
status.set("stopped", "")
|
|
return
|
|
}
|
|
}
|