From a5493c73756cb06bf3dd55f25bb59f2881fadb6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNaeel=E2=80=9D?= Date: Mon, 23 Feb 2026 20:08:48 +0400 Subject: [PATCH] Add Russian inline comments --- README.md | 4 ++++ server.js | 13 +++++++++++++ 2 files changed, 17 insertions(+) diff --git a/README.md b/README.md index 91ceeaa..6a4238f 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Consumes CRUD messages from RabbitMQ and writes to Postgres. +Комментарий: воркер специально пишет в таблицу демо и не хранит собственное состояние, кроме метрик в памяти. + ## Environment variables RabbitMQ: @@ -20,3 +22,5 @@ Postgres: ## Notes Health endpoint: `/healthz`. + +HTML-страница со статусом доступна на `/`. diff --git a/server.js b/server.js index 3ccf111..e92e1aa 100644 --- a/server.js +++ b/server.js @@ -2,12 +2,14 @@ const http = require("http"); const amqp = require("amqplib"); const { Pool } = require("pg"); +// Состояние процесса для /healthz и статуса в UI. const status = { state: "starting", error: "", lastSuccess: "", }; +// Метрики обработки сообщений (показываются на HTML-странице). const metrics = { processed: 0, failed: 0, @@ -16,6 +18,7 @@ const metrics = { lastAction: "", }; +// Унифицированное обновление состояния. function setStatus(state, error) { status.state = state; status.error = error || ""; @@ -24,6 +27,7 @@ function setStatus(state, error) { } } +// HTML-страница со статусом и счетчиками. function renderStatusPage() { const queueName = (process.env.RABBIT_QUEUES || "crud_queue").split(",")[0] || "crud_queue"; const tableName = process.env.PG_TABLE || "rabbit_messages"; @@ -80,6 +84,7 @@ function renderStatusPage() { `; } +// Подготовка конфигурации PG (DATABASE_URL имеет приоритет). function buildPgConfig() { if (process.env.DATABASE_URL) { return { @@ -98,6 +103,7 @@ function buildPgConfig() { }; } +// Собираем AMQP URL из переменных окружения. function buildAmqpUrl() { if (process.env.AMQP_URL) { return process.env.AMQP_URL; @@ -115,12 +121,14 @@ function buildAmqpUrl() { const pool = new Pool(buildPgConfig()); +// Минимальная таблица для демо, если БД пустая. async function ensureTable(table) { await pool.query( `CREATE TABLE IF NOT EXISTS ${table} (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())` ); } +// Парсим JSON, нормализуем ключи, иначе считаем текстом. function parseMessage(buffer) { const body = buffer.toString("utf8"); try { @@ -143,6 +151,7 @@ function parseMessage(buffer) { } } +// CRUD-операции по сообщению. async function handleMessage(table, msg) { const action = (msg.action || "create").toLowerCase(); if (action === "create") { @@ -167,6 +176,7 @@ async function handleMessage(table, msg) { throw new Error("invalid action"); } +// Небольшой HTTP-сервер для /healthz и HTML-статуса. function startHttpServer() { const port = process.env.PORT || 3000; const server = http.createServer((req, res) => { @@ -183,6 +193,7 @@ function startHttpServer() { }); } +// Основной цикл: подключение к RabbitMQ и обработка сообщений. async function consumeLoop() { const table = process.env.PG_TABLE || "rabbit_messages"; const queueName = (process.env.RABBIT_QUEUES || "crud_queue") @@ -193,6 +204,7 @@ async function consumeLoop() { const prefetch = Number(process.env.RABBIT_PREFETCH || "1"); const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true"; + // Гарантируем наличие таблицы перед стартом консьюмера. await ensureTable(table); let backoffMs = 2000; @@ -206,6 +218,7 @@ async function consumeLoop() { setStatus("running", ""); + // Обрабатываем сообщения и фиксируем успех/ошибки. channel.consume(queueName, async (msg) => { if (!msg) { return;