rabbit-nodeworker/server.js

265 lines
8.9 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const http = require("http");
const amqp = require("amqplib");
const { Pool } = require("pg");
// Process state served as JSON by /healthz and shown on the HTML status page.
// Mutated through setStatus().
const status = {
state: "starting", // lifecycle state; setStatus() callers use "running", "degraded" and "failed"
error: "", // last error message passed to setStatus(), "" when there is none
lastSuccess: "", // ISO-8601 timestamp of the last setStatus() call made without an error
};
// Message-processing counters (rendered on the HTML status page).
// Updated by the consumer callback inside consumeLoop().
const metrics = {
processed: 0, // number of messages handled successfully
failed: 0, // number of messages whose handling threw
lastError: "", // text of the most recent handling error
lastMessageAt: "", // ISO-8601 timestamp of the most recent message, successful or not
lastAction: "", // action of the most recent successfully handled message
};
// Single entry point for updating the shared `status` object.
//   state - new lifecycle state string
//   error - error text; any falsy value is stored as "" and counts as success
// A call without an error also stamps `status.lastSuccess` with the current time.
function setStatus(state, error) {
  const message = error || "";
  Object.assign(status, { state, error: message });
  if (error) {
    return;
  }
  status.lastSuccess = new Date().toISOString();
}
// Escapes the five HTML-significant characters so a value can be placed safely
// in element content or in a double-quoted attribute.
function escapeHtml(value) {
  const replacements = {
    "&": "&amp;",
    "<": "&lt;",
    ">": "&gt;",
    '"': "&quot;",
    "'": "&#39;",
  };
  return String(value).replace(/[&<>"']/g, (ch) => replacements[ch]);
}

// Renders the HTML status page with the current state and counters.
// Reads RABBIT_QUEUES / PG_TABLE from the environment and the shared `status`
// and `metrics` objects. Returns the complete document as a string.
function renderStatusPage() {
  // Same selection rule as the consumer: first non-blank entry of the
  // comma-separated list, so the page shows the queue that is actually consumed.
  const queueName =
    (process.env.RABBIT_QUEUES || "crud_queue")
      .split(",")
      .map((q) => q.trim())
      .filter(Boolean)[0] || "crud_queue";
  const tableName = process.env.PG_TABLE || "rabbit_messages";
  const badgeClass = status.state === "running" ? "ok" : "bad";

  // Every dynamic value is escaped: lastError in particular can contain database
  // error text that echoes message content, which must not be emitted as markup.
  const view = {
    queueName: escapeHtml(queueName),
    tableName: escapeHtml(tableName),
    state: escapeHtml(status.state),
    processed: escapeHtml(metrics.processed),
    failed: escapeHtml(metrics.failed),
    lastAction: escapeHtml(metrics.lastAction || "-"),
    lastMessageAt: escapeHtml(metrics.lastMessageAt || ""),
    lastError: escapeHtml(metrics.lastError || "-"),
    lastSuccess: escapeHtml(status.lastSuccess || ""),
  };

  return `<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Rabbit NodeJS Worker</title>
<style>
body { font-family: Arial, sans-serif; background: #f6f7fb; color: #1c1c1c; margin: 0; }
.wrap { max-width: 900px; margin: 40px auto; padding: 0 20px; }
.card { background: #fff; border-radius: 14px; box-shadow: 0 8px 24px rgba(0,0,0,0.06); padding: 24px; }
.grid { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 16px; }
.stat { padding: 16px; border: 1px solid #e5e7eb; border-radius: 12px; background: #fafbff; }
.label { color: #6b7280; font-size: 12px; text-transform: uppercase; letter-spacing: 0.08em; }
.value { font-size: 20px; font-weight: 700; margin-top: 6px; }
.small { font-size: 12px; color: #6b7280; margin-top: 6px; }
.status { padding: 6px 12px; border-radius: 999px; display: inline-block; font-weight: 700; }
.ok { background: #e7f6ee; color: #1e4620; }
.bad { background: #fdeeee; color: #b42318; }
</style>
</head>
<body>
<div class="wrap">
<div class="card">
<h2>NodeJS Worker</h2>
<div class="small">Queue: ${view.queueName} | Table: ${view.tableName}</div>
<div style="margin: 12px 0;">
<span class="status ${badgeClass}">${view.state}</span>
</div>
<div class="grid">
<div class="stat">
<div class="label">Processed</div>
<div class="value">${view.processed}</div>
</div>
<div class="stat">
<div class="label">Failed</div>
<div class="value">${view.failed}</div>
</div>
<div class="stat">
<div class="label">Last Action</div>
<div class="value">${view.lastAction}</div>
<div class="small">${view.lastMessageAt}</div>
</div>
<div class="stat">
<div class="label">Last Error</div>
<div class="value">${view.lastError}</div>
<div class="small">${view.lastSuccess}</div>
</div>
</div>
</div>
</div>
</body>
</html>`;
}
// Builds the configuration object handed to the pg Pool.
// DATABASE_URL takes priority; otherwise the discrete PGHOST / PGPORT / PGUSER /
// PGPASSWORD / PGDATABASE variables are used (port defaults to 5432, database to
// "postgres"). With PGSSLMODE=require TLS is enabled with certificate verification
// turned off (rejectUnauthorized: false); for any other value `ssl` is undefined.
function buildPgConfig() {
  const {
    DATABASE_URL: connectionString,
    PGSSLMODE: sslMode,
    PGHOST: host,
    PGPORT: rawPort,
    PGUSER: user,
    PGPASSWORD: password,
    PGDATABASE: databaseName,
  } = process.env;

  const ssl = sslMode === "require" ? { rejectUnauthorized: false } : undefined;

  if (connectionString) {
    return { connectionString, ssl };
  }

  return {
    host,
    port: Number(rawPort || "5432"),
    user,
    password,
    database: databaseName || "postgres",
    ssl,
  };
}
// Builds the AMQP connection URL from environment variables.
// AMQP_URL, when set, is returned verbatim. Otherwise the URL is assembled from
// RABBIT_HOST, RABBIT_PORT (default "5672"), RABBIT_USER, RABBIT_PASSWORD and
// RABBIT_VHOST (default "/").
// Throws an Error when AMQP_URL is absent and host, user or password is missing.
function buildAmqpUrl() {
  if (process.env.AMQP_URL) {
    return process.env.AMQP_URL;
  }
  const host = process.env.RABBIT_HOST || "";
  const port = process.env.RABBIT_PORT || "5672";
  const user = process.env.RABBIT_USER || "";
  const pass = process.env.RABBIT_PASSWORD || "";
  const vhost = process.env.RABBIT_VHOST || "/";
  if (!host || !user || !pass) {
    throw new Error("Missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL");
  }
  // RABBIT_USER / RABBIT_PASSWORD are taken as raw values and percent-encoded here,
  // so characters such as "@", ":", "/" or "#" cannot break the URL structure.
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(pass)}`;
  // The vhost is the URL path: add the leading slash when it was omitted so it is
  // not glued onto the port. It is otherwise passed through unchanged.
  const path = vhost.startsWith("/") ? vhost : `/${vhost}`;
  return `amqp://${credentials}@${host}:${port}${path}`;
}
// Shared PostgreSQL connection pool used by every query in this file.
const pool = new Pool(buildPgConfig());
// The pool emits "error" when an idle client fails (e.g. the backend restarts or
// the network drops). Without a listener Node treats that event as an uncaught
// exception and the whole worker dies; log it and let later queries reconnect.
pool.on("error", (err) => {
  console.error("postgres pool error:", err);
});
// Creates the minimal demo table when it does not exist yet
// (id SERIAL primary key, test_data TEXT, created_at TIMESTAMP defaulting to NOW()).
//   table - table name; it is interpolated into the SQL text as-is, so it must
//           come from trusted configuration.
// Resolves with no value once the statement has run.
async function ensureTable(table) {
  const columns = "id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW()";
  const ddl = `CREATE TABLE IF NOT EXISTS ${table} (${columns})`;
  await pool.query(ddl);
}
// Decodes a message body into a command object.
//   buffer - raw message content (Buffer), decoded as UTF-8
// A body that is a JSON object is read case-insensitively by key and yields
//   { action, id, text, requestId }   (action defaults to "create";
//   requestId comes from "request_id" or "requestid").
// Anything else - invalid JSON, or valid JSON that is not an object (number,
// string, boolean, null, array) - is treated as plain text and yields
//   { action: "create", text }   where text is the trimmed body or "(empty)".
function parseMessage(buffer) {
  const body = buffer.toString("utf8");

  let parsed;
  try {
    parsed = JSON.parse(body);
  } catch {
    parsed = undefined;
  }

  const isJsonObject = parsed !== null && typeof parsed === "object" && !Array.isArray(parsed);
  if (!isJsonObject) {
    // Scalars such as 42 or "hi" are valid JSON but carry no fields; keep the
    // raw text instead of silently dropping the content.
    return {
      action: "create",
      text: body.trim() || "(empty)",
    };
  }

  // Prototype-less accumulator: a "__proto__" key in the payload becomes an
  // ordinary property instead of rewiring the object's prototype.
  const normalized = Object.create(null);
  for (const [key, value] of Object.entries(parsed)) {
    normalized[key.toLowerCase()] = value;
  }

  return {
    action: normalized.action || "create",
    id: normalized.id,
    text: normalized.text,
    requestId: normalized.request_id || normalized.requestid,
  };
}
// Applies one parsed message to the database table.
//   table - target table name (interpolated into the SQL text as-is)
//   msg   - command object: { action, id, text }
// Actions (case-insensitive, default "create"):
//   create - INSERT msg.text (or "(empty)" when it is falsy)
//   update - UPDATE test_data of row msg.id to msg.text (or "")
//   delete - DELETE row msg.id
// Throws Error("missing id for update" / "missing id for delete") when the id is
// falsy, and Error("invalid action") for any other action. Query errors from the
// pool propagate unchanged.
async function handleMessage(table, msg) {
  // String() first: a non-string action (e.g. the number 5) must end up as
  // "invalid action" rather than a TypeError from calling .toLowerCase() on it.
  const action = String(msg.action || "create").toLowerCase();

  switch (action) {
    case "create": {
      const text = msg.text || "(empty)";
      await pool.query(`INSERT INTO ${table} (test_data) VALUES ($1)`, [text]);
      return;
    }
    case "update": {
      if (!msg.id) {
        throw new Error("missing id for update");
      }
      await pool.query(`UPDATE ${table} SET test_data=$1 WHERE id=$2`, [msg.text || "", msg.id]);
      return;
    }
    case "delete": {
      if (!msg.id) {
        throw new Error("missing id for delete");
      }
      await pool.query(`DELETE FROM ${table} WHERE id=$1`, [msg.id]);
      return;
    }
    default:
      throw new Error("invalid action");
  }
}
// Starts a small HTTP server on PORT (default 3000).
//   /healthz      -> 200 with the `status` object as JSON
//   any other URL -> 200 with the HTML status page
// Logs a line once the server is listening. Returns nothing.
function startHttpServer() {
  const port = process.env.PORT || 3000;

  const respond = (res, contentType, produceBody) => {
    res.writeHead(200, { "Content-Type": contentType });
    res.end(produceBody());
  };

  const onRequest = (req, res) => {
    const isHealthCheck = req.url === "/healthz";
    if (isHealthCheck) {
      respond(res, "application/json", () => JSON.stringify(status));
    } else {
      respond(res, "text/html; charset=utf-8", () => renderStatusPage());
    }
  };

  const onListening = () => {
    console.log(`health server listening on ${port}`);
  };

  http.createServer(onRequest).listen(port, onListening);
}
// Main loop: connects to RabbitMQ, consumes one queue and applies each message
// to PostgreSQL, reconnecting with exponential backoff whenever the link is lost.
//
// Configuration (environment):
//   PG_TABLE          target table, default "rabbit_messages"
//   RABBIT_QUEUES     comma-separated list; the first non-blank entry is consumed,
//                     default "crud_queue"
//   RABBIT_DURABLE    "true"/"false", default "true"
//   RABBIT_PREFETCH   non-negative integer, default 1 (also used when the value is invalid)
//   REQUEUE_ON_ERROR  "true"/"false", default "true"
//
// Never resolves in normal operation. Rejects only when ensureTable() fails
// before the first connection attempt; every later failure is reported through
// setStatus("degraded", ...) and retried.
async function consumeLoop() {
  const table = process.env.PG_TABLE || "rabbit_messages";
  const queueName =
    (process.env.RABBIT_QUEUES || "crud_queue")
      .split(",")
      .map((q) => q.trim())
      .filter(Boolean)[0] || "crud_queue";
  const durable = String(process.env.RABBIT_DURABLE || "true").toLowerCase() === "true";
  const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true";

  // A non-numeric RABBIT_PREFETCH would otherwise reach the broker as NaN.
  const requestedPrefetch = Number(process.env.RABBIT_PREFETCH || "1");
  const prefetch = Number.isInteger(requestedPrefetch) && requestedPrefetch >= 0 ? requestedPrefetch : 1;

  const initialBackoffMs = 2000;
  const maxBackoffMs = 30000;
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // Validation failures raised by handleMessage can never succeed on retry.
  const isPermanentFailure = (err) => {
    const text = String((err && err.message) || "");
    return text.includes("missing id") || text.includes("invalid action");
  };

  // Make sure the table exists before the consumer starts.
  await ensureTable(table);

  let backoffMs = initialBackoffMs;
  while (true) {
    let connection;
    try {
      connection = await amqp.connect(buildAmqpUrl());

      // `stopped` resolves (never rejects) with the reason the session ended, so a
      // failure during setup cannot leave an unhandled rejection behind.
      let signalStop;
      const stopped = new Promise((resolve) => {
        signalStop = resolve;
      });
      // Listeners are attached before any further awaits: an "error" event with
      // no listener would be thrown as an uncaught exception.
      connection.on("error", (err) => signalStop(err));
      connection.on("close", (err) => signalStop(err || new Error("connection closed")));

      const channel = await connection.createChannel();
      // A channel can fail or close while the connection stays open; without
      // these listeners the worker would crash or sit idle forever.
      channel.on("error", (err) => signalStop(err));
      channel.on("close", () => signalStop(new Error("channel closed")));

      await channel.prefetch(prefetch);
      await channel.assertQueue(queueName, { durable });

      // ack/nack throw when the channel is already closed; the broker redelivers
      // unacknowledged messages anyway, so log instead of rejecting the callback.
      const settle = (operation) => {
        try {
          operation();
        } catch (settleErr) {
          console.error("failed to ack/nack message:", settleErr);
        }
      };

      // Process messages and record success/failure in `metrics`.
      await channel.consume(queueName, async (msg) => {
        if (!msg) {
          // A null message means the broker cancelled this consumer.
          signalStop(new Error("consumer cancelled by broker"));
          return;
        }

        let parsed;
        try {
          parsed = parseMessage(msg.content);
          await handleMessage(table, parsed);
        } catch (err) {
          metrics.failed += 1;
          metrics.lastError = String((err && err.message) || err);
          metrics.lastMessageAt = new Date().toISOString();
          // Permanent failures are dropped; others follow REQUEUE_ON_ERROR.
          const requeue = isPermanentFailure(err) ? false : requeueOnError;
          settle(() => channel.nack(msg, false, requeue));
          return;
        }

        metrics.processed += 1;
        metrics.lastAction = parsed.action || "create";
        metrics.lastMessageAt = new Date().toISOString();
        settle(() => channel.ack(msg));
      });

      setStatus("running", "");
      // A working session proves the broker is reachable: start the next
      // reconnect sequence from the shortest delay again.
      backoffMs = initialBackoffMs;

      const reason = await stopped;
      throw reason || new Error("connection lost");
    } catch (err) {
      setStatus("degraded", String((err && err.message) || err));
    } finally {
      if (connection) {
        // Best-effort cleanup so a half-initialised connection is not leaked.
        // Not awaited: close() on a dead link may never complete.
        try {
          connection.close().catch(() => {});
        } catch {
          // Connection was already closed.
        }
      }
    }

    await sleep(backoffMs);
    backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
  }
}
// Entry point: bring up the status HTTP server, then run the consumer.
// If the consumer loop ever rejects, record the "failed" state and log the error;
// the HTTP server keeps running so the failure stays visible.
startHttpServer();
void (async () => {
  try {
    await consumeLoop();
  } catch (err) {
    setStatus("failed", String(err.message || err));
    console.error(err);
  }
})();