-- 16_message_lifecycle.sql
--
-- Regression script for the message lifecycle surface of the ulak schema:
--   * TTL expiry            (mark_expired_messages)
--   * dead-lettering        (archive_single_to_dlq)
--   * idempotency keys      (send_with_options / p_idempotency_key)
--   * redrive from the DLQ  (redrive_message, redrive_endpoint, redrive_all)
--   * replay from archive   (replay_range)
--   * DLQ reporting         (dlq_summary)
--
-- Every fixture hangs off one endpoint, 'lc_ep'; its id is cached in the
-- temp table lc_ids so later statements can scope themselves to it.

-- ----------------------------------------------------------------------------
-- Setup
-- ----------------------------------------------------------------------------

SELECT
    ulak.create_endpoint(
        'lc_ep',
        'http',
        '{"url": "http://localhost:9999/lc"}'::jsonb
    ) IS NOT NULL AS ep_created;

CREATE TEMP TABLE lc_ids AS
SELECT id AS endpoint_id
FROM ulak.endpoints
WHERE name = 'lc_ep';

-- max_queue_size is PGC_SIGHUP; the default (1M) is enough for this script,
-- so it is left untouched.

-- ----------------------------------------------------------------------------
-- TTL expiry: mark_expired_messages
-- ----------------------------------------------------------------------------

-- Case 1: expires_at is already in the past; this message should be expired.
SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"ttl": "expired"}'::jsonb,
        p_expires_at => NOW() - INTERVAL '1 hour'
    ) IS NOT NULL AS ttl_expired_msg;

-- Case 2: expires_at is in the future; this message should survive.
SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"ttl": "alive"}'::jsonb,
        p_expires_at => NOW() + INTERVAL '1 hour'
    ) IS NOT NULL AS ttl_alive_msg;

-- Case 3: no expires_at supplied; this message should survive.
SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"ttl": "none"}'::jsonb
    ) IS NOT NULL AS ttl_none_msg;

SELECT ulak.mark_expired_messages();

-- One row per TTL case, sorted by the case label so the output is stable.
SELECT
    payload->>'ttl' AS ttl,
    status
FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids)
  AND payload ? 'ttl'
ORDER BY payload->>'ttl';

-- Clear this endpoint's queue rows before the next scenario.
DELETE FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids);

-- ----------------------------------------------------------------------------
-- Max retries -> DLQ: archive_single_to_dlq
-- ----------------------------------------------------------------------------

-- Seed a queue row that is already 'failed' with retry_count = 10.
INSERT INTO ulak.queue (
    endpoint_id,
    payload,
    status,
    retry_count,
    last_error,
    failed_at
)
SELECT
    endpoint_id,
    '{"dlq_test": "max_retries"}'::jsonb,
    'failed',
    10,
    'max retries exceeded',
    NOW()
FROM lc_ids;

-- Remember the seeded row's id; it is needed after the row leaves the queue.
-- NOTE(review): the lookup matches on payload only, not on endpoint_id, so it
-- assumes no other queue row carries this payload.
CREATE TEMP TABLE dlq_msg AS
SELECT id
FROM ulak.queue
WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb;

-- Messages below WARNING are muted around the call to keep the output stable.
SET client_min_messages = warning;
SELECT ulak.archive_single_to_dlq((SELECT id FROM dlq_msg));
RESET client_min_messages;

-- The message must now be present in the DLQ ...
SELECT
    EXISTS (
        SELECT 1
        FROM ulak.dlq
        WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb
    ) AS in_dlq;

-- ... and gone from the queue.
SELECT
    NOT EXISTS (
        SELECT 1
        FROM ulak.queue
        WHERE id = (SELECT id FROM dlq_msg)
    ) AS removed_from_queue;

-- The DLQ row should carry the endpoint name, protocol and retry count.
SELECT
    endpoint_name = 'lc_ep' AS correct_endpoint,
    protocol = 'http' AS correct_protocol,
    retry_count = 10 AS correct_retries
FROM ulak.dlq
WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb;

DROP TABLE dlq_msg;

-- ----------------------------------------------------------------------------
-- Idempotency key: duplicate while the first message is still pending
-- ----------------------------------------------------------------------------

SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"idem": "first"}'::jsonb,
        p_idempotency_key => 'idem-key-001'
    ) IS NOT NULL AS idem_first;

-- Same key and same payload again: expected to hand back the existing message
-- without raising and without inserting a second row.
SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"idem": "first"}'::jsonb,
        p_idempotency_key => 'idem-key-001'
    ) IS NOT NULL AS idem_dup;

-- Number of queue rows holding the key after both sends.
SELECT COUNT(*) AS idem_count
FROM ulak.queue
WHERE idempotency_key = 'idem-key-001';

-- ----------------------------------------------------------------------------
-- Idempotency key: reuse after the first message completed
-- ----------------------------------------------------------------------------

UPDATE ulak.queue
SET
    status = 'completed',
    completed_at = NOW()
WHERE idempotency_key = 'idem-key-001';

-- The key should be accepted again now: the partial index behind the
-- duplicate check only covers pending/processing rows.
SELECT
    ulak.send_with_options(
        'lc_ep',
        '{"idem": "reused"}'::jsonb,
        p_idempotency_key => 'idem-key-001'
    ) IS NOT NULL AS idem_reuse;

DELETE FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids);

-- ----------------------------------------------------------------------------
-- Redrive: redrive_message()
-- ----------------------------------------------------------------------------

-- Pick the DLQ row left behind by the max-retries scenario above.
CREATE TEMP TABLE redrive_dlq AS
SELECT id
FROM ulak.dlq
WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb
LIMIT 1;

SET client_min_messages = warning;
SELECT
    ulak.redrive_message((SELECT id FROM redrive_dlq)) IS NOT NULL AS redriven;
RESET client_min_messages;

-- The DLQ row should be flagged as redriven.
SELECT status = 'redriven' AS dlq_redriven
FROM ulak.dlq
WHERE id = (SELECT id FROM redrive_dlq);

-- The re-queued copy should be pending with its retry counter back at zero.
SELECT
    status = 'pending' AS requeue_pending,
    retry_count = 0 AS retries_reset
FROM ulak.queue
WHERE payload @> '{"dlq_test": "max_retries"}'::jsonb
  AND status = 'pending';

DROP TABLE redrive_dlq;

-- ----------------------------------------------------------------------------
-- Redrive: redrive_message() with an id that does not exist
-- ----------------------------------------------------------------------------

-- Any exception raised by the call is caught and reported as one fixed NOTICE.
-- NOTE(review): WHEN OTHERS does not distinguish a "not found" rejection from
-- any other failure of the call.
DO $$
BEGIN
    PERFORM ulak.redrive_message(999999999);
EXCEPTION
    WHEN OTHERS THEN
        RAISE NOTICE 'redrive nonexistent: rejected';
END
$$;

-- ----------------------------------------------------------------------------
-- Redrive: redrive_endpoint()
-- ----------------------------------------------------------------------------

-- Two failed DLQ rows for lc_ep (original_message_id 3001 and 3002).
INSERT INTO ulak.dlq (
    original_message_id,
    endpoint_id,
    endpoint_name,
    protocol,
    payload,
    retry_count,
    last_error,
    status,
    original_created_at,
    failed_at
)
SELECT
    3000+s,
    (SELECT endpoint_id FROM lc_ids),
    'lc_ep',
    'http',
    format('{"ep_redrive": %s}', s)::jsonb,
    5,
    'err',
    'failed',
    NOW() - INTERVAL '1 hour',
    NOW() - INTERVAL '30 minutes'
FROM generate_series(1, 2) AS s;

SET client_min_messages = warning;
SELECT ulak.redrive_endpoint('lc_ep') AS ep_redriven;
RESET client_min_messages;

-- How many of the rows seeded above ended up marked as redriven.
SELECT COUNT(*) AS redriven_count
FROM ulak.dlq
WHERE endpoint_name = 'lc_ep'
  AND payload->>'ep_redrive' IS NOT NULL
  AND status = 'redriven';

-- ----------------------------------------------------------------------------
-- Redrive: redrive_all()
-- ----------------------------------------------------------------------------

-- Three more failed DLQ rows (original_message_id 4001 through 4003).
INSERT INTO ulak.dlq (
    original_message_id,
    endpoint_id,
    endpoint_name,
    protocol,
    payload,
    retry_count,
    last_error,
    status,
    original_created_at,
    failed_at
)
SELECT
    4000+s,
    (SELECT endpoint_id FROM lc_ids),
    'lc_ep',
    'http',
    format('{"all_redrive": %s}', s)::jsonb,
    3,
    'err',
    'failed',
    NOW() - INTERVAL '2 hours',
    NOW() - INTERVAL '1 hour'
FROM generate_series(1, 3) AS s;

SET client_min_messages = warning;
SELECT ulak.redrive_all() AS all_redriven;
RESET client_min_messages;

-- This check spans the whole DLQ table, not just this endpoint's rows.
SELECT COUNT(*) = 0 AS no_failed_left
FROM ulak.dlq
WHERE status = 'failed';

-- ----------------------------------------------------------------------------
-- replay_range: ordering_key is carried over to the replayed message
-- ----------------------------------------------------------------------------

-- One archived, completed message with an explicit ordering_key. The archive
-- id is hard-coded to 99999.
INSERT INTO ulak.archive (
    id,
    endpoint_id,
    payload,
    status,
    priority,
    correlation_id,
    payload_hash,
    ordering_key,
    headers,
    metadata,
    created_at,
    updated_at,
    archived_at
)
SELECT
    99999,
    (SELECT endpoint_id FROM lc_ids),
    '{"replay_range": "ordering"}'::jsonb,
    'completed',
    0,
    NULL,
    md5('{"replay_range": "ordering"}'),
    'order-replay-001',
    NULL,
    NULL,
    NOW() - INTERVAL '2 hours',
    NOW() - INTERVAL '2 hours',
    NOW() - INTERVAL '1 hour';

-- Replay the window from three hours ago up to one hour ago for lc_ep.
SET client_min_messages = warning;
SELECT
    ulak.replay_range(
        (SELECT endpoint_id FROM lc_ids),
        NOW() - INTERVAL '3 hours',
        NOW() - INTERVAL '1 hour'
    ) AS replayed_range_count;
RESET client_min_messages;

-- The pending copy in the queue must still carry the archived ordering_key.
SELECT ordering_key = 'order-replay-001' AS replay_range_keeps_ordering_key
FROM ulak.queue
WHERE payload @> '{"replay_range": "ordering"}'::jsonb
  AND status = 'pending';

-- ----------------------------------------------------------------------------
-- dlq_summary
-- ----------------------------------------------------------------------------

SELECT
    endpoint_name IS NOT NULL AS has_name,
    redriven_count > 0 AS has_redriven
FROM ulak.dlq_summary()
WHERE endpoint_name = 'lc_ep';

-- ----------------------------------------------------------------------------
-- Cleanup
-- ----------------------------------------------------------------------------

DELETE FROM ulak.queue
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids);

DELETE FROM ulak.dlq
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids);

DELETE FROM ulak.archive
WHERE endpoint_id = (SELECT endpoint_id FROM lc_ids);

-- Unfiltered: this empties the entire event_log table, not only the rows
-- produced for lc_ep.
DELETE FROM ulak.event_log;

DROP TABLE lc_ids;

SELECT ulak.drop_endpoint('lc_ep');