sless-primer/TNAR/code/sql-runner/sql_runner.py
2026-03-19 10:29:26 +04:00

40 lines
1.4 KiB
Python

# 2026-03-17 00:00
# sql_runner.py — функция для выполнения SQL-операторов из входного события.
import os
import psycopg2
def run_sql(event):
# Выполняет список SQL-операторов в одной транзакции для атомарной инициализации схемы.
# Параметры подключения передаются раздельно, чтобы избежать ошибок парсинга DSN при спецсимволах.
pg_host = os.environ["PGHOST"]
pg_port = os.environ.get("PGPORT", "5432")
pg_database = os.environ["PGDATABASE"]
pg_user = os.environ["PGUSER"]
pg_password = os.environ["PGPASSWORD"]
pg_sslmode = os.environ.get("PGSSLMODE", "require")
statements = event.get("statements", [])
if not statements:
return {"error": "no statements provided"}
connection = psycopg2.connect(
host=pg_host,
port=pg_port,
dbname=pg_database,
user=pg_user,
password=pg_password,
sslmode=pg_sslmode,
)
try:
cursor = connection.cursor()
for statement in statements:
cursor.execute(statement)
connection.commit()
return {"ok": True, "executed": len(statements)}
except Exception as error:
connection.rollback()
return {"error": str(error)}
finally:
connection.close()