-- 16_message_lifecycle.sql: TTL, DLQ, idempotency, replay, redrive -- ============================================================================ -- SETUP -- ============================================================================ SELECT ulak.create_endpoint('lc_ep', 'http', '{"url": "http://localhost:9999/lc"}'::jsonb) IS NOT NULL AS ep_created; INFO: [ulak] Created endpoint with ID 36 ep_created ------------ t (1 row) CREATE TEMP TABLE lc_ids AS SELECT id AS endpoint_id FROM ulak.endpoints WHERE name = 'lc_ep'; -- Note: max_queue_size is PGC_SIGHUP, default 1M is sufficient for tests -- ============================================================================ -- TTL EXPIRY: mark_expired_messages -- ============================================================================ -- Past expires_at → should be expired SELECT ulak.send_with_options( 'lc_ep', '{"ttl": "expired"}'::jsonb, p_expires_at => NOW() - interval '1 hour' ) IS NOT NULL AS ttl_expired_msg; ttl_expired_msg ----------------- t (1 row) -- Future expires_at → should survive SELECT ulak.send_with_options( 'lc_ep', '{"ttl": "alive"}'::jsonb, p_expires_at => NOW() + interval '1 hour' ) IS NOT NULL AS ttl_alive_msg; ttl_alive_msg --------------- t (1 row) -- No expires_at → should survive SELECT ulak.send_with_options( 'lc_ep', '{"ttl": "none"}'::jsonb ) IS NOT NULL AS ttl_none_msg; ttl_none_msg -------------- t (1 row) SELECT ulak.mark_expired_messages(); mark_expired_messages ----------------------- 1 (1 row) SELECT payload->>'ttl' AS ttl, status FROM ulak.queue WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids) AND payload ? 'ttl' ORDER BY payload->>'ttl'; ttl | status ---------+--------- alive | pending expired | expired none | pending (3 rows) DELETE FROM ulak.queue WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids); -- ============================================================================ -- MAX RETRIES → DLQ: archive_single_to_dlq -- ============================================================================ INSERT INTO ulak.queue (endpoint_id, payload, status, retry_count, last_error, failed_at) SELECT endpoint_id, '{"dlq_test": "max_retries"}'::jsonb, 'failed', 10, 'max retries exceeded', NOW() FROM lc_ids; CREATE TEMP TABLE dlq_msg AS SELECT id FROM ulak.queue WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb; SET client_min_messages = warning; SELECT ulak.archive_single_to_dlq((SELECT id FROM dlq_msg)); archive_single_to_dlq ----------------------- t (1 row) RESET client_min_messages; -- Verify in DLQ SELECT EXISTS( SELECT 1 FROM ulak.dlq WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb ) AS in_dlq; in_dlq -------- t (1 row) -- Verify removed from queue SELECT NOT EXISTS( SELECT 1 FROM ulak.queue WHERE id = (SELECT id FROM dlq_msg) ) AS removed_from_queue; removed_from_queue -------------------- t (1 row) -- Verify DLQ fields SELECT endpoint_name = 'lc_ep' AS correct_endpoint, protocol = 'http' AS correct_protocol, retry_count = 10 AS correct_retries FROM ulak.dlq WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb; correct_endpoint | correct_protocol | correct_retries ------------------+------------------+----------------- t | t | t (1 row) DROP TABLE dlq_msg; -- ============================================================================ -- IDEMPOTENCY KEY: DUPLICATE REJECTION (pending) -- ============================================================================ SELECT ulak.send_with_options( 'lc_ep', '{"idem": "first"}'::jsonb, p_idempotency_key => 'idem-key-001' ) IS NOT NULL AS idem_first; idem_first ------------ t (1 row) -- Same key, same payload → returns existing (no error, no duplicate) SELECT ulak.send_with_options( 'lc_ep', '{"idem": "first"}'::jsonb, p_idempotency_key => 'idem-key-001' ) IS NOT NULL AS idem_dup; idem_dup ---------- t (1 row) SELECT count(*) AS idem_count FROM ulak.queue WHERE idempotency_key = 'idem-key-001'; idem_count ------------ 1 (1 row) -- ============================================================================ -- IDEMPOTENCY KEY: COMPLETED ALLOWS REUSE -- ============================================================================ UPDATE ulak.queue SET status = 'completed', completed_at = NOW() WHERE idempotency_key = 'idem-key-001'; -- Same key should work now (partial index only covers pending/processing) SELECT ulak.send_with_options( 'lc_ep', '{"idem": "reused"}'::jsonb, p_idempotency_key => 'idem-key-001' ) IS NOT NULL AS idem_reuse; idem_reuse ------------ t (1 row) DELETE FROM ulak.queue WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids); -- ============================================================================ -- REDRIVE: redrive_message() -- ============================================================================ CREATE TEMP TABLE redrive_dlq AS SELECT id FROM ulak.dlq WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb LIMIT 1; SET client_min_messages = warning; SELECT ulak.redrive_message((SELECT id FROM redrive_dlq)) IS NOT NULL AS redriven; redriven ---------- t (1 row) RESET client_min_messages; SELECT status = 'redriven' AS dlq_redriven FROM ulak.dlq WHERE id = (SELECT id FROM redrive_dlq); dlq_redriven -------------- t (1 row) SELECT status = 'pending' AS requeue_pending, retry_count = 0 AS retries_reset FROM ulak.queue WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb AND status = 'pending'; requeue_pending | retries_reset -----------------+--------------- t | t (1 row) DROP TABLE redrive_dlq; -- ============================================================================ -- REDRIVE: redrive_message() NOT FOUND -- ============================================================================ DO $$ BEGIN PERFORM ulak.redrive_message(999999999); EXCEPTION WHEN OTHERS THEN RAISE NOTICE 'redrive nonexistent: rejected'; END $$; NOTICE: redrive nonexistent: rejected -- ============================================================================ -- REDRIVE: redrive_endpoint() -- ============================================================================ INSERT INTO ulak.dlq ( original_message_id, endpoint_id, endpoint_name, protocol, payload, retry_count, last_error, status, original_created_at, failed_at ) SELECT 3000+s, (SELECT endpoint_id FROM lc_ids), 'lc_ep', 'http', format('{"ep_redrive": %s}', s)::jsonb, 5, 'err', 'failed', NOW()-interval '1 hour', NOW()-interval '30 minutes' FROM generate_series(1, 2) AS s; SET client_min_messages = warning; SELECT ulak.redrive_endpoint('lc_ep') AS ep_redriven; ep_redriven ------------- 2 (1 row) RESET client_min_messages; SELECT count(*) AS redriven_count FROM ulak.dlq WHERE endpoint_name = 'lc_ep' AND payload->>'ep_redrive' IS NOT NULL AND status = 'redriven'; redriven_count ---------------- 2 (1 row) -- ============================================================================ -- REDRIVE: redrive_all() -- ============================================================================ INSERT INTO ulak.dlq ( original_message_id, endpoint_id, endpoint_name, protocol, payload, retry_count, last_error, status, original_created_at, failed_at ) SELECT 4000+s, (SELECT endpoint_id FROM lc_ids), 'lc_ep', 'http', format('{"all_redrive": %s}', s)::jsonb, 3, 'err', 'failed', NOW()-interval '2 hours', NOW()-interval '1 hour' FROM generate_series(1, 3) AS s; SET client_min_messages = warning; SELECT ulak.redrive_all() AS all_redriven; all_redriven -------------- 3 (1 row) RESET client_min_messages; SELECT count(*) = 0 AS no_failed_left FROM ulak.dlq WHERE status = 'failed'; no_failed_left ---------------- t (1 row) -- ============================================================================ -- REPLAY_RANGE: ordering_key preserved -- ============================================================================ INSERT INTO ulak.archive ( id, endpoint_id, payload, status, priority, correlation_id, payload_hash, ordering_key, headers, metadata, created_at, updated_at, archived_at ) SELECT 99999, (SELECT endpoint_id FROM lc_ids), '{"replay_range": "ordering"}'::jsonb, 'completed', 0, NULL, md5('{"replay_range": "ordering"}'), 'order-replay-001', NULL, NULL, NOW() - interval '2 hours', NOW() - interval '2 hours', NOW() - interval '1 hour'; SET client_min_messages = warning; SELECT ulak.replay_range( (SELECT endpoint_id FROM lc_ids), NOW() - interval '3 hours', NOW() - interval '1 hour' ) AS replayed_range_count; replayed_range_count ---------------------- 1 (1 row) RESET client_min_messages; SELECT ordering_key = 'order-replay-001' AS replay_range_keeps_ordering_key FROM ulak.queue WHERE payload @> '{"replay_range": "ordering"}'::jsonb AND status = 'pending'; replay_range_keeps_ordering_key --------------------------------- t (1 row) -- ============================================================================ -- DLQ_SUMMARY -- ============================================================================ SELECT endpoint_name IS NOT NULL AS has_name, redriven_count > 0 AS has_redriven FROM ulak.dlq_summary() WHERE endpoint_name = 'lc_ep'; has_name | has_redriven ----------+-------------- t | t (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids); DELETE FROM ulak.dlq WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids); DELETE FROM ulak.archive WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids); DELETE FROM ulak.event_log; DROP TABLE lc_ids; SELECT ulak.drop_endpoint('lc_ep'); INFO: [ulak] Dropped endpoint 'lc_ep' drop_endpoint --------------- t (1 row)