diff --git a/Dockerfile b/Dockerfile index 64971c0..a33ffe8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,4 +16,6 @@ RUN apk add --no-cache ca-certificates WORKDIR /app COPY --from=build /out/worker /app/worker +EXPOSE 8080 + CMD ["/app/worker"] diff --git a/go.mod b/go.mod index 4780409..f4366c0 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,3 @@ module gitea-naeel.giteak8s.services.ngcloud.ru/naeel/rabbit-worker go 1.22 - -require ( - github.com/jackc/pgx/v5 v5.5.5 - github.com/rabbitmq/amqp091-go v1.10.0 -) - -require ( - github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/text v0.14.0 // indirect -) diff --git a/go.sum b/go.sum index 242ae45..e69de29 100644 --- a/go.sum +++ b/go.sum @@ -1,32 +0,0 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= -github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= -github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= -github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= -github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= -github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index aae87c0..15112f0 100644 --- a/main.go +++ b/main.go @@ -1,41 +1,15 @@ package main import ( - "context" - "encoding/json" - "errors" "fmt" "log" + "net/http" "os" - "os/signal" - "strconv" - "strings" - "syscall" - "time" - - "github.com/jackc/pgx/v5/pgxpool" - amqp "github.com/rabbitmq/amqp091-go" ) -// Message описывает формат команды из очереди. -// -// Поддерживаемые действия: -// - create: вставка новой строки -// - update: обновление строки по id -// - delete: удаление строки по id -// -// Если тело сообщения не JSON, оно трактуется как create, -// а весь текст — это содержимое записи. -type Message struct { - Action string `json:"action"` - ID *int64 `json:"id,omitempty"` - Text string `json:"text,omitempty"` - Source string `json:"source,omitempty"` - RequestID string `json:"request_id,omitempty"` -} +// Минимальный воркер для проверки, что nubes_http умеет запускать контейнеры. +// Никакой логики с Rabbit или Postgres нет — только HTTP health endpoint. -// getenv возвращает значение переменной окружения -// или значение по умолчанию, если переменная не задана. func getenv(key, def string) string { val := os.Getenv(key) if val == "" { @@ -44,275 +18,21 @@ func getenv(key, def string) string { return val } -// getenvInt пытается распарсить целое значение из переменной окружения. -// Если переменной нет или парсинг не удался — возвращает default. -func getenvInt(key string, def int) int { - val := os.Getenv(key) - if val == "" { - return def - } - parsed, err := strconv.Atoi(val) - if err != nil { - return def - } - return parsed -} - -// buildAMQPURL строит строку подключения к RabbitMQ. -// Сначала пытаемся взять AMQP_URL, затем собираем из RABBIT_*. -// Возвращает пустую строку, если недостаточно данных. -func buildAMQPURL() string { - if url := os.Getenv("AMQP_URL"); url != "" { - return url - } - - host := getenv("RABBIT_HOST", "") - if host == "" { - return "" - } - port := getenv("RABBIT_PORT", "5672") - user := getenv("RABBIT_USER", "") - pass := getenv("RABBIT_PASSWORD", "") - vhost := getenv("RABBIT_VHOST", "/") - - if user == "" || pass == "" { - return "" - } - - return fmt.Sprintf("amqp://%s:%s@%s:%s%s", user, pass, host, port, vhost) -} - -// buildPgConnString формирует строку подключения к Postgres. -// Предпочтительно использовать DATABASE_URL, иначе PG* переменные. -func buildPgConnString() (string, error) { - if url := os.Getenv("DATABASE_URL"); url != "" { - return url, nil - } - - host := getenv("PGHOST", "") - if host == "" { - return "", errors.New("missing PGHOST or DATABASE_URL") - } - port := getenv("PGPORT", "5432") - user := getenv("PGUSER", "") - pass := getenv("PGPASSWORD", "") - db := getenv("PGDATABASE", "postgres") - ssl := getenv("PGSSLMODE", "require") - - if user == "" || pass == "" { - return "", errors.New("missing PGUSER/PGPASSWORD or DATABASE_URL") - } - - return fmt.Sprintf( - "postgresql://%s:%s@%s:%s/%s?sslmode=%s", - user, - pass, - host, - port, - db, - ssl, - ), nil -} - -// ensureTable гарантирует наличие таблицы (минимальный демо-скелет). -// Это удобно для старта, чтобы воркер не падал на пустой БД. -func ensureTable(ctx context.Context, pool *pgxpool.Pool, table string) error { - query := fmt.Sprintf( - "CREATE TABLE IF NOT EXISTS %s (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())", - table, - ) - _, err := pool.Exec(ctx, query) - return err -} - -// parseMessage разбирает сообщение из очереди. -// Если JSON не распарсился — считаем, что это create с текстом в теле. -func parseMessage(body []byte) Message { - msg := Message{Action: "create"} - if err := json.Unmarshal(body, &msg); err != nil { - msg.Text = strings.TrimSpace(string(body)) - if msg.Text == "" { - msg.Text = "(empty)" - } - return msg - } - if msg.Action == "" { - msg.Action = "create" - } - return msg -} - -// isPermanentError определяет ошибки, при которых сообщение стоит отбрасывать -// (например, некорректный формат). -func isPermanentError(err error) bool { - return errors.Is(err, errInvalidMessage) -} - -// errInvalidMessage — признак некорректной команды (не повторяем обработку). -var errInvalidMessage = errors.New("invalid message") - -// handleMessage выполняет CRUD операцию в Postgres. -// Таймаут нужен, чтобы зависшие запросы не блокировали воркер. -func handleMessage(ctx context.Context, pool *pgxpool.Pool, table string, msg Message) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - switch strings.ToLower(msg.Action) { - case "create": - text := msg.Text - if text == "" { - text = "(empty)" - } - query := fmt.Sprintf("INSERT INTO %s (test_data) VALUES ($1)", table) - _, err := pool.Exec(ctx, query, text) - return err - case "update": - if msg.ID == nil { - return errInvalidMessage - } - query := fmt.Sprintf("UPDATE %s SET test_data=$1 WHERE id=$2", table) - _, err := pool.Exec(ctx, query, msg.Text, *msg.ID) - return err - case "delete": - if msg.ID == nil { - return errInvalidMessage - } - query := fmt.Sprintf("DELETE FROM %s WHERE id=$1", table) - _, err := pool.Exec(ctx, query, *msg.ID) - return err - default: - return errInvalidMessage - } -} - -// main инициализирует соединения, подписывается на очереди и -// обрабатывает сообщения в бесконечном цикле до SIGINT/SIGTERM. func main() { - amqpURL := buildAMQPURL() - if amqpURL == "" { - log.Fatal("missing AMQP_URL or RABBIT_* envs") - } + port := getenv("PORT", "8080") - pgConn, err := buildPgConnString() - if err != nil { + http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + http.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("rabbit-worker ready")) + }) + + log.Printf("listening on :%s", port) + if err := http.ListenAndServe(fmt.Sprintf(":%s", port), nil); err != nil { log.Fatal(err) } - - // Можно слушать несколько очередей, разделенных запятой. - queues := strings.Split(getenv("RABBIT_QUEUES", "crud_queue"), ",") - for i := range queues { - queues[i] = strings.TrimSpace(queues[i]) - } - queueDurable := getenv("RABBIT_DURABLE", "true") == "true" - // Prefetch ограничивает количество непроцессed сообщений. - prefetch := getenvInt("RABBIT_PREFETCH", 1) - // Повторная постановка сообщения в очередь при ошибке. - requeueOnError := getenv("REQUEUE_ON_ERROR", "true") == "true" - // Таблица в Postgres для операций воркера. - table := getenv("PG_TABLE", "nubes_test_table") - - // Грейсфул-шатдаун по сигналам ОС. - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() - - pool, err := pgxpool.New(ctx, pgConn) - if err != nil { - log.Fatalf("pg pool: %v", err) - } - defer pool.Close() - - if err := pool.Ping(ctx); err != nil { - log.Fatalf("pg ping: %v", err) - } - if err := ensureTable(ctx, pool, table); err != nil { - log.Fatalf("ensure table: %v", err) - } - - conn, err := amqp.Dial(amqpURL) - if err != nil { - log.Fatalf("amqp dial: %v", err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatalf("amqp channel: %v", err) - } - defer ch.Close() - - // QoS: ограничиваем количество сообщений, которые можно взять без ack. - if err := ch.Qos(prefetch, 0, false); err != nil { - log.Fatalf("amqp qos: %v", err) - } - - // Объявляем очереди, чтобы воркер мог стартовать даже в пустой Rabbit. - for _, q := range queues { - if q == "" { - continue - } - _, err := ch.QueueDeclare( - q, - queueDurable, - false, - false, - false, - nil, - ) - if err != nil { - log.Fatalf("queue declare %s: %v", q, err) - } - } - - // Канал, в который складываем сообщения из всех очередей. - msgs := make(chan amqp.Delivery) - for _, q := range queues { - if q == "" { - continue - } - delivery, err := ch.Consume( - q, - "", - false, - false, - false, - false, - nil, - ) - if err != nil { - log.Fatalf("consume %s: %v", q, err) - } - - // Отдельная горутина на каждую очередь. - go func(d <-chan amqp.Delivery) { - for m := range d { - msgs <- m - } - }(delivery) - } - - log.Printf("worker started: queues=%v table=%s", queues, table) - - for { - select { - case <-ctx.Done(): - log.Println("shutdown") - return - case m := <-msgs: - msg := parseMessage(m.Body) - if err := handleMessage(ctx, pool, table, msg); err != nil { - if isPermanentError(err) { - // Некорректное сообщение — отбрасываем без повторной попытки. - _ = m.Nack(false, false) - log.Printf("reject message: %v", err) - continue - } - // Временная ошибка — можно вернуть сообщение в очередь. - _ = m.Nack(false, requeueOnError) - log.Printf("error handling message: %v", err) - continue - } - // Успешная обработка — подтверждаем сообщение. - _ = m.Ack(false) - } - } }