sless-primer/POSTGRES/code/py-retry-writer/py_retry_writer.py
2026-03-22 17:08:18 +04:00

55 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 2026-03-21 — py-retry-writer: пишет N строк с retry при PG ошибке.
# Тестирует: устойчивость к transient PG errors (simulate_error=true), retry logic,
# корректный rollback при частичном сбое.
import os, time, psycopg2, random
# Upper bound on connect-and-insert attempts before retry_write gives up.
_MAX_RETRIES = 3


def retry_write(event):
    """Insert a batch of rows in one transaction, retrying on OperationalError.

    Every attempt opens a fresh connection (configured from the PG*
    environment variables), inserts ``n`` rows into ``terraform_demo_table``
    and commits once at the end. If anything fails mid-batch the attempt is
    rolled back, so a partially written batch is never committed.

    Args:
        event: Mapping with optional keys:
            ``n`` - number of rows to insert (default 5, capped at 100);
            ``prefix`` - title prefix (default "retry", cut to 40 chars);
            ``simulate_error`` - "true" or "1" to raise a simulated
            ``psycopg2.OperationalError`` on the first attempt when the
            insert loop reaches row index ``n // 2``.

    Returns:
        On success: ``{"ok": True, "inserted": <row count>,
        "attempts": <attempts used>, "first_id": <first id or None>}``.
        After ``_MAX_RETRIES`` failed attempts: ``{"ok": False,
        "attempts": <attempts used>, "last_error": <error message>}``.

    Raises:
        KeyError: if PGHOST, PGDATABASE, PGUSER or PGPASSWORD is not set.
        Exception: any failure other than ``psycopg2.OperationalError`` is
            propagated to the caller after the rollback attempt.
    """
    n = min(int(event.get("n", 5)), 100)
    prefix = str(event.get("prefix", "retry"))[:40]
    # simulate_error: deterministically fail the first attempt halfway
    # through the batch so the retry path is exercised.
    simulate = str(event.get("simulate_error", "false")).lower() in ("true", "1")
    attempt = 0
    last_err = None
    while attempt < _MAX_RETRIES:
        attempt += 1
        try:
            conn = psycopg2.connect(
                host=os.environ["PGHOST"],
                port=int(os.environ.get("PGPORT", 5432)),
                dbname=os.environ["PGDATABASE"],
                user=os.environ["PGUSER"],
                password=os.environ["PGPASSWORD"],
                sslmode=os.environ.get("PGSSLMODE", "require"),
            )
            inserted = []
            try:
                with conn.cursor() as cur:
                    for i in range(n):
                        # Simulation: only the first attempt fails, and only
                        # once some rows have already been written, so the
                        # rollback of a partial batch is covered too.
                        if simulate and attempt == 1 and i == n // 2:
                            raise psycopg2.OperationalError(
                                "simulated transient error"
                            )
                        title = f"{prefix}-{int(time.time()*1000)}-{i}-a{attempt}"
                        cur.execute(
                            "INSERT INTO terraform_demo_table (title) VALUES (%s) RETURNING id",
                            (title,),
                        )
                        inserted.append(cur.fetchone()[0])
                conn.commit()
                return {
                    "ok": True,
                    "inserted": len(inserted),
                    "attempts": attempt,
                    "first_id": inserted[0] if inserted else None,
                }
            except Exception:
                # Best-effort rollback: on a dead connection rollback() can
                # raise on its own, which would replace the original error
                # and bypass the OperationalError retry handler below.
                try:
                    conn.rollback()
                except psycopg2.Error:
                    pass
                raise
            finally:
                conn.close()
        except psycopg2.OperationalError as e:
            last_err = str(e)
            if attempt < _MAX_RETRIES:
                # Exponential backoff: 0.3s, 0.6s, 1.2s, ... (no sleep after
                # the final attempt).
                time.sleep(0.3 * 2 ** (attempt - 1))
    return {"ok": False, "attempts": attempt, "last_error": last_err}