-- 08_sql_functions.sql: PL/pgSQL functions — update_circuit_breaker, mark_expired_messages
-- pg_regress test for ulak
-- ============================================================================
-- SETUP
-- ============================================================================
-- Create a dedicated endpoint for circuit breaker testing
SELECT ulak.create_endpoint(
    'cb_test_endpoint',
    'http',
    '{"url": "http://localhost:9999/cb", "method": "POST"}'::jsonb
) IS NOT NULL AS created;
INFO:  [ulak] Created endpoint with ID 15
 created 
---------
 t
(1 row)

-- Get the endpoint ID for direct use
-- Store it in a temp table for reference
CREATE TEMP TABLE test_ids AS
SELECT id AS endpoint_id FROM ulak.endpoints WHERE name = 'cb_test_endpoint';

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: SUCCESS PATH
-- ============================================================================
-- Initial state: circuit_failure_count = 0, circuit_state = 'closed'
SELECT circuit_failure_count, circuit_state = 'closed' AS cb_is_closed
FROM ulak.endpoints WHERE name = 'cb_test_endpoint';
 circuit_failure_count | cb_is_closed 
-----------------------+--------------
                     0 | t
(1 row)

-- Call with success — should keep everything at 0/NULL
SELECT ulak.update_circuit_breaker(
    (SELECT endpoint_id FROM test_ids),
    true
);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT circuit_failure_count, circuit_state = 'closed' AS cb_still_closed
FROM ulak.endpoints WHERE name = 'cb_test_endpoint';
 circuit_failure_count | cb_still_closed 
-----------------------+-----------------
                     0 | t
(1 row)

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: FAILURE PATH
-- ============================================================================
-- Simulate failures below threshold (GUC default threshold=10)
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

-- After 3 failures: circuit_failure_count=3, circuit still closed
SELECT circuit_failure_count, circuit_state = 'closed' AS cb_still_closed
FROM ulak.endpoints WHERE name = 'cb_test_endpoint';
 circuit_failure_count | cb_still_closed 
-----------------------+-----------------
                     3 | t
(1 row)

-- Seven more failures to reach threshold (10)
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
 update_circuit_breaker 
------------------------
 
(1 row)

-- After 10 failures: circuit should be OPEN
SELECT circuit_failure_count, circuit_state AS cb_state
FROM ulak.endpoints WHERE name = 'cb_test_endpoint';
 circuit_failure_count | cb_state 
-----------------------+----------
                    10 | open
(1 row)

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: RESET ON SUCCESS
-- ============================================================================
-- A success should reset failure_count and close the circuit
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), true);
 update_circuit_breaker 
------------------------
 
(1 row)

SELECT circuit_failure_count, circuit_state = 'closed' AS cb_closed_after_success
FROM ulak.endpoints WHERE name = 'cb_test_endpoint';
 circuit_failure_count | cb_closed_after_success 
-----------------------+-------------------------
                     0 | t
(1 row)

-- ============================================================================
-- MARK_EXPIRED_MESSAGES
-- ============================================================================
-- Insert messages with different expiry states
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"already_expired"}'::jsonb, 'pending', NOW() - interval '1 hour'
FROM test_ids;
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"not_yet_expired"}'::jsonb, 'pending', NOW() + interval '1 hour'
FROM test_ids;
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"no_expiry"}'::jsonb, 'pending', NULL
FROM test_ids;
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"already_completed"}'::jsonb, 'completed', NOW() - interval '1 hour'
FROM test_ids;
-- Run mark_expired_messages — should mark only the already-expired pending message
SELECT ulak.mark_expired_messages();
 mark_expired_messages 
-----------------------
                     1
(1 row)

-- Verify: only the expired pending message changed to 'expired'
SELECT payload->>'exp' AS msg, status
FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM test_ids)
  AND payload->>'exp' IN ('already_expired', 'not_yet_expired', 'no_expiry', 'already_completed')
ORDER BY payload->>'exp';
        msg        |  status   
-------------------+-----------
 already_completed | completed
 already_expired   | expired
 no_expiry         | pending
 not_yet_expired   | pending
(4 rows)

-- ============================================================================
-- CLEANUP
-- ============================================================================
DELETE FROM ulak.queue WHERE endpoint_id = (SELECT endpoint_id FROM test_ids);
DROP TABLE test_ids;
SELECT ulak.drop_endpoint('cb_test_endpoint');
INFO:  [ulak] Dropped endpoint 'cb_test_endpoint'
 drop_endpoint 
---------------
 t
(1 row)
