-- SQS-STYLE FIFO TESTS ONLY -- This test file validates the SQS-style FIFO queue implementation -- Stabilize output and ensure clean extension state SET client_min_messages = warning; DROP EXTENSION IF EXISTS pgmq CASCADE; CREATE EXTENSION pgmq; -- Setup test environment SELECT pgmq.create('fifo_test_queue'); create -------- (1 row) -- test_fifo_sqs_style_basic_batch_filling -- Create multiple groups with different message counts SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "message": 1}'::jsonb, '{"x-pgmq-group": "group_A"}'::jsonb); send ------ 1 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "message": 2}'::jsonb, '{"x-pgmq-group": "group_A"}'::jsonb); send ------ 2 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "message": 3}'::jsonb, '{"x-pgmq-group": "group_A"}'::jsonb); send ------ 3 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "B", "message": 1}'::jsonb, '{"x-pgmq-group": "group_B"}'::jsonb); send ------ 4 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "B", "message": 2}'::jsonb, '{"x-pgmq-group": "group_B"}'::jsonb); send ------ 5 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "C", "message": 1}'::jsonb, '{"x-pgmq-group": "group_C"}'::jsonb); send ------ 6 (1 row) -- Verify we have 6 messages in queue SELECT COUNT(*) = 6 FROM pgmq.q_fifo_test_queue; ?column? 
---------- t (1 row) -- SQS-style should return multiple messages from the same group (group A first) -- Request 4 messages - should get all 3 from group A + 1 from group B -- NOTE: ARRAY_AGG sorts by msg_id, so order_correct checks which messages were returned, not the order read_grouped emitted them WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 4) ) SELECT (SELECT COUNT(*) FROM results) = 4 as count_correct, (SELECT ARRAY_AGG((message->>'group')::text ORDER BY msg_id) FROM results) = ARRAY['A', 'A', 'A', 'B']::text[] as order_correct; count_correct | order_correct ---------------+--------------- t | t (1 row) -- Clean up for next SQS test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 6 (1 row) -- test_fifo_sqs_style_mixed_groups -- SQS-style with mixed groups (with and without FIFO headers) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "default1"}'::jsonb); send ------ 7 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "default2"}'::jsonb); send ------ 8 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "fifo1"}'::jsonb, '{"x-pgmq-group": "group1"}'::jsonb); send ------ 9 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "fifo2"}'::jsonb, '{"x-pgmq-group": "group1"}'::jsonb); send ------ 10 (1 row) -- SQS-style should handle mixed groups correctly -- Should return all 4 messages, with default group messages first, then group1 messages WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 10) ) SELECT (SELECT COUNT(*) FROM results) = 4 as count_correct, (SELECT ARRAY_AGG((message->>'message')::text ORDER BY msg_id) FROM results) = ARRAY['default1', 'default2', 'fifo1', 'fifo2']::text[] as correct_mixed_order; count_correct | correct_mixed_order ---------------+--------------------- t | t (1 row) -- Clean up for next test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 4 (1 row) -- test_fifo_sqs_style_all_messages_read -- SQS-style reading all messages from a single group SELECT * FROM pgmq.send('fifo_test_queue', '{"type": "order", 
"priority": "high"}'::jsonb, '{"x-pgmq-group": "orders"}'::jsonb); send ------ 11 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"type": "order", "priority": "medium"}'::jsonb, '{"x-pgmq-group": "orders"}'::jsonb); send ------ 12 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"type": "notification", "priority": "low"}'::jsonb, '{"x-pgmq-group": "orders"}'::jsonb); send ------ 13 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"type": "order", "priority": "low"}'::jsonb, '{"x-pgmq-group": "orders"}'::jsonb); send ------ 14 (1 row) -- Should return all 4 messages in FIFO order from the orders group WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 10) ) SELECT (SELECT COUNT(*) FROM results) = 4 as count_correct, (SELECT ARRAY_AGG((message->>'priority')::text ORDER BY msg_id) FROM results) = ARRAY['high', 'medium', 'low', 'low']::text[] as correct_fifo_order; count_correct | correct_fifo_order ---------------+-------------------- t | t (1 row) -- Clean up for next test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 4 (1 row) -- test_fifo_sqs_style_visibility_timeout -- SQS-style with visibility timeout SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "timeout1"}'::jsonb, '{"x-pgmq-group": "timeout_group"}'::jsonb); send ------ 15 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "timeout2"}'::jsonb, '{"x-pgmq-group": "timeout_group"}'::jsonb); send ------ 16 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "timeout3"}'::jsonb, '{"x-pgmq-group": "timeout_group"}'::jsonb); send ------ 17 (1 row) -- Read with short visibility timeout - should get all 3 messages SELECT COUNT(*) = 3 FROM pgmq.read_grouped('fifo_test_queue', 1, 10); ?column? ---------- t (1 row) -- Should return no messages (all messages are still invisible: the 1-second visibility timeout set by the previous read has not expired) SELECT COUNT(*) = 0 FROM pgmq.read_grouped('fifo_test_queue', 10, 10); ?column? 
---------- t (1 row) -- Wait for visibility timeout to expire SELECT pg_sleep(2); pg_sleep ---------- (1 row) -- Should now return all messages again WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 10) ) SELECT (SELECT COUNT(*) FROM results) = 3 as count_correct, (SELECT COUNT(DISTINCT msg_id) FROM results) = 3 as all_unique; count_correct | all_unique ---------------+------------ t | t (1 row) -- Clean up for next test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 3 (1 row) -- test_fifo_sqs_style_polling -- SQS-style polling functionality SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "poll_test1"}'::jsonb, '{"x-pgmq-group": "poll_group"}'::jsonb); send ------ 18 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "poll_test2"}'::jsonb, '{"x-pgmq-group": "poll_group"}'::jsonb); send ------ 19 (1 row) -- Test SQS-style polling with immediate availability WITH results AS ( SELECT * FROM pgmq.read_grouped_with_poll('fifo_test_queue', 10, 10, 1, 100) ) SELECT (SELECT COUNT(*) FROM results) = 2 as count_correct, (SELECT COUNT(DISTINCT msg_id) FROM results) = 2 as all_unique; count_correct | all_unique ---------------+------------ t | t (1 row) -- Clean up for next test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 2 (1 row) -- test_fifo_sqs_style_batch_sizes -- SQS-style with different batch sizes -- Create 5 messages in group A, 3 in group B SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "seq": 1}'::jsonb, '{"x-pgmq-group": "batch_group_A"}'::jsonb); send ------ 20 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "seq": 2}'::jsonb, '{"x-pgmq-group": "batch_group_A"}'::jsonb); send ------ 21 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "seq": 3}'::jsonb, '{"x-pgmq-group": "batch_group_A"}'::jsonb); send ------ 22 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "seq": 4}'::jsonb, 
'{"x-pgmq-group": "batch_group_A"}'::jsonb); send ------ 23 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "A", "seq": 5}'::jsonb, '{"x-pgmq-group": "batch_group_A"}'::jsonb); send ------ 24 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "B", "seq": 1}'::jsonb, '{"x-pgmq-group": "batch_group_B"}'::jsonb); send ------ 25 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "B", "seq": 2}'::jsonb, '{"x-pgmq-group": "batch_group_B"}'::jsonb); send ------ 26 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"group": "B", "seq": 3}'::jsonb, '{"x-pgmq-group": "batch_group_B"}'::jsonb); send ------ 27 (1 row) -- Test batch size 3 - should get 3 messages from group A WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 3) ) SELECT (SELECT COUNT(*) FROM results) = 3 as count_correct, (SELECT ARRAY_AGG((message->>'group')::text ORDER BY msg_id) FROM results) = ARRAY['A', 'A', 'A']::text[] as all_from_group_a; count_correct | all_from_group_a ---------------+------------------ t | t (1 row) -- Reset visibility timeout on every row (intentionally no WHERE clause) by moving vt into the past UPDATE pgmq.q_fifo_test_queue SET vt = clock_timestamp() - interval '1 second'; -- Test batch size 7 - should get 5 from group A + 2 from group B WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 7) ) SELECT (SELECT COUNT(*) FROM results) = 7 as count_correct, (SELECT ARRAY_AGG((message->>'group')::text ORDER BY msg_id) FROM results) = ARRAY['A', 'A', 'A', 'A', 'A', 'B', 'B']::text[] as correct_batch_order; count_correct | correct_batch_order ---------------+--------------------- t | t (1 row) -- Clean up for next test SELECT * FROM pgmq.purge_queue('fifo_test_queue'); purge_queue ------------- 8 (1 row) -- test_fifo_sqs_style_edge_cases -- SQS-style edge cases -- Test with empty FIFO key, null key, and no header (should all work as default group) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "empty_fifo_sqs"}'::jsonb, '{"x-pgmq-group": ""}'::jsonb); send ------ 28 (1 
row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "null_fifo_sqs"}'::jsonb, '{"x-pgmq-group": null}'::jsonb); send ------ 29 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "no_header"}'::jsonb); send ------ 30 (1 row) SELECT * FROM pgmq.send('fifo_test_queue', '{"message": "explicit_group"}'::jsonb, '{"x-pgmq-group": "explicit"}'::jsonb); send ------ 31 (1 row) -- All three (empty, null, no header) should be treated as same default group -- Should get them in order: empty_fifo_sqs, null_fifo_sqs, no_header (all default), then explicit_group WITH results AS ( SELECT * FROM pgmq.read_grouped('fifo_test_queue', 10, 10) ) SELECT (SELECT COUNT(*) FROM results) = 4 as count_correct, (SELECT ARRAY_AGG((message->>'message')::text ORDER BY msg_id) FROM results) = ARRAY['empty_fifo_sqs', 'null_fifo_sqs', 'no_header', 'explicit_group']::text[] as all_defaults_together; count_correct | all_defaults_together ---------------+----------------------- t | t (1 row) -- Clean up SELECT pgmq.drop_queue('fifo_test_queue'); drop_queue ------------ t (1 row) -- Verify queue was dropped SELECT COUNT(*) = 0 FROM pgmq.list_queues() WHERE queue_name = 'fifo_test_queue'; ?column? ---------- t (1 row)