sless-primer/POSTGRES/code/stress-go-pgstorm/handler.go
Naeel 4b04cde84b chore(examples): remove stale examples, keep only POSTGRES
- Deleted: TNAR, demo-event-log, demo-managed-functions, hello-go, hello-node,
  k8s, notes-python, pg-list-python, simple-node, simple-python
- POSTGRES: removed luceUNDnode.tf (commented-out legacy), stress_log_1.txt,
  funcs_list.py; disabled stress_destroy_apply.sh (PG lifecycle stress has
  delete_user bug); added README.md
- examples/README.md: updated to reflect current state (sless_service + sless_job)
2026-03-21 07:49:23 +03:00

149 lines
4.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// 2026-03-19
// handler.go — Go стресс-тест PostgreSQL через pgxpool.
// Запускает N горутин (default 100), каждая в цикле duration_sec (default 600)
// долбит PG попеременно: INSERT / SELECT COUNT / SELECT MAX с случайными задержками.
// Цель: проверить Go runtime под конкурентной нагрузкой и устойчивость PG connection pool.
// Entrypoint: handler.Handle
package handler
import (
"context"
"fmt"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// pgDSN собирает DSN из env vars (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD, PGSSLMODE).
func pgDSN() string {
host := os.Getenv("PGHOST")
port := os.Getenv("PGPORT")
if port == "" {
port = "5432"
}
db := os.Getenv("PGDATABASE")
user := os.Getenv("PGUSER")
pass := os.Getenv("PGPASSWORD")
sslmode := os.Getenv("PGSSLMODE")
if sslmode == "" {
sslmode = "require"
}
return fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
host, port, db, user, pass, sslmode)
}
// worker — одна горутина: чередует INSERT/COUNT/MAX с случайной задержкой до maxDelayMs.
// При ошибке инкрементирует errOps и продолжает (не паникует).
func worker(ctx context.Context, pool *pgxpool.Pool, workerID int, maxDelayMs int, okOps, errOps *int64) {
rng := rand.New(rand.NewSource(time.Now().UnixNano() + int64(workerID)))
op := 0
for {
select {
case <-ctx.Done():
return
default:
}
// Случайная задержка перед следующей операцией: 0..maxDelayMs мс
delay := rng.Intn(maxDelayMs + 1)
time.Sleep(time.Duration(delay) * time.Millisecond)
var err error
switch op % 3 {
case 0: // INSERT
title := fmt.Sprintf("pgstorm-w%d-%d", workerID, time.Now().UnixNano())
_, err = pool.Exec(ctx,
"INSERT INTO terraform_demo_table (title) VALUES ($1)", title)
case 1: // SELECT COUNT
var count int64
err = pool.QueryRow(ctx,
"SELECT COUNT(*) FROM terraform_demo_table").Scan(&count)
case 2: // SELECT MAX id
var maxID *int64
err = pool.QueryRow(ctx,
"SELECT MAX(id) FROM terraform_demo_table").Scan(&maxID)
}
if err != nil && ctx.Err() == nil {
atomic.AddInt64(errOps, 1)
} else if err == nil {
atomic.AddInt64(okOps, 1)
}
op++
}
}
func Handle(event map[string]interface{}) interface{} {
// Параметры из event (все опциональны — разумные defaults)
workers := 100
if v, ok := event["workers"].(float64); ok && v > 0 && v <= 500 {
workers = int(v)
}
durationSec := 600
if v, ok := event["duration_sec"].(float64); ok && v > 0 && v <= 3600 {
durationSec = int(v)
}
maxDelayMs := 300
if v, ok := event["max_delay_ms"].(float64); ok && v >= 0 && v <= 5000 {
maxDelayMs = int(v)
}
// Инициализация pgxpool — единый pool на всю функцию, MaxConns ограничен
// чтобы не перегрузить managed PG при большом числе горутин.
poolCfg, err := pgxpool.ParseConfig(pgDSN())
if err != nil {
return map[string]interface{}{"error": fmt.Sprintf("parse dsn: %v", err)}
}
maxConns := 20
if workers < 20 {
maxConns = workers
}
poolCfg.MaxConns = int32(maxConns)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(durationSec)*time.Second)
defer cancel()
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
if err != nil {
return map[string]interface{}{"error": fmt.Sprintf("connect pool: %v", err)}
}
defer pool.Close()
var okOps, errOps int64
startTime := time.Now()
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(ctx, pool, id, maxDelayMs, &okOps, &errOps)
}(i)
}
wg.Wait()
elapsed := time.Since(startTime).Seconds()
total := okOps + errOps
opsPerSec := 0.0
if elapsed > 0 {
opsPerSec = float64(total) / elapsed
}
return map[string]interface{}{
"runtime": "go1.23",
"version": "v1",
"workers": workers,
"duration_sec": durationSec,
"max_delay_ms": maxDelayMs,
"elapsed_sec": fmt.Sprintf("%.1f", elapsed),
"total_ops": total,
"ok_ops": okOps,
"err_ops": errOps,
"ops_per_sec": fmt.Sprintf("%.1f", opsPerSec),
}
}