const http = require("http"); const amqp = require("amqplib"); const { Pool } = require("pg"); // Состояние процесса для /healthz и статуса в UI. const status = { state: "starting", error: "", lastSuccess: "", }; // Метрики обработки сообщений (показываются на HTML-странице). const metrics = { processed: 0, failed: 0, lastError: "", lastMessageAt: "", lastAction: "", }; // Унифицированное обновление состояния. function setStatus(state, error) { status.state = state; status.error = error || ""; if (!error) { status.lastSuccess = new Date().toISOString(); } } // HTML-страница со статусом и счетчиками. function renderStatusPage() { const queueName = (process.env.RABBIT_QUEUES || "crud_queue").split(",")[0] || "crud_queue"; const tableName = process.env.PG_TABLE || "rabbit_messages"; return ` Rabbit NodeJS Worker

NodeJS Worker

Queue: ${queueName} | Table: ${tableName}
${status.state}
Processed
${metrics.processed}
Failed
${metrics.failed}
Last Action
${metrics.lastAction || "-"}
${metrics.lastMessageAt || ""}
Last Error
${metrics.lastError || "-"}
${status.lastSuccess || ""}
`; } // Подготовка конфигурации PG (DATABASE_URL имеет приоритет). function buildPgConfig() { if (process.env.DATABASE_URL) { return { connectionString: process.env.DATABASE_URL, ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined, }; } return { host: process.env.PGHOST, port: Number(process.env.PGPORT || "5432"), user: process.env.PGUSER, password: process.env.PGPASSWORD, database: process.env.PGDATABASE || "postgres", ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined, }; } // Собираем AMQP URL из переменных окружения. function buildAmqpUrl() { if (process.env.AMQP_URL) { return process.env.AMQP_URL; } const host = process.env.RABBIT_HOST || ""; const port = process.env.RABBIT_PORT || "5672"; const user = process.env.RABBIT_USER || ""; const pass = process.env.RABBIT_PASSWORD || ""; const vhost = process.env.RABBIT_VHOST || "/"; if (!host || !user || !pass) { throw new Error("Missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL"); } return `amqp://${user}:${pass}@${host}:${port}${vhost}`; } const pool = new Pool(buildPgConfig()); // Минимальная таблица для демо, если БД пустая. async function ensureTable(table) { await pool.query( `CREATE TABLE IF NOT EXISTS ${table} (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())` ); } // Парсим JSON, нормализуем ключи, иначе считаем текстом. function parseMessage(buffer) { const body = buffer.toString("utf8"); try { const parsed = JSON.parse(body); const normalized = Object.keys(parsed || {}).reduce((acc, key) => { acc[key.toLowerCase()] = parsed[key]; return acc; }, {}); return { action: normalized.action || "create", id: normalized.id, text: normalized.text, requestId: normalized.request_id || normalized.requestid, }; } catch { return { action: "create", text: body.trim() || "(empty)", }; } } // CRUD-операции по сообщению. async function handleMessage(table, msg) { const action = (msg.action || "create").toLowerCase(); if (action === "create") { const text = msg.text || "(empty)"; await pool.query(`INSERT INTO ${table} (test_data) VALUES ($1)`, [text]); return; } if (action === "update") { if (!msg.id) { throw new Error("missing id for update"); } await pool.query(`UPDATE ${table} SET test_data=$1 WHERE id=$2`, [msg.text || "", msg.id]); return; } if (action === "delete") { if (!msg.id) { throw new Error("missing id for delete"); } await pool.query(`DELETE FROM ${table} WHERE id=$1`, [msg.id]); return; } throw new Error("invalid action"); } // Небольшой HTTP-сервер для /healthz и HTML-статуса. function startHttpServer() { const port = process.env.PORT || 3000; const server = http.createServer((req, res) => { if (req.url === "/healthz") { res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify(status)); return; } res.writeHead(200, { "Content-Type": "text/html; charset=utf-8" }); res.end(renderStatusPage()); }); server.listen(port, () => { console.log(`health server listening on ${port}`); }); } // Основной цикл: подключение к RabbitMQ и обработка сообщений. async function consumeLoop() { const table = process.env.PG_TABLE || "rabbit_messages"; const queueName = (process.env.RABBIT_QUEUES || "crud_queue") .split(",") .map((q) => q.trim()) .filter(Boolean)[0] || "crud_queue"; const durable = String(process.env.RABBIT_DURABLE || "true").toLowerCase() === "true"; const prefetch = Number(process.env.RABBIT_PREFETCH || "1"); const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true"; // Гарантируем наличие таблицы перед стартом консьюмера. await ensureTable(table); let backoffMs = 2000; while (true) { try { const amqpUrl = buildAmqpUrl(); const connection = await amqp.connect(amqpUrl); const channel = await connection.createChannel(); await channel.prefetch(prefetch); await channel.assertQueue(queueName, { durable }); setStatus("running", ""); // Обрабатываем сообщения и фиксируем успех/ошибки. channel.consume(queueName, async (msg) => { if (!msg) { return; } try { const parsed = parseMessage(msg.content); await handleMessage(table, parsed); metrics.processed += 1; metrics.lastAction = parsed.action || "create"; metrics.lastMessageAt = new Date().toISOString(); channel.ack(msg); } catch (err) { metrics.failed += 1; metrics.lastError = String(err.message || err); metrics.lastMessageAt = new Date().toISOString(); if (String(err.message || "").includes("missing id") || String(err.message || "").includes("invalid action")) { channel.nack(msg, false, false); return; } channel.nack(msg, false, requeueOnError); } }); await new Promise((resolve, reject) => { connection.on("close", resolve); connection.on("error", reject); }); } catch (err) { setStatus("degraded", String(err.message || err)); await new Promise((r) => setTimeout(r, backoffMs)); if (backoffMs < 30000) { backoffMs *= 2; } continue; } } } startHttpServer(); consumeLoop().catch((err) => { setStatus("failed", String(err.message || err)); console.error(err); });