196 lines
9.4 KiB
Plaintext
196 lines
9.4 KiB
Plaintext
<cfcomponent displayname="Application" output="true">
|
|
<cfset this.Name = "rabbit-lsd-ui" />
|
|
<cfset this.sessionmanagement = "Yes" />
|
|
<cfset this.datasource = "testds" />
|
|
<cfset this.javaSettings = { loadPaths = [ expandPath("./lib") ] } />
|
|
<cfset getDS(this.datasource) />
|
|
|
|
<cffunction name="getEnv" access="private" returntype="string">
|
|
<cfargument name="key" type="string" required="true" />
|
|
<cfargument name="def" type="string" required="true" />
|
|
<cfset var system = createObject("java", "java.lang.System") />
|
|
<cfset var val = system.getEnv(arguments.key) />
|
|
<cfif isDefined("val") AND len(val)><cfreturn val /></cfif>
|
|
<cfreturn arguments.def />
|
|
</cffunction>
|
|
|
|
<cffunction name="getDS" access="private" returntype="void">
|
|
<cfargument name="dsname" type="string" required="true" />
|
|
<cfset var system = createObject("java", "java.lang.System") />
|
|
<cfset var ds = {} />
|
|
<cfloop list="class,connectionString,database,driver,host,port,type,url,username,password,bundleName,bundleVersion,connectionLimit,liveTimeout,validate" item="field">
|
|
<cfset var envVal = system.getEnv("#arguments.dsname#_#field#") />
|
|
<cfif isDefined("envVal") AND len(envVal)><cfset ds[field] = envVal /></cfif>
|
|
</cfloop>
|
|
<cfset this.datasources[arguments.dsname] = ds />
|
|
</cffunction>
|
|
|
|
<cffunction name="publishToRabbit" access="private" returntype="void">
|
|
<cfargument name="payload" type="struct" required="true" />
|
|
<cfset var host = getEnv("RABBIT_HOST", "") />
|
|
<cfset var user = getEnv("RABBIT_USER", "") />
|
|
<cfset var pass = getEnv("RABBIT_PASSWORD", "") />
|
|
<cfset var vhost = getEnv("RABBIT_VHOST", "/") />
|
|
<cfset var port = getEnv("RABBIT_PORT", "5672") />
|
|
<cfset var queueList = listToArray(getEnv("RABBIT_QUEUES", "crud_queue")) />
|
|
<cfset var queueName = trim(queueList[1]) />
|
|
<cfset var durable = lcase(getEnv("RABBIT_DURABLE", "true")) EQ "true" />
|
|
<cfset var factory = createObject("java", "com.rabbitmq.client.ConnectionFactory") />
|
|
<cfset var connection = "" />
|
|
<cfset var channel = "" />
|
|
|
|
<cfif host EQ "" OR user EQ "" OR pass EQ "">
|
|
<cfthrow message="Rabbit credentials are missing" />
|
|
</cfif>
|
|
|
|
<cfset factory.setHost(host) />
|
|
<cfset factory.setPort(JavaCast("int", port)) />
|
|
<cfset factory.setUsername(user) />
|
|
<cfset factory.setPassword(pass) />
|
|
<cfset factory.setVirtualHost(vhost) />
|
|
|
|
<cftry>
|
|
<cfset connection = factory.newConnection() />
|
|
<cfset channel = connection.createChannel() />
|
|
<cfset channel.queueDeclare(queueName, durable, false, false, JavaCast("null", "")) />
|
|
<cfset var payloadJson = serializeJSON(arguments.payload) />
|
|
<cfset var body = createObject("java", "java.lang.String").init(payloadJson).getBytes("UTF-8") />
|
|
<cfset channel.basicPublish("", queueName, JavaCast("null", ""), body) />
|
|
<cfif isDefined("channel")><cfset channel.close() /></cfif>
|
|
<cfif isDefined("connection")><cfset connection.close() /></cfif>
|
|
<cfcatch>
|
|
<cfif isDefined("channel")><cfset channel.close() /></cfif>
|
|
<cfif isDefined("connection")><cfset connection.close() /></cfif>
|
|
<cfthrow message="#cfcatch.message#" />
|
|
</cfcatch>
|
|
</cftry>
|
|
</cffunction>
|
|
|
|
<cffunction name="OnRequest" access="public" returntype="void" output="true">
|
|
<cfargument name="template" type="string" required="true" />
|
|
<cfset request.DS = this.datasource />
|
|
<cfset request.tableName = getEnv("PG_TABLE", "rabbit_messages") />
|
|
<cfset request.logTableName = getEnv("UI_LOG_TABLE", "rabbit_ui_log") />
|
|
<cfset request.rabbitAdminUrl = getEnv("RABBIT_ADMIN_URL", "") />
|
|
<cfset request.rabbitUser = getEnv("RABBIT_USER", "") />
|
|
<cfset request.rabbitPassword = getEnv("RABBIT_PASSWORD", "") />
|
|
<cfset request.nodeworkerUrl = getEnv("NODEWORKER_URL", "") />
|
|
|
|
<cftry>
|
|
<cfquery datasource="#request.DS#">
|
|
CREATE TABLE IF NOT EXISTS #request.tableName# (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)
|
|
</cfquery>
|
|
<cfcatch><cfset request.db_error = cfcatch.message /></cfcatch>
|
|
</cftry>
|
|
|
|
<cftry>
|
|
<cfquery datasource="#request.DS#">
|
|
CREATE TABLE IF NOT EXISTS #request.logTableName# (
|
|
id SERIAL PRIMARY KEY,
|
|
action TEXT NOT NULL,
|
|
text TEXT,
|
|
request_id TEXT,
|
|
target_id INTEGER,
|
|
queued_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
processed_at TIMESTAMP,
|
|
status TEXT DEFAULT 'queued',
|
|
error_reason TEXT
|
|
)
|
|
</cfquery>
|
|
<cfquery datasource="#request.DS#">
|
|
ALTER TABLE #request.logTableName# ADD COLUMN IF NOT EXISTS error_reason TEXT
|
|
</cfquery>
|
|
<cfquery datasource="#request.DS#">
|
|
UPDATE #request.logTableName#
|
|
SET processed_at = NOW(), status = 'done'
|
|
WHERE status = 'queued'
|
|
AND action IN ('create','update')
|
|
AND request_id IS NOT NULL
|
|
AND EXISTS (
|
|
SELECT 1 FROM #request.tableName#
|
|
WHERE test_data LIKE '%' || '[req:' || request_id || ']%'
|
|
)
|
|
</cfquery>
|
|
<cfquery datasource="#request.DS#">
|
|
UPDATE #request.logTableName#
|
|
SET processed_at = NOW(), status = 'done'
|
|
WHERE status = 'queued'
|
|
AND action = 'delete'
|
|
AND target_id IS NOT NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM #request.tableName# WHERE id = target_id
|
|
)
|
|
</cfquery>
|
|
<cfquery datasource="#request.DS#">
|
|
UPDATE #request.logTableName#
|
|
SET status = 'failed', error_reason = 'target not found'
|
|
WHERE status = 'queued'
|
|
AND action = 'update'
|
|
AND target_id IS NOT NULL
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM #request.tableName# WHERE id = target_id
|
|
)
|
|
</cfquery>
|
|
<cfcatch><cfset request.db_error = cfcatch.message /></cfcatch>
|
|
</cftry>
|
|
|
|
<cfif CGI.REQUEST_METHOD EQ "POST" AND structKeyExists(form, "crud_action")>
|
|
<cfset var action = lcase(form.crud_action) />
|
|
<cfset var requestId = createUUID() />
|
|
<cfset var payload = { "request_id" = requestId } />
|
|
<cfset var logText = "" />
|
|
<cfset var logTargetId = "" />
|
|
|
|
<cftry>
|
|
<cfswitch expression="#action#">
|
|
<cfcase value="insert">
|
|
<cfset payload.action = "create" />
|
|
<cfset logText = trim(form.txt_content) />
|
|
<cfset payload.text = logText & " [req:" & requestId & "]" />
|
|
</cfcase>
|
|
<cfcase value="update">
|
|
<cfif NOT structKeyExists(form, "id")>
|
|
<cfthrow message="Missing id for update" />
|
|
</cfif>
|
|
<cfset payload.action = "update" />
|
|
<cfset payload.id = val(form.id) />
|
|
<cfset logText = trim(form.txt_content) />
|
|
<cfset logTargetId = toString(val(form.id)) />
|
|
<cfset payload.text = logText & " [req:" & requestId & "]" />
|
|
</cfcase>
|
|
<cfcase value="delete">
|
|
<cfif NOT structKeyExists(form, "id")>
|
|
<cfthrow message="Missing id for delete" />
|
|
</cfif>
|
|
<cfset payload.action = "delete" />
|
|
<cfset payload.id = val(form.id) />
|
|
<cfset logTargetId = toString(val(form.id)) />
|
|
</cfcase>
|
|
<cfdefaultcase>
|
|
<cfthrow message="Unknown action" />
|
|
</cfdefaultcase>
|
|
</cfswitch>
|
|
|
|
<cfset publishToRabbit(payload) />
|
|
<cfquery datasource="#request.DS#">
|
|
INSERT INTO #request.logTableName# (action, text, request_id, target_id, queued_at, status)
|
|
VALUES (
|
|
<cfqueryparam value="#payload.action#" cfsqltype="cf_sql_varchar">,
|
|
<cfqueryparam value="#logText#" cfsqltype="cf_sql_varchar">,
|
|
<cfqueryparam value="#requestId#" cfsqltype="cf_sql_varchar">,
|
|
<cfqueryparam value="#logTargetId#" cfsqltype="cf_sql_integer" null="#NOT len(logTargetId)#">,
|
|
NOW(),
|
|
'queued'
|
|
)
|
|
</cfquery>
|
|
<cflocation url="#CGI.SCRIPT_NAME#" addtoken="false">
|
|
<cfcatch>
|
|
<cfset request.queue_error = cfcatch.message />
|
|
</cfcatch>
|
|
</cftry>
|
|
</cfif>
|
|
|
|
<cfinclude template="#arguments.template#" />
|
|
</cffunction>
|
|
</cfcomponent>
|