-- 08_sql_functions.sql: PL/pgSQL functions — update_circuit_breaker, mark_expired_messages
-- pg_regress test for ulak
--
-- NOTE: every comment and statement must stay on its own line. A `--` comment
-- extends to the end of the line, so joining lines would comment out the SQL
-- that follows it.

-- ============================================================================
-- SETUP
-- ============================================================================

-- Create a dedicated endpoint for circuit breaker testing
SELECT ulak.create_endpoint(
    'cb_test_endpoint',
    'http',
    '{"url": "http://localhost:9999/cb", "method": "POST"}'::jsonb
) IS NOT NULL AS created;

-- Get the endpoint ID for direct use
-- Store it in a temp table for reference
CREATE TEMP TABLE test_ids AS
SELECT id AS endpoint_id
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: SUCCESS PATH
-- ============================================================================

-- Initial state: circuit_failure_count = 0, circuit_state = 'closed'
SELECT
    circuit_failure_count,
    circuit_state = 'closed' AS cb_is_closed
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- Call with success — should keep everything at 0/NULL
SELECT ulak.update_circuit_breaker(
    (SELECT endpoint_id FROM test_ids),
    true
);

SELECT
    circuit_failure_count,
    circuit_state = 'closed' AS cb_still_closed
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: FAILURE PATH
-- ============================================================================

-- Simulate failures below threshold (GUC default threshold=10)
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);

-- After 3 failures: circuit_failure_count=3, circuit still closed
SELECT
    circuit_failure_count,
    circuit_state = 'closed' AS cb_still_closed
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- Seven more failures to reach threshold (10)
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), false);

-- After 10 failures: circuit should be OPEN
SELECT
    circuit_failure_count,
    circuit_state AS cb_state
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- ============================================================================
-- UPDATE_CIRCUIT_BREAKER: RESET ON SUCCESS
-- ============================================================================

-- A success should reset failure_count and close the circuit
SELECT ulak.update_circuit_breaker((SELECT endpoint_id FROM test_ids), true);

SELECT
    circuit_failure_count,
    circuit_state = 'closed' AS cb_closed_after_success
FROM ulak.endpoints
WHERE name = 'cb_test_endpoint';

-- ============================================================================
-- MARK_EXPIRED_MESSAGES
-- ============================================================================

-- Insert messages with different expiry states

-- Pending, expiry in the past: the one row expected to become 'expired'
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"already_expired"}'::jsonb, 'pending', NOW() - interval '1 hour'
FROM test_ids;

-- Pending, expiry in the future: expected to stay 'pending'
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"not_yet_expired"}'::jsonb, 'pending', NOW() + interval '1 hour'
FROM test_ids;

-- Pending, no expiry set: expected to stay 'pending'
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"no_expiry"}'::jsonb, 'pending', NULL
FROM test_ids;

-- Completed, expiry in the past: expected to stay 'completed' (not pending)
INSERT INTO ulak.queue (endpoint_id, payload, status, expires_at)
SELECT endpoint_id, '{"exp":"already_completed"}'::jsonb, 'completed', NOW() - interval '1 hour'
FROM test_ids;

-- Run mark_expired_messages — should mark only the already-expired pending message
SELECT ulak.mark_expired_messages();

-- Verify: only the expired pending message changed to 'expired'
-- (ordered by the payload tag so the result set is deterministic)
SELECT
    payload->>'exp' AS msg,
    status
FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM test_ids)
  AND payload->>'exp' IN ('already_expired', 'not_yet_expired', 'no_expiry', 'already_completed')
ORDER BY payload->>'exp';

-- ============================================================================
-- CLEANUP
-- ============================================================================

-- Remove the queue rows first: the DELETE still needs test_ids to find the endpoint
DELETE FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM test_ids);

DROP TABLE test_ids;

SELECT ulak.drop_endpoint('cb_test_endpoint');