rabbit-nodeworker/server.js

252 lines
7.7 KiB
JavaScript

const http = require("http");
const amqp = require("amqplib");
const { Pool } = require("pg");
const status = {
state: "starting",
error: "",
lastSuccess: "",
};
const metrics = {
processed: 0,
failed: 0,
lastError: "",
lastMessageAt: "",
lastAction: "",
};
function setStatus(state, error) {
status.state = state;
status.error = error || "";
if (!error) {
status.lastSuccess = new Date().toISOString();
}
}
function renderStatusPage() {
const queueName = (process.env.RABBIT_QUEUES || "crud_queue").split(",")[0] || "crud_queue";
const tableName = process.env.PG_TABLE || "rabbit_messages";
return `<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Rabbit NodeJS Worker</title>
<style>
body { font-family: Arial, sans-serif; background: #f6f7fb; color: #1c1c1c; margin: 0; }
.wrap { max-width: 900px; margin: 40px auto; padding: 0 20px; }
.card { background: #fff; border-radius: 14px; box-shadow: 0 8px 24px rgba(0,0,0,0.06); padding: 24px; }
.grid { display: grid; grid-template-columns: repeat(2, minmax(0, 1fr)); gap: 16px; }
.stat { padding: 16px; border: 1px solid #e5e7eb; border-radius: 12px; background: #fafbff; }
.label { color: #6b7280; font-size: 12px; text-transform: uppercase; letter-spacing: 0.08em; }
.value { font-size: 20px; font-weight: 700; margin-top: 6px; }
.small { font-size: 12px; color: #6b7280; margin-top: 6px; }
.status { padding: 6px 12px; border-radius: 999px; display: inline-block; font-weight: 700; }
.ok { background: #e7f6ee; color: #1e4620; }
.bad { background: #fdeeee; color: #b42318; }
</style>
</head>
<body>
<div class="wrap">
<div class="card">
<h2>NodeJS Worker</h2>
<div class="small">Queue: ${queueName} | Table: ${tableName}</div>
<div style="margin: 12px 0;">
<span class="status ${status.state === "running" ? "ok" : "bad"}">${status.state}</span>
</div>
<div class="grid">
<div class="stat">
<div class="label">Processed</div>
<div class="value">${metrics.processed}</div>
</div>
<div class="stat">
<div class="label">Failed</div>
<div class="value">${metrics.failed}</div>
</div>
<div class="stat">
<div class="label">Last Action</div>
<div class="value">${metrics.lastAction || "-"}</div>
<div class="small">${metrics.lastMessageAt || ""}</div>
</div>
<div class="stat">
<div class="label">Last Error</div>
<div class="value">${metrics.lastError || "-"}</div>
<div class="small">${status.lastSuccess || ""}</div>
</div>
</div>
</div>
</div>
</body>
</html>`;
}
function buildPgConfig() {
if (process.env.DATABASE_URL) {
return {
connectionString: process.env.DATABASE_URL,
ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined,
};
}
return {
host: process.env.PGHOST,
port: Number(process.env.PGPORT || "5432"),
user: process.env.PGUSER,
password: process.env.PGPASSWORD,
database: process.env.PGDATABASE || "postgres",
ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined,
};
}
function buildAmqpUrl() {
if (process.env.AMQP_URL) {
return process.env.AMQP_URL;
}
const host = process.env.RABBIT_HOST || "";
const port = process.env.RABBIT_PORT || "5672";
const user = process.env.RABBIT_USER || "";
const pass = process.env.RABBIT_PASSWORD || "";
const vhost = process.env.RABBIT_VHOST || "/";
if (!host || !user || !pass) {
throw new Error("Missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL");
}
return `amqp://${user}:${pass}@${host}:${port}${vhost}`;
}
const pool = new Pool(buildPgConfig());
async function ensureTable(table) {
await pool.query(
`CREATE TABLE IF NOT EXISTS ${table} (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())`
);
}
function parseMessage(buffer) {
const body = buffer.toString("utf8");
try {
const parsed = JSON.parse(body);
const normalized = Object.keys(parsed || {}).reduce((acc, key) => {
acc[key.toLowerCase()] = parsed[key];
return acc;
}, {});
return {
action: normalized.action || "create",
id: normalized.id,
text: normalized.text,
requestId: normalized.request_id || normalized.requestid,
};
} catch {
return {
action: "create",
text: body.trim() || "(empty)",
};
}
}
async function handleMessage(table, msg) {
const action = (msg.action || "create").toLowerCase();
if (action === "create") {
const text = msg.text || "(empty)";
await pool.query(`INSERT INTO ${table} (test_data) VALUES ($1)`, [text]);
return;
}
if (action === "update") {
if (!msg.id) {
throw new Error("missing id for update");
}
await pool.query(`UPDATE ${table} SET test_data=$1 WHERE id=$2`, [msg.text || "", msg.id]);
return;
}
if (action === "delete") {
if (!msg.id) {
throw new Error("missing id for delete");
}
await pool.query(`DELETE FROM ${table} WHERE id=$1`, [msg.id]);
return;
}
throw new Error("invalid action");
}
function startHttpServer() {
const port = process.env.PORT || 3000;
const server = http.createServer((req, res) => {
if (req.url === "/healthz") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify(status));
return;
}
res.writeHead(200, { "Content-Type": "text/html; charset=utf-8" });
res.end(renderStatusPage());
});
server.listen(port, () => {
console.log(`health server listening on ${port}`);
});
}
async function consumeLoop() {
const table = process.env.PG_TABLE || "rabbit_messages";
const queueName = (process.env.RABBIT_QUEUES || "crud_queue")
.split(",")
.map((q) => q.trim())
.filter(Boolean)[0] || "crud_queue";
const durable = String(process.env.RABBIT_DURABLE || "true").toLowerCase() === "true";
const prefetch = Number(process.env.RABBIT_PREFETCH || "1");
const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true";
await ensureTable(table);
let backoffMs = 2000;
while (true) {
try {
const amqpUrl = buildAmqpUrl();
const connection = await amqp.connect(amqpUrl);
const channel = await connection.createChannel();
await channel.prefetch(prefetch);
await channel.assertQueue(queueName, { durable });
setStatus("running", "");
channel.consume(queueName, async (msg) => {
if (!msg) {
return;
}
try {
const parsed = parseMessage(msg.content);
await handleMessage(table, parsed);
metrics.processed += 1;
metrics.lastAction = parsed.action || "create";
metrics.lastMessageAt = new Date().toISOString();
channel.ack(msg);
} catch (err) {
metrics.failed += 1;
metrics.lastError = String(err.message || err);
metrics.lastMessageAt = new Date().toISOString();
if (String(err.message || "").includes("missing id") || String(err.message || "").includes("invalid action")) {
channel.nack(msg, false, false);
return;
}
channel.nack(msg, false, requeueOnError);
}
});
await new Promise((resolve, reject) => {
connection.on("close", resolve);
connection.on("error", reject);
});
} catch (err) {
setStatus("degraded", String(err.message || err));
await new Promise((r) => setTimeout(r, backoffMs));
if (backoffMs < 30000) {
backoffMs *= 2;
}
continue;
}
}
}
startHttpServer();
consumeLoop().catch((err) => {
setStatus("failed", String(err.message || err));
console.error(err);
});