-- CREATE pgmq. CREATE EXTENSION IF NOT EXISTS pgmq; CREATE EXTENSION IF NOT EXISTS pg_partman; -- test_unlogged -- CREATE with default retention and partition strategy SELECT pgmq.create_unlogged('test_unlogged_queue'); create_unlogged ----------------- (1 row) SELECT * from pgmq.send('test_unlogged_queue', '{"hello": "world"}', '{"header": 1}'::jsonb); send ------ 1 (1 row) SELECT msg_id, read_ct, enqueued_at > NOW(), vt > NOW(), message, headers FROM pgmq.read('test_unlogged_queue', 2, 1); msg_id | read_ct | ?column? | ?column? | message | headers --------+---------+----------+----------+--------------------+--------------- 1 | 1 | f | t | {"hello": "world"} | {"header": 1} (1 row) -- test_max_queue_name_size -- CREATE with default retention and partition strategy SELECT pgmq.create(repeat('a', 48)); ERROR: queue name is too long, maximum length is 47 characters CONTEXT: PL/pgSQL function pgmq.validate_queue_name(text) line 8 at RAISE SQL statement "SELECT pgmq.validate_queue_name(queue_name)" PL/pgSQL function pgmq.create_non_partitioned(text) line 7 at PERFORM SQL statement "SELECT pgmq.create_non_partitioned(queue_name)" PL/pgSQL function pgmq."create"(text) line 3 at PERFORM SELECT pgmq.create(repeat('a', 47)); create -------- (1 row) SELECT pgmq.create_partitioned(repeat('p', 48)); ERROR: queue name is too long, maximum length is 47 characters CONTEXT: PL/pgSQL function pgmq.validate_queue_name(text) line 8 at RAISE SQL statement "SELECT pgmq.validate_queue_name(queue_name)" PL/pgSQL function pgmq.create_partitioned(text,text,text) line 11 at PERFORM SELECT pgmq.create_partitioned(repeat('p', 47)); create_partitioned -------------------- (1 row) -- test_lifecycle -- CREATE with default retention and partition strategy SELECT pgmq.create('test_default_queue'); create -------- (1 row) -- creating a queue must be idempotent -- create with same name again, must be no error SELECT pgmq.create('test_default_queue'); NOTICE: relation "q_test_default_queue" already exists, skipping NOTICE: relation "a_test_default_queue" already exists, skipping NOTICE: relation "q_test_default_queue_vt_idx" already exists, skipping NOTICE: relation "archived_at_idx_test_default_queue" already exists, skipping create -------- (1 row) SELECT * from pgmq.send('test_default_queue', '{"hello": "world"}'); send ------ 1 (1 row) -- read message -- vt=0, limit=1 \set msg_id 1 SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 0, 1); ?column? ---------- t (1 row) -- read message using conditional SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1, '{"hello": "world"}'); ?column? ---------- t (1 row) -- set VT to 5 seconds SELECT vt > clock_timestamp() + '4 seconds'::interval FROM pgmq.set_vt('test_default_queue', :msg_id, 5); ?column? ---------- t (1 row) -- read again, assert no messages because we just set VT to the future SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); ?column? ---------- (0 rows) -- read again, now using poll to block until message is ready SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10); ?column? ---------- t (1 row) -- set VT to 5 seconds again for another read_with_poll test SELECT vt > clock_timestamp() + '4 seconds'::interval FROM pgmq.set_vt('test_default_queue', :msg_id, 5); ?column? ---------- t (1 row) -- read again, now using poll to block until message is ready SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue', 10, 1, 10, 100, '{"hello": "world"}'); ?column? ---------- t (1 row) -- after reading it, set VT to now SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0); ?column? ---------- t (1 row) -- read again, should have msg_id 1 again SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); ?column? ---------- t (1 row) SELECT pgmq.create('test_default_queue_vt'); create -------- (1 row) -- send message with timestamp SELECT * from pgmq.send('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); send ------ 1 (1 row) -- read, assert no messages because we set timestamp to the future SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); ?column? ---------- (0 rows) -- read again, now using poll to block until message is ready SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue_vt', 10, 1, 10); ?column? ---------- t (1 row) -- send a batch of 2 messages SELECT pgmq.create('batch_queue'); create -------- (1 row) SELECT ARRAY( SELECT pgmq.send_batch( 'batch_queue', ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], ARRAY['{"header": 1}', '{"header": 2}']::jsonb[] )) = ARRAY[1, 2]::BIGINT[]; ?column? ---------- t (1 row) SELECT msg_id, message, headers from pgmq.read('batch_queue', 0, 2); msg_id | message | headers --------+----------------------+--------------- 1 | {"hello": "world_0"} | {"header": 1} 2 | {"hello": "world_1"} | {"header": 2} (2 rows) -- send a batch of 2 messages with timestamp SELECT pgmq.create('batch_queue_vt'); create -------- (1 row) SELECT ARRAY( SELECT pgmq.send_batch( 'batch_queue_vt', ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], CURRENT_TIMESTAMP + '5 seconds'::interval )) = ARRAY[1, 2]::BIGINT[]; ?column? ---------- t (1 row) -- CREATE with 5 seconds per partition, 10 seconds retention SELECT pgmq.create_partitioned('test_duration_queue', '5 seconds', '10 seconds'); create_partitioned -------------------- (1 row) -- CREATE with 10 messages per partition, 20 messages retention SELECT pgmq.create_partitioned('test_numeric_queue', '10 seconds', '20 seconds'); create_partitioned -------------------- (1 row) -- create a queue for metrics SELECT pgmq.create('test_metrics_queue'); create -------- (1 row) -- doing some operations to get some numbers in SELECT pgmq.send_batch('test_metrics_queue', ARRAY['1', '2', '3', '4', '5']::jsonb[]); send_batch ------------ 1 2 3 4 5 (5 rows) SELECT pgmq.send_batch('test_metrics_queue', ARRAY['6', '7']::jsonb[], 10); send_batch ------------ 6 7 (2 rows) SELECT pgmq.archive('test_metrics_queue', 1); archive --------- t (1 row) -- actually reading metrics SELECT queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_messages, queue_visible_length FROM pgmq.metrics('test_metrics_queue'); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | queue_visible_length --------------------+--------------+--------------------+--------------------+----------------+---------------------- test_metrics_queue | 6 | 0 | 0 | 7 | 4 (1 row) -- get metrics all SELECT COUNT(1) from pgmq.metrics_all(); count ------- 10 (1 row) -- delete an existing queue returns true select pgmq.create('exists'); create -------- (1 row) select pgmq.drop_queue('exists'); drop_queue ------------ t (1 row) -- delete a queue does not exists returns false select pgmq.drop_queue('does_not_exist'); NOTICE: pgmq queue `does_not_exist` does not exist drop_queue ------------ f (1 row) -- delete all the queues -- delete partitioned queues SELECT pgmq.drop_queue(queue, true) FROM unnest('{test_numeric_queue}'::text[]) AS queue; WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead drop_queue ------------ t (1 row) -- test automatic partitioned status checking SELECT pgmq.drop_queue(queue) FROM unnest('{test_duration_queue}'::text[]) AS queue; drop_queue ------------ t (1 row) -- drop the rest of the queues SELECT pgmq.drop_queue(q.queue_name, true) FROM (SELECT queue_name FROM pgmq.list_queues()) AS q; WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead drop_queue ------------ t t t t t t t t (8 rows) SELECT queue_name FROM pgmq.list_queues(); queue_name ------------ (0 rows) -- test_archive SELECT pgmq.create('archive_queue'); create -------- (1 row) -- no messages in the queue SELECT COUNT(*) = 0 FROM pgmq.q_archive_queue; ?column? ---------- t (1 row) -- no messages in queue archive SELECT COUNT(*) = 0 FROM pgmq.a_archive_queue; ?column? ---------- t (1 row) -- put messages on the queue \set msg_id1 1::bigint \set msg_id2 2::bigint SELECT send = :msg_id1 FROM pgmq.send('archive_queue', '0', '{"headers": 1}'::jsonb); ?column? ---------- t (1 row) SELECT send = :msg_id2 FROM pgmq.send('archive_queue', '0'); ?column? ---------- t (1 row) -- two messages in the queue SELECT COUNT(*) = 2 FROM pgmq.q_archive_queue; ?column? ---------- t (1 row) -- archive the message. The first two exist so the id should be returned, the -- last one doesn't SELECT ARRAY( SELECT * FROM pgmq.archive('archive_queue', ARRAY[:msg_id1, :msg_id2]) ) = ARRAY[:msg_id1, :msg_id2]; ?column? ---------- t (1 row) -- should be no messages left on the queue table SELECT COUNT(*) = 0 FROM pgmq.q_archive_queue; ?column? ---------- t (1 row) -- should be two messages in archive SELECT COUNT(*) = 2 FROM pgmq.a_archive_queue; ?column? ---------- t (1 row) \set msg_id3 3::bigint SELECT send = :msg_id3 FROM pgmq.send('archive_queue', '0'); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.q_archive_queue; ?column? ---------- t (1 row) SELECT * FROM pgmq.archive('archive_queue', :msg_id3); archive --------- t (1 row) SELECT COUNT(*) = 0 FROM pgmq.q_archive_queue; ?column? ---------- t (1 row) SELECT COUNT(*) = 3 FROM pgmq.a_archive_queue; ?column? ---------- t (1 row) -- body and headers are archived SELECT msg_id, message, headers FROM pgmq.a_archive_queue; msg_id | message | headers --------+---------+---------------- 1 | 0 | {"headers": 1} 2 | 0 | 3 | 0 | (3 rows) -- test_read_read_with_poll -- Creating queue SELECT pgmq.create('test_read_queue'); create -------- (1 row) -- Sending 3 messages to the queue SELECT send = :msg_id1 FROM pgmq.send('test_read_queue', '0'); ?column? ---------- t (1 row) SELECT send = :msg_id2 FROM pgmq.send('test_read_queue', '0'); ?column? ---------- t (1 row) SELECT send = :msg_id3 FROM pgmq.send('test_read_queue', '0'); ?column? ---------- t (1 row) -- Reading with limit respects the limit SELECT msg_id = :msg_id1 FROM pgmq.read('test_read_queue', 5, 1); ?column? ---------- t (1 row) -- Reading respects the VT SELECT ARRAY( SELECT msg_id FROM pgmq.read('test_read_queue', 10, 5) ) = ARRAY[:msg_id2, :msg_id3]; ?column? ---------- t (1 row) -- Read with poll will poll until the first message is available SELECT clock_timestamp() AS start \gset SELECT msg_id = :msg_id1 FROM pgmq.read_with_poll('test_read_queue', 10, 5, 6, 100); ?column? ---------- t (1 row) SELECT clock_timestamp() - :'start' > '3 second'::interval; ?column? ---------- t (1 row) -- test_purge_queue SELECT pgmq.create('test_purge_queue'); create -------- (1 row) SELECT * from pgmq.send('test_purge_queue', '0'); send ------ 1 (1 row) SELECT * from pgmq.send('test_purge_queue', '0'); send ------ 2 (1 row) SELECT * from pgmq.send('test_purge_queue', '0'); send ------ 3 (1 row) SELECT * from pgmq.send('test_purge_queue', '0'); send ------ 4 (1 row) SELECT * from pgmq.send('test_purge_queue', '0'); send ------ 5 (1 row) SELECT * FROM pgmq.purge_queue('test_purge_queue'); purge_queue ------------- 5 (1 row) SELECT COUNT(*) = 0 FROM pgmq.q_test_purge_queue; ?column? ---------- t (1 row) -- test_pop SELECT pgmq.create('test_pop_queue'); create -------- (1 row) SELECT * FROM pgmq.pop('test_pop_queue'); msg_id | read_ct | enqueued_at | vt | message | headers --------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_pop_queue', '0') \gset SELECT * from pgmq.send('test_pop_queue', '0'); send ------ 2 (1 row) SELECT * from pgmq.send('test_pop_queue', '0'); send ------ 3 (1 row) SELECT msg_id = :first_msg_id FROM pgmq.pop('test_pop_queue'); ?column? ---------- t (1 row) SELECT msg_id from pgmq.pop('test_pop_queue', 10); msg_id -------- 2 3 (2 rows) SELECT * from pgmq.send('test_pop_queue', '1'); send ------ 4 (1 row) SELECT * from pgmq.send('test_pop_queue', '2'); send ------ 5 (1 row) SELECT * from pgmq.send('test_pop_queue', '3'); send ------ 6 (1 row) SELECT msg_id from pgmq.pop('test_pop_queue', 3); msg_id -------- 4 5 6 (3 rows) -- test_set_vt SELECT pgmq.create('test_set_vt_queue'); create -------- (1 row) SELECT * FROM pgmq.set_vt('test_set_vt_queue', 9999, 0); msg_id | read_ct | enqueued_at | vt | message | headers --------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_set_vt_queue', '0') \gset -- set message invisible for 100 seconds SELECT msg_id FROM pgmq.set_vt('test_set_vt_queue', :first_msg_id, 100); msg_id -------- 1 (1 row) -- read message, it should not be visible SELECT msg_id from pgmq.read('test_set_vt_queue', 1, 1); msg_id -------- (0 rows) -- make it visible SELECT msg_id FROM pgmq.set_vt('test_set_vt_queue', :first_msg_id, 0); msg_id -------- 1 (1 row) -- set vt works if message is readable SELECT msg_id from pgmq.read('test_set_vt_queue', 1, 1); msg_id -------- 1 (1 row) -- test_partitioned_delete \set partition_interval 2 \set retention_interval 2 -- We first will drop pg_partman and assert that create fails without the -- extension installed DROP EXTENSION pg_partman; SELECT * FROM pgmq.create_partitioned( 'test_partitioned_queue', :'partition_interval', :'retention_interval' ); ERROR: pg_partman is required for partitioned queues CONTEXT: PL/pgSQL function pgmq._ensure_pg_partman_installed() line 4 at RAISE SQL statement "SELECT pgmq._ensure_pg_partman_installed()" PL/pgSQL function pgmq.create_partitioned(text,text,text) line 13 at PERFORM -- With the extension existing, the queue is created successfully CREATE EXTENSION pg_partman; SELECT * FROM pgmq.create_partitioned( 'test_partitioned_queue', :'partition_interval', :'retention_interval' ); create_partitioned -------------------- (1 row) -- queue shows up in list queues SELECT queue_name FROM pgmq.list_queues() WHERE queue_name = 'test_partitioned_queue'; queue_name ------------------------ test_partitioned_queue (1 row) -- Sending 3 messages to the queue SELECT send AS msg_id1 from pgmq.send('test_partitioned_queue', '0') \gset SELECT send AS msg_id2 from pgmq.send('test_partitioned_queue', '0') \gset SELECT send AS msg_id3 from pgmq.send('test_partitioned_queue', '0') \gset SELECT COUNT(*) = 3 FROM pgmq.q_test_partitioned_queue; ?column? ---------- t (1 row) -- Deleting message 3 SELECT * FROM pgmq.delete('test_partitioned_queue', :msg_id3); delete -------- t (1 row) SELECT COUNT(*) = 2 FROM pgmq.q_test_partitioned_queue; ?column? ---------- t (1 row) -- Deleting batch SELECT ARRAY( SELECT archive FROM pgmq.archive( 'test_partitioned_queue', ARRAY[:msg_id1, :msg_id2, :msg_id3, -3] ) ) = ARRAY[:msg_id1, :msg_id2]::bigint[]; ?column? ---------- t (1 row) -- test_transaction_create BEGIN; SELECT pgmq.create('transaction_test_queue'); create -------- (1 row) ROLLBACK; SELECT tablename FROM pg_tables WHERE schemaname = 'pgmq' AND tablename = 'q_transaction_test_queue'; tablename ----------- (0 rows) -- test_detach_archive SELECT pgmq.create('detach_archive_queue'); create -------- (1 row) DROP EXTENSION pgmq CASCADE; SELECT tablename FROM pg_tables WHERE schemaname = 'pgmq' AND tablename = 'a_detach_archive_queue'; tablename ------------------------ a_detach_archive_queue (1 row) -- queues and archive remains CREATE EXTENSION pgmq; SELECT pgmq.create('queue_remains'); create -------- (1 row) DROP EXTENSION pgmq CASCADE; SELECT tablename FROM pg_tables WHERE schemaname = 'pgmq' AND tablename = 'q_queue_remains'; tablename ----------------- q_queue_remains (1 row) SELECT tablename FROM pg_tables WHERE schemaname = 'pgmq' AND tablename = 'a_queue_remains'; tablename ----------------- a_queue_remains (1 row) --Truncated Index When queue name is max. CREATE EXTENSION pgmq; SELECT pgmq.create('long_queue_name_123456789012345678901234567890'); create -------- (1 row) SELECT pgmq.convert_archive_partitioned('long_queue_name_123456789012345678901234567890'); NOTICE: identifier "archived_at_idx_long_queue_name_123456789012345678901234567890_old" will be truncated to "archived_at_idx_long_queue_name_123456789012345678901234567890_" convert_archive_partitioned ----------------------------- (1 row) --Check for archive is already partitioned SELECT pgmq.convert_archive_partitioned('long_queue_name_123456789012345678901234567890'); NOTICE: Table a_long_queue_name_123456789012345678901234567890s is already partitioned convert_archive_partitioned ----------------------------- (1 row) --Error out due to Index duplicate index at old table. SELECT pgmq.create('long_queue_name_1234567890123456789012345678901'); create -------- (1 row) SELECT pgmq.convert_archive_partitioned('long_queue_name_1234567890123456789012345678901'); NOTICE: identifier "archived_at_idx_long_queue_name_1234567890123456789012345678901_old" will be truncated to "archived_at_idx_long_queue_name_1234567890123456789012345678901" ERROR: relation "archived_at_idx_long_queue_name_1234567890123456789012345678901" already exists CONTEXT: SQL statement "ALTER INDEX pgmq.archived_at_idx_long_queue_name_1234567890123456789012345678901 RENAME TO archived_at_idx_long_queue_name_1234567890123456789012345678901_old" PL/pgSQL function pgmq.convert_archive_partitioned(text,text,text,integer) line 34 at EXECUTE --Success SELECT pgmq.create('long_queue_name_'); create -------- (1 row) SELECT pgmq.convert_archive_partitioned('long_queue_name_'); convert_archive_partitioned ----------------------------- (1 row) \set SHOW_CONTEXT never --Failed SQL injection attack SELECT pgmq.create('abc'); create -------- (1 row) SELECT pgmq.delete( 'abc where false; create table public.attack_vector(id int); delete from pgmq.q_abc', 1 ); ERROR: queue name contains invalid characters: $, ;, --, or \' --Special characters in queue name SELECT pgmq.create('queue-hyphened'); create -------- (1 row) SELECT pgmq.send('queue-hyphened', '{"hello":"world"}'); send ------ 1 (1 row) SELECT msg_id, read_ct, message FROM pgmq.read('queue-hyphened', 1, 1); msg_id | read_ct | message --------+---------+-------------------- 1 | 1 | {"hello": "world"} (1 row) SELECT pgmq.archive('queue-hyphened', 1); archive --------- t (1 row) SELECT pgmq.create('QueueCased'); create -------- (1 row) SELECT pgmq.send('QueueCased', '{"hello":"world"}'); send ------ 1 (1 row) SELECT msg_id, read_ct, message FROM pgmq.read('QueueCased', 1, 1); msg_id | read_ct | message --------+---------+-------------------- 1 | 1 | {"hello": "world"} (1 row) SELECT pgmq.archive('QueueCased', 1); archive --------- t (1 row) SELECT pgmq.create_partitioned('queue-hyphened-part'); create_partitioned -------------------- (1 row) SELECT pgmq.send('queue-hyphened-part', '{"hello":"world"}'); send ------ 1 (1 row) SELECT msg_id, read_ct, message FROM pgmq.read('queue-hyphened-part', 1, 1); msg_id | read_ct | message --------+---------+-------------------- 1 | 1 | {"hello": "world"} (1 row) SELECT pgmq.archive('queue-hyphened-part', 1); archive --------- t (1 row) SELECT pgmq.create_partitioned('QueueCasedPart'); create_partitioned -------------------- (1 row) SELECT pgmq.send('QueueCasedPart', '{"hello":"world"}'); send ------ 1 (1 row) SELECT msg_id, read_ct, message FROM pgmq.read('QueueCasedPart', 1, 1); msg_id | read_ct | message --------+---------+-------------------- 1 | 1 | {"hello": "world"} (1 row) SELECT pgmq.archive('QueueCasedPart', 1); archive --------- t (1 row) -- fails with invalid queue name SELECT pgmq.create('dollar$-signed'); ERROR: queue name contains invalid characters: $, ;, --, or \' SELECT pgmq.create_partitioned('dollar$-signed-part'); ERROR: queue name contains invalid characters: $, ;, --, or \' -- input validation success SELECT pgmq.format_table_name('cat', 'q'); format_table_name ------------------- q_cat (1 row) SELECT pgmq.format_table_name('cat-dog', 'a'); format_table_name ------------------- a_cat-dog (1 row) SELECT pgmq.format_table_name('cat_dog', 'q'); format_table_name ------------------- q_cat_dog (1 row) -- input validation failure SELECT pgmq.format_table_name('dollar$fail', 'q'); ERROR: queue name contains invalid characters: $, ;, --, or \' SELECT pgmq.format_table_name('double--hyphen-fail', 'a'); ERROR: queue name contains invalid characters: $, ;, --, or \' SELECT pgmq.format_table_name('semicolon;fail', 'a'); ERROR: queue name contains invalid characters: $, ;, --, or \' SELECT pgmq.format_table_name($$single'quote-fail$$, 'a'); ERROR: queue name contains invalid characters: $, ;, --, or \' -- test null message SELECT pgmq.create('null_message_queue'); create -------- (1 row) SELECT pgmq.send('null_message_queue', NULL); send ------ 1 (1 row) SELECT msg_id, read_ct, message FROM pgmq.read('null_message_queue', 1, 1); msg_id | read_ct | message --------+---------+--------- 1 | 1 | (1 row) --Cleanup tests DROP EXTENSION pgmq CASCADE; DROP EXTENSION pg_partman CASCADE;