Initial NodeJS Rabbit worker
This commit is contained in:
commit
84583abb8c
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
node_modules/
|
||||
.npmrc
|
||||
.env
|
||||
22
README.md
Normal file
22
README.md
Normal file
@ -0,0 +1,22 @@
|
||||
# Rabbit NodeJS Worker
|
||||
|
||||
Consumes CRUD messages from RabbitMQ and writes to Postgres.
|
||||
|
||||
## Environment variables
|
||||
|
||||
RabbitMQ:
|
||||
- `AMQP_URL` (preferred), example: `amqp://user:pass@host:5672/vhost`
|
||||
- or `RABBIT_HOST`, `RABBIT_PORT` (default 5672), `RABBIT_USER`, `RABBIT_PASSWORD`, `RABBIT_VHOST` (default `/`)
|
||||
- `RABBIT_QUEUES` (default `crud_queue`)
|
||||
- `RABBIT_DURABLE` (default `true`)
|
||||
- `RABBIT_PREFETCH` (default `1`)
|
||||
- `REQUEUE_ON_ERROR` (default `true`)
|
||||
|
||||
Postgres:
|
||||
- `DATABASE_URL` (optional)
|
||||
- or `PGHOST`, `PGPORT` (default 5432), `PGUSER`, `PGPASSWORD`, `PGDATABASE` (default `postgres`), `PGSSLMODE` (default `require`)
|
||||
- `PG_TABLE` (default `rabbit_messages`)
|
||||
|
||||
## Notes
|
||||
|
||||
Health endpoint: `/healthz`.
|
||||
13
package.json
Normal file
13
package.json
Normal file
@ -0,0 +1,13 @@
|
||||
{
|
||||
"name": "rabbit-nodeworker",
|
||||
"version": "0.1.0",
|
||||
"description": "RabbitMQ worker for CRUD demo",
|
||||
"main": "server.js",
|
||||
"scripts": {
|
||||
"start": "node server.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"amqplib": "^0.10.4",
|
||||
"pg": "^8.12.0"
|
||||
}
|
||||
}
|
||||
177
server.js
Normal file
177
server.js
Normal file
@ -0,0 +1,177 @@
|
||||
const http = require("http");
|
||||
const amqp = require("amqplib");
|
||||
const { Pool } = require("pg");
|
||||
|
||||
const status = {
|
||||
state: "starting",
|
||||
error: "",
|
||||
lastSuccess: "",
|
||||
};
|
||||
|
||||
function setStatus(state, error) {
|
||||
status.state = state;
|
||||
status.error = error || "";
|
||||
if (!error) {
|
||||
status.lastSuccess = new Date().toISOString();
|
||||
}
|
||||
}
|
||||
|
||||
function buildPgConfig() {
|
||||
if (process.env.DATABASE_URL) {
|
||||
return {
|
||||
connectionString: process.env.DATABASE_URL,
|
||||
ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
host: process.env.PGHOST,
|
||||
port: Number(process.env.PGPORT || "5432"),
|
||||
user: process.env.PGUSER,
|
||||
password: process.env.PGPASSWORD,
|
||||
database: process.env.PGDATABASE || "postgres",
|
||||
ssl: process.env.PGSSLMODE === "require" ? { rejectUnauthorized: false } : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function buildAmqpUrl() {
|
||||
if (process.env.AMQP_URL) {
|
||||
return process.env.AMQP_URL;
|
||||
}
|
||||
const host = process.env.RABBIT_HOST || "";
|
||||
const port = process.env.RABBIT_PORT || "5672";
|
||||
const user = process.env.RABBIT_USER || "";
|
||||
const pass = process.env.RABBIT_PASSWORD || "";
|
||||
const vhost = process.env.RABBIT_VHOST || "/";
|
||||
if (!host || !user || !pass) {
|
||||
throw new Error("Missing RABBIT_HOST/RABBIT_USER/RABBIT_PASSWORD or AMQP_URL");
|
||||
}
|
||||
return `amqp://${user}:${pass}@${host}:${port}${vhost}`;
|
||||
}
|
||||
|
||||
const pool = new Pool(buildPgConfig());
|
||||
|
||||
async function ensureTable(table) {
|
||||
await pool.query(
|
||||
`CREATE TABLE IF NOT EXISTS ${table} (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT NOW())`
|
||||
);
|
||||
}
|
||||
|
||||
function parseMessage(buffer) {
|
||||
const body = buffer.toString("utf8");
|
||||
try {
|
||||
const parsed = JSON.parse(body);
|
||||
return {
|
||||
action: parsed.action || "create",
|
||||
id: parsed.id,
|
||||
text: parsed.text,
|
||||
requestId: parsed.request_id,
|
||||
};
|
||||
} catch {
|
||||
return {
|
||||
action: "create",
|
||||
text: body.trim() || "(empty)",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function handleMessage(table, msg) {
|
||||
const action = (msg.action || "create").toLowerCase();
|
||||
if (action === "create") {
|
||||
const text = msg.text || "(empty)";
|
||||
await pool.query(`INSERT INTO ${table} (test_data) VALUES ($1)`, [text]);
|
||||
return;
|
||||
}
|
||||
if (action === "update") {
|
||||
if (!msg.id) {
|
||||
throw new Error("missing id for update");
|
||||
}
|
||||
await pool.query(`UPDATE ${table} SET test_data=$1 WHERE id=$2`, [msg.text || "", msg.id]);
|
||||
return;
|
||||
}
|
||||
if (action === "delete") {
|
||||
if (!msg.id) {
|
||||
throw new Error("missing id for delete");
|
||||
}
|
||||
await pool.query(`DELETE FROM ${table} WHERE id=$1`, [msg.id]);
|
||||
return;
|
||||
}
|
||||
throw new Error("invalid action");
|
||||
}
|
||||
|
||||
function startHttpServer() {
|
||||
const port = process.env.PORT || 3000;
|
||||
const server = http.createServer((req, res) => {
|
||||
if (req.url === "/healthz") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify(status));
|
||||
return;
|
||||
}
|
||||
res.writeHead(200, { "Content-Type": "text/plain" });
|
||||
res.end("rabbit-nodeworker ready");
|
||||
});
|
||||
server.listen(port, () => {
|
||||
console.log(`health server listening on ${port}`);
|
||||
});
|
||||
}
|
||||
|
||||
async function consumeLoop() {
|
||||
const table = process.env.PG_TABLE || "rabbit_messages";
|
||||
const queueName = (process.env.RABBIT_QUEUES || "crud_queue")
|
||||
.split(",")
|
||||
.map((q) => q.trim())
|
||||
.filter(Boolean)[0] || "crud_queue";
|
||||
const durable = String(process.env.RABBIT_DURABLE || "true").toLowerCase() === "true";
|
||||
const prefetch = Number(process.env.RABBIT_PREFETCH || "1");
|
||||
const requeueOnError = String(process.env.REQUEUE_ON_ERROR || "true").toLowerCase() === "true";
|
||||
|
||||
await ensureTable(table);
|
||||
|
||||
let backoffMs = 2000;
|
||||
while (true) {
|
||||
try {
|
||||
const amqpUrl = buildAmqpUrl();
|
||||
const connection = await amqp.connect(amqpUrl);
|
||||
const channel = await connection.createChannel();
|
||||
await channel.prefetch(prefetch);
|
||||
await channel.assertQueue(queueName, { durable });
|
||||
|
||||
setStatus("running", "");
|
||||
|
||||
channel.consume(queueName, async (msg) => {
|
||||
if (!msg) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const parsed = parseMessage(msg.content);
|
||||
await handleMessage(table, parsed);
|
||||
channel.ack(msg);
|
||||
} catch (err) {
|
||||
if (String(err.message || "").includes("missing id") || String(err.message || "").includes("invalid action")) {
|
||||
channel.nack(msg, false, false);
|
||||
return;
|
||||
}
|
||||
channel.nack(msg, false, requeueOnError);
|
||||
}
|
||||
});
|
||||
|
||||
await new Promise((resolve, reject) => {
|
||||
connection.on("close", resolve);
|
||||
connection.on("error", reject);
|
||||
});
|
||||
} catch (err) {
|
||||
setStatus("degraded", String(err.message || err));
|
||||
await new Promise((r) => setTimeout(r, backoffMs));
|
||||
if (backoffMs < 30000) {
|
||||
backoffMs *= 2;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startHttpServer();
|
||||
consumeLoop().catch((err) => {
|
||||
setStatus("failed", String(err.message || err));
|
||||
console.error(err);
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user