From 84583abb8c317791a83ed1096fe50a372c0357a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CNaeel=E2=80=9D?= Date: Mon, 23 Feb 2026 16:56:35 +0400 Subject: [PATCH] Initial NodeJS Rabbit worker --- .gitignore | 3 + README.md | 22 +++++++ package.json | 13 ++++ server.js | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 package.json create mode 100644 server.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5773660 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +.npmrc +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..91ceeaa --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# Rabbit NodeJS Worker + +Consumes CRUD messages from RabbitMQ and writes to Postgres. + +## Environment variables + +RabbitMQ: +- `AMQP_URL` (preferred), example: `amqp://user:pass@host:5672/vhost` +- or `RABBIT_HOST`, `RABBIT_PORT` (default 5672), `RABBIT_USER`, `RABBIT_PASSWORD`, `RABBIT_VHOST` (default `/`) +- `RABBIT_QUEUES` (default `crud_queue`) +- `RABBIT_DURABLE` (default `true`) +- `RABBIT_PREFETCH` (default `1`) +- `REQUEUE_ON_ERROR` (default `true`) + +Postgres: +- `DATABASE_URL` (optional) +- or `PGHOST`, `PGPORT` (default 5432), `PGUSER`, `PGPASSWORD`, `PGDATABASE` (default `postgres`), `PGSSLMODE` (default `require`) +- `PG_TABLE` (default `rabbit_messages`) + +## Notes + +Health endpoint: `/healthz`. diff --git a/package.json b/package.json new file mode 100644 index 0000000..3413215 --- /dev/null +++ b/package.json @@ -0,0 +1,13 @@ +{ + "name": "rabbit-nodeworker", + "version": "0.1.0", + "description": "RabbitMQ worker for CRUD demo", + "main": "server.js", + "scripts": { + "start": "node server.js" + }, + "dependencies": { + "amqplib": "^0.10.4", + "pg": "^8.12.0" + } +} diff --git a/server.js b/server.js new file mode 100644 index 0000000..a145d32 --- /dev/null +++ b/server.js @@ -0,0 +1,177 @@ +const http = require("http"); +const amqp = require("amqplib"); +const { Pool } = require("pg"); + +const status = { + state: "starting", + error: "", + lastSuccess: "", +}; + +function setStatus(state, error) { + status.state = state; + status.error = error || ""; + if (!error) { + status.lastSuccess = new Date().toISOString(); + } +} + +function buildPgConfig() { + if (process.env.DATABASE_URL) { + return { + connectionString: process.env.DATABASE_URL, + ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined, + }; + } + + return { + host: process.env.PGHOST, + port: Number(process.env.PGPORT || "5432"), + user: process.env.PGUSER, + password: process.env.PGPASSWORD, + database: process.env.PGDATABASE || "postgres", + ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined, + }; +} + +function buildAmqpUrl() { + if (process.env.AMQP_URL) { + return process.env.AMQP_URL; + } + const host = process.env.RABBIT_HOST || ""; + const port = process.env.RABBIT_PORT || "5672"; + const user = process.env.RABBIT_USER || ""; + const pass = process.env.RABBIT_PASSWORD || ""; + const vhost = process.env.RABBIT_VHOST || "/"; + if (!host || !user || !pass) { + throw new Error("Missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL"); + } + return `amqp://${user}:${pass}@${host}:${port}${vhost}`; +} + +const pool = new Pool(buildPgConfig()); + +async function ensureTable(table) { + await pool.query( + `CREATE TABLE IF NOT EXISTS ${table} (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())` + ); +} + +function parseMessage(buffer) { + const body = buffer.toString("utf8"); + try { + const parsed = JSON.parse(body); + return { + action: parsed.action || "create", + id: parsed.id, + text: parsed.text, + requestId: parsed.request_id, + }; + } catch { + return { + action: "create", + text: body.trim() || "(empty)", + }; + } +} + +async function handleMessage(table, msg) { + const action = (msg.action || "create").toLowerCase(); + if (action === "create") { + const text = msg.text || "(empty)"; + await pool.query(`INSERT INTO ${table} (test_data) VALUES ($1)`, [text]); + return; + } + if (action === "update") { + if (!msg.id) { + throw new Error("missing id for update"); + } + await pool.query(`UPDATE ${table} SET test_data=$1 WHERE id=$2`, [msg.text || "", msg.id]); + return; + } + if (action === "delete") { + if (!msg.id) { + throw new Error("missing id for delete"); + } + await pool.query(`DELETE FROM ${table} WHERE id=$1`, [msg.id]); + return; + } + throw new Error("invalid action"); +} + +function startHttpServer() { + const port = process.env.PORT || 3000; + const server = http.createServer((req, res) => { + if (req.url === "/healthz") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify(status)); + return; + } + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("rabbit-nodeworker ready"); + }); + server.listen(port, () => { + console.log(`health server listening on ${port}`); + }); +} + +async function consumeLoop() { + const table = process.env.PG_TABLE || "rabbit_messages"; + const queueName = (process.env.RABBIT_QUEUES || "crud_queue") + .split(",") + .map((q) => q.trim()) + .filter(Boolean)[0] || "crud_queue"; + const durable = String(process.env.RABBIT_DURABLE || "true").toLowerCase() === "true"; + const prefetch = Number(process.env.RABBIT_PREFETCH || "1"); + const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true"; + + await ensureTable(table); + + let backoffMs = 2000; + while (true) { + try { + const amqpUrl = buildAmqpUrl(); + const connection = await amqp.connect(amqpUrl); + const channel = await connection.createChannel(); + await channel.prefetch(prefetch); + await channel.assertQueue(queueName, { durable }); + + setStatus("running", ""); + + channel.consume(queueName, async (msg) => { + if (!msg) { + return; + } + try { + const parsed = parseMessage(msg.content); + await handleMessage(table, parsed); + channel.ack(msg); + } catch (err) { + if (String(err.message || "").includes("missing id") || String(err.message || "").includes("invalid action")) { + channel.nack(msg, false, false); + return; + } + channel.nack(msg, false, requeueOnError); + } + }); + + await new Promise((resolve, reject) => { + connection.on("close", resolve); + connection.on("error", reject); + }); + } catch (err) { + setStatus("degraded", String(err.message || err)); + await new Promise((r) => setTimeout(r, backoffMs)); + if (backoffMs < 30000) { + backoffMs *= 2; + } + continue; + } + } +} + +startHttpServer(); +consumeLoop().catch((err) => { + setStatus("failed", String(err.message || err)); + console.error(err); +});