rabbit-lsd/Application.cfc

210 lines
11 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<cfcomponent displayname="Application" output="true">
<!--- Базовая конфигурация приложения и подключение JAR-клиента RabbitMQ. --->
<cfset this.Name = "rabbit-lsd-ui" />
<cfset this.sessionmanagement = "Yes" />
<cfset this.datasource = "testds" />
<cfset this.javaSettings = { loadPaths = [ expandPath("./lib") ] } />
<!--- Инициализируем datasource из переменных окружения. --->
<cfset getDS(this.datasource) />
<!--- Безопасное чтение переменной окружения с дефолтом. --->
<cffunction name="getEnv" access="private" returntype="string">
<cfargument name="key" type="string" required="true" />
<cfargument name="def" type="string" required="true" />
<cfset var system = createObject("java", "java.lang.System") />
<cfset var val = system.getEnv(arguments.key) />
<cfif isDefined("val") AND len(val)><cfreturn val /></cfif>
<cfreturn arguments.def />
</cffunction>
<!--- Собираем datasource из набора *_field переменных окружения. --->
<cffunction name="getDS" access="private" returntype="void">
<cfargument name="dsname" type="string" required="true" />
<cfset var system = createObject("java", "java.lang.System") />
<cfset var ds = {} />
<cfloop list="class,connectionString,database,driver,host,port,type,url,username,password,bundleName,bundleVersion,connectionLimit,liveTimeout,validate" item="field">
<cfset var envVal = system.getEnv("#arguments.dsname#_#field#") />
<cfif isDefined("envVal") AND len(envVal)><cfset ds[field] = envVal /></cfif>
</cfloop>
<cfset this.datasources[arguments.dsname] = ds />
</cffunction>
<!--- Публикация CRUD-сообщения в RabbitMQ. --->
<cffunction name="publishToRabbit" access="private" returntype="void">
<cfargument name="payload" type="struct" required="true" />
<cfset var host = getEnv("RABBIT_HOST", "") />
<cfset var user = getEnv("RABBIT_USER", "") />
<cfset var pass = getEnv("RABBIT_PASSWORD", "") />
<cfset var vhost = getEnv("RABBIT_VHOST", "/") />
<cfset var port = getEnv("RABBIT_PORT", "5672") />
<cfset var queueList = listToArray(getEnv("RABBIT_QUEUES", "crud_queue")) />
<cfset var queueName = trim(queueList[1]) />
<cfset var durable = lcase(getEnv("RABBIT_DURABLE", "true")) EQ "true" />
<cfset var factory = createObject("java", "com.rabbitmq.client.ConnectionFactory") />
<cfset var connection = "" />
<cfset var channel = "" />
<!--- Не стартуем публикацию без базовых кредов. --->
<cfif host EQ "" OR user EQ "" OR pass EQ "">
<cfthrow message="Rabbit credentials are missing" />
</cfif>
<cfset factory.setHost(host) />
<cfset factory.setPort(JavaCast("int", port)) />
<cfset factory.setUsername(user) />
<cfset factory.setPassword(pass) />
<cfset factory.setVirtualHost(vhost) />
<!--- Подключаемся, объявляем очередь и отправляем JSON. --->
<cftry>
<cfset connection = factory.newConnection() />
<cfset channel = connection.createChannel() />
<cfset channel.queueDeclare(queueName, durable, false, false, JavaCast("null", "")) />
<cfset var payloadJson = serializeJSON(arguments.payload) />
<cfset var body = createObject("java", "java.lang.String").init(payloadJson).getBytes("UTF-8") />
<cfset channel.basicPublish("", queueName, JavaCast("null", ""), body) />
<cfif isDefined("channel")><cfset channel.close() /></cfif>
<cfif isDefined("connection")><cfset connection.close() /></cfif>
<cfcatch>
<cfif isDefined("channel")><cfset channel.close() /></cfif>
<cfif isDefined("connection")><cfset connection.close() /></cfif>
<cfthrow message="#cfcatch.message#" />
</cfcatch>
</cftry>
</cffunction>
<!--- Основной обработчик запроса: подготовка БД, логов и POST-операций. --->
<cffunction name="OnRequest" access="public" returntype="void" output="true">
<cfargument name="template" type="string" required="true" />
<cfset request.DS = this.datasource />
<cfset request.tableName = getEnv("PG_TABLE", "rabbit_messages") />
<cfset request.logTableName = getEnv("UI_LOG_TABLE", "rabbit_ui_log") />
<cfset request.rabbitAdminUrl = getEnv("RABBIT_ADMIN_URL", "") />
<cfset request.rabbitUser = getEnv("RABBIT_USER", "") />
<cfset request.rabbitPassword = getEnv("RABBIT_PASSWORD", "") />
<cfset request.nodeworkerUrl = getEnv("NODEWORKER_URL", "") />
<!--- Гарантируем наличие основной таблицы сообщений. --->
<cftry>
<cfquery datasource="#request.DS#">
CREATE TABLE IF NOT EXISTS #request.tableName# (id SERIAL PRIMARY KEY, test_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)
</cfquery>
<cfcatch><cfset request.db_error = cfcatch.message /></cfcatch>
</cftry>
<!--- Гарантируем наличие таблицы лога UI и обновляем статусы. --->
<cftry>
<cfquery datasource="#request.DS#">
CREATE TABLE IF NOT EXISTS #request.logTableName# (
id SERIAL PRIMARY KEY,
action TEXT NOT NULL,
text TEXT,
request_id TEXT,
target_id INTEGER,
queued_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed_at TIMESTAMP,
status TEXT DEFAULT 'queued',
error_reason TEXT
)
</cfquery>
<cfquery datasource="#request.DS#">
ALTER TABLE #request.logTableName# ADD COLUMN IF NOT EXISTS error_reason TEXT
</cfquery>
<cfquery datasource="#request.DS#">
UPDATE #request.logTableName#
SET processed_at = NOW(), status = 'done'
WHERE status = 'queued'
AND action IN ('create','update')
AND request_id IS NOT NULL
AND EXISTS (
SELECT 1 FROM #request.tableName#
WHERE test_data LIKE '%' || '[req:' || request_id || ']%'
)
</cfquery>
<cfquery datasource="#request.DS#">
UPDATE #request.logTableName#
SET processed_at = NOW(), status = 'done'
WHERE status = 'queued'
AND action = 'delete'
AND target_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM #request.tableName# WHERE id = target_id
)
</cfquery>
<cfquery datasource="#request.DS#">
UPDATE #request.logTableName#
SET status = 'failed', error_reason = 'target not found'
WHERE status = 'queued'
AND action = 'update'
AND target_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM #request.tableName# WHERE id = target_id
)
</cfquery>
<cfcatch><cfset request.db_error = cfcatch.message /></cfcatch>
</cftry>
<!--- Обрабатываем POST CRUD и отправляем событие в очередь. --->
<cfif CGI.REQUEST_METHOD EQ "POST" AND structKeyExists(form, "crud_action")>
<cfset var action = lcase(form.crud_action) />
<cfset var requestId = createUUID() />
<cfset var payload = { "request_id" = requestId } />
<cfset var logText = "" />
<cfset var logTargetId = "" />
<!--- Формируем payload и лог для очереди. --->
<cftry>
<cfswitch expression="#action#">
<cfcase value="insert">
<cfset payload.action = "create" />
<cfset logText = trim(form.txt_content) />
<cfset payload.text = logText & " [req:" & requestId & "]" />
</cfcase>
<cfcase value="update">
<cfif NOT structKeyExists(form, "id")>
<cfthrow message="Missing id for update" />
</cfif>
<cfset payload.action = "update" />
<cfset payload.id = val(form.id) />
<cfset logText = trim(form.txt_content) />
<cfset logTargetId = toString(val(form.id)) />
<cfset payload.text = logText & " [req:" & requestId & "]" />
</cfcase>
<cfcase value="delete">
<cfif NOT structKeyExists(form, "id")>
<cfthrow message="Missing id for delete" />
</cfif>
<cfset payload.action = "delete" />
<cfset payload.id = val(form.id) />
<cfset logTargetId = toString(val(form.id)) />
<cfset logText = "delete id " & logTargetId />
</cfcase>
<cfdefaultcase>
<cfthrow message="Unknown action" />
</cfdefaultcase>
</cfswitch>
<!--- Публикуем сообщение и пишем запись в лог UI. --->
<cfset publishToRabbit(payload) />
<cfquery datasource="#request.DS#">
INSERT INTO #request.logTableName# (action, text, request_id, target_id, queued_at, status)
VALUES (
<cfqueryparam value="#payload.action#" cfsqltype="cf_sql_varchar">,
<cfqueryparam value="#logText#" cfsqltype="cf_sql_varchar">,
<cfqueryparam value="#requestId#" cfsqltype="cf_sql_varchar">,
<cfqueryparam value="#logTargetId#" cfsqltype="cf_sql_integer" null="#NOT len(logTargetId)#">,
NOW(),
'queued'
)
</cfquery>
<cflocation url="#CGI.SCRIPT_NAME#" addtoken="false">
<cfcatch>
<cfset request.queue_error = cfcatch.message />
</cfcatch>
</cftry>
</cfif>
<cfinclude template="#arguments.template#" />
</cffunction>
</cfcomponent>