-- TOPIC-BASED ROUTING TESTS -- This test file validates the topic-based routing implementation -- Aligned with topics_test.py -- Stabilize output and ensure clean extension state SET client_min_messages = warning; DROP EXTENSION IF EXISTS pgmq CASCADE; CREATE EXTENSION pgmq; -- ============================================================================= -- Tests for validate_routing_key() -- ============================================================================= -- test_valid_simple_routing_key SELECT pgmq.validate_routing_key('logs.error') = true; ?column? ---------- t (1 row) -- test_valid_routing_key_with_hyphens SELECT pgmq.validate_routing_key('app.user-service.auth') = true; ?column? ---------- t (1 row) -- test_valid_routing_key_with_underscores SELECT pgmq.validate_routing_key('system_events.db.connection_failed') = true; ?column? ---------- t (1 row) -- test_valid_single_segment_routing_key SELECT pgmq.validate_routing_key('logs') = true; ?column? ---------- t (1 row) -- test_invalid_empty_routing_key (expect error - using DO block to catch) DO $$ BEGIN PERFORM pgmq.validate_routing_key(''); RAISE EXCEPTION 'Should have raised an error for empty routing key'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_null_routing_key DO $$ BEGIN PERFORM pgmq.validate_routing_key(NULL); RAISE EXCEPTION 'Should have raised an error for NULL routing key'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_starts_with_dot DO $$ BEGIN PERFORM pgmq.validate_routing_key('.logs.error'); RAISE EXCEPTION 'Should have raised an error for routing key starting with dot'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_ends_with_dot DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs.error.'); RAISE EXCEPTION 'Should have raised an error for routing key ending with dot'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_consecutive_dots DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs..error'); RAISE EXCEPTION 'Should have raised an error for consecutive dots'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_with_wildcards DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs.*'); RAISE EXCEPTION 'Should have raised an error for wildcards in routing key'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_with_special_chars DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs.error!'); RAISE EXCEPTION 'Should have raised an error for special characters'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_with_space DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs error'); RAISE EXCEPTION 'Should have raised an error for space in routing key'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_routing_key_too_long DO $$ BEGIN PERFORM pgmq.validate_routing_key(repeat('a', 256)); RAISE EXCEPTION 'Should have raised an error for routing key > 255 chars'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- ============================================================================= -- Tests for validate_topic_pattern() -- ============================================================================= -- test_valid_pattern_with_star SELECT pgmq.validate_topic_pattern('logs.*') = true; ?column? ---------- t (1 row) -- test_valid_pattern_with_hash SELECT pgmq.validate_topic_pattern('logs.#') = true; ?column? ---------- t (1 row) -- test_valid_pattern_star_at_start SELECT pgmq.validate_topic_pattern('*.error') = true; ?column? ---------- t (1 row) -- test_valid_pattern_hash_at_start SELECT pgmq.validate_topic_pattern('#.error') = true; ?column? ---------- t (1 row) -- test_valid_pattern_mixed_wildcards SELECT pgmq.validate_topic_pattern('app.*.#') = true; ?column? ---------- t (1 row) -- test_valid_pattern_exact_match SELECT pgmq.validate_topic_pattern('logs.error.fatal') = true; ?column? ---------- t (1 row) -- test_valid_pattern_only_hash SELECT pgmq.validate_topic_pattern('#') = true; ?column? ---------- t (1 row) -- test_valid_pattern_only_star SELECT pgmq.validate_topic_pattern('*') = true; ?column? ---------- t (1 row) -- test_invalid_empty_pattern DO $$ BEGIN PERFORM pgmq.validate_topic_pattern(''); RAISE EXCEPTION 'Should have raised an error for empty pattern'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_null_pattern DO $$ BEGIN PERFORM pgmq.validate_topic_pattern(NULL); RAISE EXCEPTION 'Should have raised an error for NULL pattern'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_starts_with_dot DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('.logs.*'); RAISE EXCEPTION 'Should have raised an error for pattern starting with dot'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_ends_with_dot DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.*.'); RAISE EXCEPTION 'Should have raised an error for pattern ending with dot'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_consecutive_dots DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs..error'); RAISE EXCEPTION 'Should have raised an error for consecutive dots in pattern'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_consecutive_stars DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.**'); RAISE EXCEPTION 'Should have raised an error for consecutive stars'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_consecutive_hashes DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.##'); RAISE EXCEPTION 'Should have raised an error for consecutive hashes'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_adjacent_wildcards_star_hash DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.*#'); RAISE EXCEPTION 'Should have raised an error for adjacent *#'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_adjacent_wildcards_hash_star DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.#*'); RAISE EXCEPTION 'Should have raised an error for adjacent #*'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_special_chars DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.error!'); RAISE EXCEPTION 'Should have raised an error for special characters in pattern'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_invalid_pattern_too_long DO $$ BEGIN PERFORM pgmq.validate_topic_pattern(repeat('a', 256)); RAISE EXCEPTION 'Should have raised an error for pattern > 255 chars'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- ============================================================================= -- Tests for regex injection protection in validation -- ============================================================================= -- test_pattern_rejects_parentheses (e.g., logs.(foo|bar)) DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.(foo|bar)'); RAISE EXCEPTION 'Should have raised an error for parentheses in pattern'; EXCEPTION WHEN OTHERS THEN IF SQLERRM NOT LIKE '%invalid characters%' THEN RAISE EXCEPTION 'Expected invalid characters error, got: %', SQLERRM; END IF; END $$; -- test_pattern_rejects_square_brackets DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.[error]'); RAISE EXCEPTION 'Should have raised an error for square brackets in pattern'; EXCEPTION WHEN OTHERS THEN IF SQLERRM NOT LIKE '%invalid characters%' THEN RAISE EXCEPTION 'Expected invalid characters error, got: %', SQLERRM; END IF; END $$; -- test_pattern_rejects_dollar DO $$ BEGIN PERFORM pgmq.validate_topic_pattern('logs.error$'); RAISE EXCEPTION 'Should have raised an error for dollar sign in pattern'; EXCEPTION WHEN OTHERS THEN IF SQLERRM NOT LIKE '%invalid characters%' THEN RAISE EXCEPTION 'Expected invalid characters error, got: %', SQLERRM; END IF; END $$; -- test_routing_key_rejects_regex_metacharacters DO $$ BEGIN PERFORM pgmq.validate_routing_key('logs.(foo|bar)'); RAISE EXCEPTION 'Should have raised an error for regex metacharacters in routing key'; EXCEPTION WHEN OTHERS THEN IF SQLERRM NOT LIKE '%invalid characters%' THEN RAISE EXCEPTION 'Expected invalid characters error, got: %', SQLERRM; END IF; END $$; -- ============================================================================= -- Tests for bind_topic() and unbind_topic() -- ============================================================================= -- Setup test environment SELECT pgmq.create('topic_queue_1'); create -------- (1 row) SELECT pgmq.create('topic_queue_2'); create -------- (1 row) SELECT pgmq.create('topic_queue_3'); create -------- (1 row) -- test_bind_topic_creates_binding -- Bind a topic pattern to a queue SELECT pgmq.bind_topic('orders.#', 'topic_queue_1'); bind_topic ------------ (1 row) -- Verify binding was created SELECT COUNT(*) = 1 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_1' AND pattern = 'orders.#'; ?column? ---------- t (1 row) -- test_bind_topic_idempotent -- Binding the same pattern multiple times should be idempotent SELECT pgmq.bind_topic('logs.info', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.info', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.info', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should have exactly one binding SELECT COUNT(*) = 1 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_1' AND pattern = 'logs.info'; ?column? ---------- t (1 row) -- test_bind_topic_multiple_patterns -- Bind multiple patterns to the same queue SELECT pgmq.bind_topic('events.*', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('alerts.#', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('system.startup', 'topic_queue_2'); bind_topic ------------ (1 row) -- Should have exactly 3 bindings for topic_queue_2 SELECT COUNT(*) = 3 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_2'; ?column? ---------- t (1 row) -- test_bind_topic_same_pattern_different_queues -- Bind the same pattern to different queues SELECT pgmq.bind_topic('broadcasts.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('broadcasts.#', 'topic_queue_2'); bind_topic ------------ (1 row) -- Both queues should have the binding SELECT COUNT(*) = 2 FROM pgmq.topic_bindings WHERE pattern = 'broadcasts.#'; ?column? ---------- t (1 row) -- test_unbind_topic_removes_binding -- Unbind a topic pattern SELECT pgmq.unbind_topic('logs.info', 'topic_queue_1'); unbind_topic -------------- t (1 row) -- Verify binding was removed SELECT COUNT(*) = 0 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_1' AND pattern = 'logs.info'; ?column? ---------- t (1 row) -- test_unbind_topic_nonexistent_returns_false -- Unbinding a non-existent pattern should return false SELECT pgmq.unbind_topic('nonexistent.pattern', 'topic_queue_1') = false; ?column? ---------- t (1 row) -- Clean up bindings for next tests DELETE FROM pgmq.topic_bindings; -- ============================================================================= -- Tests for test_routing() - dry-run routing tests -- ============================================================================= -- test_routing_exact_match_dry_run SELECT pgmq.bind_topic('logs.error', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.error'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_star_wildcard_dry_run SELECT pgmq.bind_topic('logs.*.error', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should match SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.api.error'); ?column? ---------- t (1 row) -- Should NOT match SELECT COUNT(*) = 0 FROM pgmq.test_routing('logs.api.db.error'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_hash_wildcard_dry_run SELECT pgmq.bind_topic('logs.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.error'); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.api.error'); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.api.db.critical'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_star_vs_hash_difference_dry_run SELECT pgmq.bind_topic('logs.*', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.#', 'topic_queue_2'); bind_topic ------------ (1 row) -- One segment: both match SELECT COUNT(*) = 2 FROM pgmq.test_routing('logs.error'); ?column? ---------- t (1 row) -- Two segments: only hash matches SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.error.fatal'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_multiple_queues_dry_run SELECT pgmq.bind_topic('logs.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.*.error', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.*.critical', 'topic_queue_3'); bind_topic ------------ (1 row) -- Info message: only topic_queue_1 SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.api.info'); ?column? ---------- t (1 row) -- Error message: topic_queue_1 and topic_queue_2 SELECT COUNT(*) = 2 FROM pgmq.test_routing('logs.api.error'); ?column? ---------- t (1 row) -- Critical message: topic_queue_1 and topic_queue_3 SELECT COUNT(*) = 2 FROM pgmq.test_routing('logs.db.critical'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_no_matches_dry_run SELECT pgmq.bind_topic('logs.*', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 0 FROM pgmq.test_routing('metrics.cpu.usage'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_hash_at_start_dry_run SELECT pgmq.bind_topic('#.error', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) >= 1 FROM pgmq.test_routing('logs.error'); ?column? ---------- t (1 row) SELECT COUNT(*) >= 1 FROM pgmq.test_routing('logs.api.error'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_catch_all_dry_run SELECT pgmq.bind_topic('#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs'); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('logs.error'); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.test_routing('anything.at.all'); ?column? ---------- t (1 row) DELETE FROM pgmq.topic_bindings; -- ============================================================================= -- Tests for topic_bindings() functions and bound_at timestamp -- ============================================================================= -- test_bound_at_set_on_bind -- Verify that bound_at is set when a binding is created SELECT pgmq.bind_topic('logs.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT bound_at IS NOT NULL AND bound_at <= now() FROM pgmq.topic_bindings WHERE pattern = 'logs.#' AND queue_name = 'topic_queue_1'; ?column? ---------- t (1 row) -- test_topic_bindings_returns_all -- Test that topic_bindings() returns all bindings SELECT pgmq.bind_topic('events.*', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('alerts.#', 'topic_queue_3'); bind_topic ------------ (1 row) -- Should return 3 bindings total SELECT COUNT(*) = 3 FROM pgmq.list_topic_bindings(); ?column? ---------- t (1 row) -- Verify structure includes all columns SELECT COUNT(*) = 3 AS has_all_bindings, bool_and(pattern IS NOT NULL) AS has_pattern, bool_and(queue_name IS NOT NULL) AS has_queue_name, bool_and(bound_at IS NOT NULL) AS has_bound_at, bool_and(compiled_regex IS NOT NULL) AS has_compiled_regex FROM pgmq.list_topic_bindings(); has_all_bindings | has_pattern | has_queue_name | has_bound_at | has_compiled_regex ------------------+-------------+----------------+--------------+-------------------- t | t | t | t | t (1 row) -- test_list_topic_bindings_by_queue_name -- Test that list_topic_bindings(queue_name) returns bindings for specific queue SELECT pgmq.bind_topic('orders.created', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('orders.updated', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should return 3 bindings for topic_queue_1 (logs.#, orders.created, orders.updated) SELECT COUNT(*) = 3 FROM pgmq.list_topic_bindings('topic_queue_1'); ?column? ---------- t (1 row) -- Should return 1 binding for topic_queue_2 SELECT COUNT(*) = 1 FROM pgmq.list_topic_bindings('topic_queue_2'); ?column? ---------- t (1 row) -- Should return 1 binding for topic_queue_3 SELECT COUNT(*) = 1 FROM pgmq.list_topic_bindings('topic_queue_3'); ?column? ---------- t (1 row) -- Verify structure for queue-specific function SELECT COUNT(*) = 3 AS correct_count, bool_and(pattern IS NOT NULL) AS has_pattern, bool_and(queue_name = 'topic_queue_1') AS correct_queue, bool_and(bound_at IS NOT NULL) AS has_bound_at, bool_and(compiled_regex IS NOT NULL) AS has_compiled_regex FROM pgmq.list_topic_bindings('topic_queue_1'); correct_count | has_pattern | correct_queue | has_bound_at | has_compiled_regex ---------------+-------------+---------------+--------------+-------------------- t | t | t | t | t (1 row) -- test_list_topic_bindings_ordering -- Verify bindings are ordered by bound_at DESC, then pattern -- Add bindings with slight delays to ensure different timestamps SELECT pgmq.bind_topic('first.pattern', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pg_sleep(0.01); pg_sleep ---------- (1 row) SELECT pgmq.bind_topic('second.pattern', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pg_sleep(0.01); pg_sleep ---------- (1 row) SELECT pgmq.bind_topic('third.pattern', 'topic_queue_1'); bind_topic ------------ (1 row) -- Get the first binding (should be most recent: third.pattern) SELECT pattern = 'third.pattern' AS most_recent_first FROM pgmq.list_topic_bindings('topic_queue_1') LIMIT 1; most_recent_first ------------------- t (1 row) -- Clean up bindings for next tests DELETE FROM pgmq.topic_bindings; -- ============================================================================= -- Tests for compiled_regex correctness (regex injection protection) -- ============================================================================= -- test_compiled_regex_escapes_dots -- Verify that dots in patterns are compiled to literal dot matchers, not regex wildcards SELECT pgmq.bind_topic('logs.error', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT compiled_regex = '^logs\.error$' AS dot_properly_escaped FROM pgmq.topic_bindings WHERE pattern = 'logs.error' AND queue_name = 'topic_queue_1'; dot_properly_escaped ---------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_compiled_regex_for_star_wildcard SELECT pgmq.bind_topic('logs.*', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT compiled_regex = '^logs\.[^.]+$' AS star_compiled_correctly FROM pgmq.topic_bindings WHERE pattern = 'logs.*' AND queue_name = 'topic_queue_1'; star_compiled_correctly ------------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_compiled_regex_for_hash_wildcard SELECT pgmq.bind_topic('logs.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT compiled_regex = '^logs\..*$' AS hash_compiled_correctly FROM pgmq.topic_bindings WHERE pattern = 'logs.#' AND queue_name = 'topic_queue_1'; hash_compiled_correctly ------------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_compiled_regex_for_mixed_pattern SELECT pgmq.bind_topic('app.*.logs.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT compiled_regex = '^app\.[^.]+\.logs\..*$' AS mixed_compiled_correctly FROM pgmq.topic_bindings WHERE pattern = 'app.*.logs.#' AND queue_name = 'topic_queue_1'; mixed_compiled_correctly -------------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_dot_not_matching_arbitrary_characters -- Pattern 'a.b' should NOT match routing key 'aXb' (dot must be literal) SELECT pgmq.bind_topic('a.b', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 0 AS dot_is_literal FROM pgmq.test_routing('aXb'); dot_is_literal ---------------- t (1 row) SELECT COUNT(*) = 1 AS literal_dot_matches FROM pgmq.test_routing('a.b'); literal_dot_matches --------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- test_hyphen_in_pattern_safe -- Hyphens should be treated literally, not as regex range operators SELECT pgmq.bind_topic('my-app.logs.*', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT COUNT(*) = 1 AS hyphen_pattern_works FROM pgmq.test_routing('my-app.logs.error'); hyphen_pattern_works ---------------------- t (1 row) SELECT COUNT(*) = 0 AS hyphen_is_literal FROM pgmq.test_routing('myXapp.logs.error'); hyphen_is_literal ------------------- t (1 row) DELETE FROM pgmq.topic_bindings; -- ============================================================================= -- Tests for actual message routing with send_topic() -- ============================================================================= -- test_routing_exact_match -- Test exact routing key matching SELECT pgmq.bind_topic('orders.created', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('orders.created', '{"order_id": 123}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should have 1 message in topic_queue_1 SELECT COUNT(*) = 1 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) SELECT COUNT(*) = 0 FROM pgmq.q_topic_queue_2; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_star_wildcard -- Test star (*) wildcard - matches exactly one segment SELECT pgmq.bind_topic('logs.*.error', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should match SELECT pgmq.send_topic('logs.app.error', '{"message": "error1"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('logs.db.error', '{"message": "error2"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should NOT match (wrong number of segments) SELECT pgmq.send_topic('logs.error', '{"message": "error3"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) SELECT pgmq.send_topic('logs.app.system.error', '{"message": "error4"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) -- Should have exactly 2 messages in topic_queue_1 SELECT COUNT(*) = 2 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 2 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_hash_wildcard -- Test hash (#) wildcard - matches zero or more segments after the dot SELECT pgmq.bind_topic('events.#', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should match (has dot after events) SELECT pgmq.send_topic('events.user', '{"event": "2"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('events.user.login', '{"event": "3"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('events.user.login.success', '{"event": "4"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should NOT match (no dot after events, or different prefix) SELECT pgmq.send_topic('events', '{"event": "1"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) SELECT pgmq.send_topic('notifications.user', '{"event": "5"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) -- Should have exactly 3 messages in topic_queue_1 SELECT COUNT(*) = 3 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 3 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_star_vs_hash_difference -- Test the difference between * and # SELECT pgmq.bind_topic('data.*.processed', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('data.#.processed', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('data.user.processed', '{"msg": "1"}'::jsonb, NULL, 0); send_topic ------------ 2 (1 row) SELECT pgmq.send_topic('data.order.item.processed', '{"msg": "2"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- topic_queue_1 should have 1 message (* matches exactly one segment) SELECT COUNT(*) = 1 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- topic_queue_2 should have 2 messages (# matches zero or more segments) SELECT COUNT(*) = 2 FROM pgmq.q_topic_queue_2; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 1 (1 row) SELECT pgmq.purge_queue('topic_queue_2'); purge_queue ------------- 2 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_multiple_queues -- Test routing to multiple queues SELECT pgmq.bind_topic('alerts.#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('alerts.critical.#', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('#', 'topic_queue_3'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('alerts.critical.database', '{"msg": "critical alert"}'::jsonb, NULL, 0); send_topic ------------ 3 (1 row) -- All three queues should receive the message SELECT COUNT(*) = 1 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.q_topic_queue_2; ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.q_topic_queue_3; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 1 (1 row) SELECT pgmq.purge_queue('topic_queue_2'); purge_queue ------------- 1 (1 row) SELECT pgmq.purge_queue('topic_queue_3'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_no_matches -- Test routing with no matching patterns SELECT pgmq.bind_topic('specific.pattern', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('different.pattern', '{"msg": "no match"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) -- No queues should have messages SELECT COUNT(*) = 0 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) SELECT COUNT(*) = 0 FROM pgmq.q_topic_queue_2; ?column? ---------- t (1 row) -- Clean up DELETE FROM pgmq.topic_bindings; -- test_routing_hash_at_start -- Test # wildcard at the start of pattern (matches zero or more segments before .error) SELECT pgmq.bind_topic('#.error', 'topic_queue_1'); bind_topic ------------ (1 row) -- Should match (have .error suffix) SELECT pgmq.send_topic('app.error', '{"msg": "2"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('system.database.error', '{"msg": "3"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should NOT match (no .error suffix or missing dot before error) SELECT pgmq.send_topic('error', '{"msg": "1"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) SELECT pgmq.send_topic('info', '{"msg": "4"}'::jsonb, NULL, 0); send_topic ------------ 0 (1 row) -- Should have 2 messages (all ending with .error and having at least one segment before) SELECT COUNT(*) = 2 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 2 (1 row) DELETE FROM pgmq.topic_bindings; -- test_routing_catch_all -- Test catch-all pattern SELECT pgmq.bind_topic('#', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('any', '{"msg": "1"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('any.pattern', '{"msg": "2"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) SELECT pgmq.send_topic('any.pattern.works', '{"msg": "3"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should have 3 messages (catch-all matches everything) SELECT COUNT(*) = 3 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 3 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_topic_with_headers -- Test sending topic messages with custom headers SELECT pgmq.bind_topic('orders.created', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.send_topic( 'orders.created', '{"order_id": 456}'::jsonb, '{"priority": "high", "source": "web"}'::jsonb, 0 ); send_topic ------------ 1 (1 row) -- Verify message was delivered with headers WITH msg AS ( SELECT * FROM pgmq.read('topic_queue_1', 10, 1) ) SELECT COUNT(*) = 1 as has_message, (SELECT message->>'order_id' FROM msg) = '456' as correct_body, (SELECT headers->>'priority' FROM msg) = 'high' as has_priority, (SELECT headers->>'source' FROM msg) = 'web' as has_source; has_message | correct_body | has_priority | has_source -------------+--------------+--------------+------------ t | t | t | t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_topic_with_delay -- Test sending topic messages with delay SELECT pgmq.bind_topic('delayed.message', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.send_topic( 'delayed.message', '{"msg": "delayed"}'::jsonb, 2 ); send_topic ------------ 1 (1 row) -- Should have 0 immediately readable messages (delayed) SELECT COUNT(*) = 0 FROM pgmq.read('topic_queue_1', 10, 1); ?column? ---------- t (1 row) -- Wait for delay to expire SELECT pg_sleep(3); pg_sleep ---------- (1 row) -- Should now have 1 message SELECT COUNT(*) = 1 FROM pgmq.read('topic_queue_1', 10, 1); ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_topic_overloaded_functions -- Test different function signatures SELECT pgmq.bind_topic('test.routing', 'topic_queue_1'); bind_topic ------------ (1 row) -- Basic: routing_key, message SELECT pgmq.send_topic('test.routing', '{"type": "basic"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- With headers: routing_key, message, headers, delay SELECT pgmq.send_topic('test.routing', '{"type": "headers"}'::jsonb, '{"priority": "low"}'::jsonb, 0); send_topic ------------ 1 (1 row) -- With delay: routing_key, message, delay SELECT pgmq.send_topic('test.routing', '{"type": "delayed"}'::jsonb, 0); send_topic ------------ 1 (1 row) -- With headers and delay: routing_key, message, headers, delay SELECT pgmq.send_topic('test.routing', '{"type": "full"}'::jsonb, '{"priority": "high"}'::jsonb, 0); send_topic ------------ 1 (1 row) -- Should have 4 messages total SELECT COUNT(*) = 4 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 4 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_topic_invalid_routing_key DO $$ BEGIN PERFORM pgmq.send_topic('invalid..key', '{"test": true}'::jsonb, NULL, 0); RAISE EXCEPTION 'Should have raised an error for invalid routing key'; EXCEPTION WHEN OTHERS THEN -- Expected error for consecutive dots END $$; -- test_send_topic_null_message DO $$ BEGIN PERFORM pgmq.send_topic('valid.key', NULL, NULL, 0); RAISE EXCEPTION 'Should have raised an error for NULL message'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_send_topic_negative_delay DO $$ BEGIN PERFORM pgmq.send_topic('valid.key', '{"test": true}'::jsonb, NULL, -1); RAISE EXCEPTION 'Should have raised an error for negative delay'; EXCEPTION WHEN OTHERS THEN -- Expected error END $$; -- test_send_topic_selective_routing -- Test selective routing based on patterns SELECT pgmq.bind_topic('payments.*.completed', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('payments.card.#', 'topic_queue_2'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('payments.#', 'topic_queue_3'); bind_topic ------------ (1 row) SELECT pgmq.send_topic('payments.card.completed', '{"payment_id": 1}'::jsonb, NULL, 0); send_topic ------------ 3 (1 row) SELECT pgmq.send_topic('payments.paypal.completed', '{"payment_id": 2}'::jsonb, NULL, 0); send_topic ------------ 2 (1 row) SELECT pgmq.send_topic('payments.card.declined', '{"payment_id": 3}'::jsonb, NULL, 0); send_topic ------------ 2 (1 row) -- topic_queue_1: payments.*.completed - should have 2 messages SELECT COUNT(*) = 2 FROM pgmq.q_topic_queue_1; ?column? ---------- t (1 row) -- topic_queue_2: payments.card.# - should have 2 messages SELECT COUNT(*) = 2 FROM pgmq.q_topic_queue_2; ?column? ---------- t (1 row) -- topic_queue_3: payments.# - should have 3 messages (catch-all) SELECT COUNT(*) = 3 FROM pgmq.q_topic_queue_3; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('topic_queue_1'); purge_queue ------------- 2 (1 row) SELECT pgmq.purge_queue('topic_queue_2'); purge_queue ------------- 2 (1 row) SELECT pgmq.purge_queue('topic_queue_3'); purge_queue ------------- 3 (1 row) DELETE FROM pgmq.topic_bindings; -- test_drop_queue_cascades_bindings -- Test that dropping a queue removes its bindings SELECT pgmq.bind_topic('test.pattern.1', 'topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('test.pattern.2', 'topic_queue_1'); bind_topic ------------ (1 row) -- Verify bindings exist SELECT COUNT(*) = 2 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_1'; ?column? ---------- t (1 row) -- Drop the queue SELECT pgmq.drop_queue('topic_queue_1'); drop_queue ------------ t (1 row) -- Bindings should be removed due to CASCADE SELECT COUNT(*) = 0 FROM pgmq.topic_bindings WHERE queue_name = 'topic_queue_1'; ?column? ---------- t (1 row) -- test_validate_routing_key_valid_cases -- Test valid routing keys SELECT pgmq.create('validation_queue'); create -------- (1 row) SELECT pgmq.bind_topic('test.#', 'validation_queue'); bind_topic ------------ (1 row) -- Valid routing keys SELECT pgmq.send_topic('simple.key', '{"test": 1}'::jsonb, NULL, 0) IS NOT NULL; ?column? ---------- t (1 row) SELECT pgmq.send_topic('key-with-hyphens', '{"test": 2}'::jsonb, NULL, 0) IS NOT NULL; ?column? ---------- t (1 row) SELECT pgmq.send_topic('key_with_underscores', '{"test": 3}'::jsonb, NULL, 0) IS NOT NULL; ?column? ---------- t (1 row) SELECT pgmq.send_topic('single', '{"test": 4}'::jsonb, NULL, 0) IS NOT NULL; ?column? ---------- t (1 row) -- Clean up validation tests SELECT pgmq.drop_queue('validation_queue'); drop_queue ------------ t (1 row) DELETE FROM pgmq.topic_bindings; -- ============================================================================= -- Tests for send_batch_topic() -- ============================================================================= -- test_send_batch_topic_basic -- Test basic batch sending to topics SELECT pgmq.create('batch_topic_queue_1'); create -------- (1 row) SELECT pgmq.create('batch_topic_queue_2'); create -------- (1 row) SELECT pgmq.bind_topic('batch.test.*', 'batch_topic_queue_1'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('batch.#', 'batch_topic_queue_2'); bind_topic ------------ (1 row) -- Send a batch of 3 messages SELECT queue_name, msg_id FROM pgmq.send_batch_topic( 'batch.test.messages', ARRAY['{"id": 1}'::jsonb, '{"id": 2}'::jsonb, '{"id": 3}'::jsonb] ) ORDER BY queue_name, msg_id; queue_name | msg_id ---------------------+-------- batch_topic_queue_1 | 1 batch_topic_queue_1 | 2 batch_topic_queue_1 | 3 batch_topic_queue_2 | 1 batch_topic_queue_2 | 2 batch_topic_queue_2 | 3 (6 rows) -- Should have 3 messages in each queue SELECT COUNT(*) = 3 FROM pgmq.q_batch_topic_queue_1; ?column? ---------- t (1 row) SELECT COUNT(*) = 3 FROM pgmq.q_batch_topic_queue_2; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('batch_topic_queue_1'); purge_queue ------------- 3 (1 row) SELECT pgmq.purge_queue('batch_topic_queue_2'); purge_queue ------------- 3 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_batch_topic_with_headers -- Test batch sending with headers SELECT pgmq.bind_topic('headers.test', 'batch_topic_queue_1'); bind_topic ------------ (1 row) SELECT queue_name, msg_id FROM pgmq.send_batch_topic( 'headers.test', ARRAY['{"msg": 1}'::jsonb, '{"msg": 2}'::jsonb]::jsonb[], ARRAY['{"header": "A"}'::jsonb, '{"header": "B"}'::jsonb]::jsonb[] ) ORDER BY queue_name, msg_id; queue_name | msg_id ---------------------+-------- batch_topic_queue_1 | 4 batch_topic_queue_1 | 5 (2 rows) -- Verify messages and headers SELECT message, headers FROM pgmq.q_batch_topic_queue_1 ORDER BY msg_id; message | headers ------------+----------------- {"msg": 1} | {"header": "A"} {"msg": 2} | {"header": "B"} (2 rows) -- Clean up SELECT pgmq.purge_queue('batch_topic_queue_1'); purge_queue ------------- 2 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_batch_topic_with_delay -- Test batch sending with delay SELECT pgmq.bind_topic('delay.test', 'batch_topic_queue_1'); bind_topic ------------ (1 row) SELECT queue_name, msg_id FROM pgmq.send_batch_topic( 'delay.test', ARRAY['{"delayed": true}'::jsonb]::jsonb[], 5 ) ORDER BY queue_name, msg_id; queue_name | msg_id ---------------------+-------- batch_topic_queue_1 | 6 (1 row) -- Message should be invisible due to delay SELECT COUNT(*) = 0 FROM pgmq.read('batch_topic_queue_1', 1, 1); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.q_batch_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('batch_topic_queue_1'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_batch_topic_with_timestamp_delay -- Test batch sending with timestamp delay SELECT pgmq.bind_topic('timestamp_delay.test', 'batch_topic_queue_1'); bind_topic ------------ (1 row) SELECT queue_name, msg_id FROM pgmq.send_batch_topic( 'timestamp_delay.test', ARRAY['{"delayed_timestamp": true}'::jsonb]::jsonb[], clock_timestamp() + interval '3 seconds' ) ORDER BY queue_name, msg_id; queue_name | msg_id ---------------------+-------- batch_topic_queue_1 | 7 (1 row) -- Message should be invisible due to delay SELECT COUNT(*) = 0 FROM pgmq.read('batch_topic_queue_1', 1, 1); ?column? ---------- t (1 row) SELECT COUNT(*) = 1 FROM pgmq.q_batch_topic_queue_1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('batch_topic_queue_1'); purge_queue ------------- 1 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_batch_topic_no_matches -- Test sending to routing key with no bindings SELECT queue_name, msg_id FROM pgmq.send_batch_topic( 'no.matches.here', ARRAY['{"test": 1}'::jsonb] ); queue_name | msg_id ------------+-------- (0 rows) -- Should have 0 messages in queues SELECT COUNT(*) = 0 FROM pgmq.q_batch_topic_queue_1; ?column? ---------- t (1 row) SELECT COUNT(*) = 0 FROM pgmq.q_batch_topic_queue_2; ?column? ---------- t (1 row) -- test_send_batch_topic_single_queue -- Test sending to a single queue (edge case between 0 and multiple) SELECT pgmq.bind_topic('single.queue.test', 'batch_topic_queue_1'); bind_topic ------------ (1 row) SELECT queue_name, COUNT(*) as msg_count FROM pgmq.send_batch_topic( 'single.queue.test', ARRAY['{"a": 1}'::jsonb, '{"b": 2}'::jsonb] ) GROUP BY queue_name; queue_name | msg_count ---------------------+----------- batch_topic_queue_1 | 2 (1 row) -- Clean up SELECT pgmq.purge_queue('batch_topic_queue_1'); purge_queue ------------- 2 (1 row) DELETE FROM pgmq.topic_bindings; -- test_send_batch_topic_null_validation -- Test NULL validation SELECT pgmq.bind_topic('validation.test', 'batch_topic_queue_1'); bind_topic ------------ (1 row) -- Use terse verbosity to make error output consistent across PG versions \set VERBOSITY terse -- Should fail: NULL messages array \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('validation.test', NULL); ERROR: msgs cannot be NULL or empty \set ON_ERROR_STOP 1 -- Should fail: empty messages array \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('validation.test', ARRAY[]::jsonb[]); ERROR: msgs cannot be NULL or empty \set ON_ERROR_STOP 1 -- Should fail: invalid routing key \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('invalid..key', ARRAY['{"test": 1}'::jsonb]); ERROR: routing_key cannot contain consecutive dots. Got: invalid..key \set ON_ERROR_STOP 1 -- Should fail: headers array length greater than msgs array length \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('validation.test', ARRAY['{"test": 1}'::jsonb], ARRAY['{"h": 1}'::jsonb, '{"h": 2}'::jsonb]); ERROR: headers array length (2) must match msgs array length (1) \set ON_ERROR_STOP 1 -- Should fail: headers array length less than msgs array length \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('validation.test', ARRAY['{"test": 1}'::jsonb, '{"test": 2}'::jsonb], ARRAY['{"h": 1}'::jsonb]); ERROR: headers array length (1) must match msgs array length (2) \set ON_ERROR_STOP 1 -- Should fail: empty headers array with non-empty msgs array \set ON_ERROR_STOP 0 SELECT pgmq.send_batch_topic('validation.test', ARRAY['{"test": 1}'::jsonb], ARRAY[]::jsonb[]); ERROR: headers array length (0) must match msgs array length (1) \set ON_ERROR_STOP 1 -- Restore default verbosity \set VERBOSITY default -- Clean up DELETE FROM pgmq.topic_bindings; SELECT pgmq.drop_queue('batch_topic_queue_1'); drop_queue ------------ t (1 row) SELECT pgmq.drop_queue('batch_topic_queue_2'); drop_queue ------------ t (1 row) -- ============================================================================= -- Regression test for duplicate queue prevention (issue #455) -- ============================================================================= -- test_no_duplicate_sends_when_multiple_patterns_match_same_queue -- Verify that when multiple patterns bind to the same queue and both match -- a routing key, the message is only sent once to that queue SELECT pgmq.create('dedup_test_queue'); create -------- (1 row) -- Bind two different patterns to the same queue, both will match the same routing key SELECT pgmq.bind_topic('logs.*', 'dedup_test_queue'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('logs.#', 'dedup_test_queue'); bind_topic ------------ (1 row) -- Both patterns match 'logs.error', but message should only be sent once SELECT pgmq.send_topic('logs.error', '{"msg": "test"}'::jsonb, NULL, 0); send_topic ------------ 1 (1 row) -- Should have exactly 1 message (not 2) SELECT COUNT(*) = 1 FROM pgmq.q_dedup_test_queue; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.purge_queue('dedup_test_queue'); purge_queue ------------- 1 (1 row) -- test_no_duplicate_batch_sends_when_multiple_patterns_match_same_queue -- Same test but for send_batch_topic SELECT pgmq.bind_topic('batch.test.*', 'dedup_test_queue'); bind_topic ------------ (1 row) SELECT pgmq.bind_topic('batch.test.#', 'dedup_test_queue'); bind_topic ------------ (1 row) -- Both patterns match 'batch.test.msg', but batch should only be sent once SELECT queue_name, COUNT(*) as msg_count FROM pgmq.send_batch_topic( 'batch.test.msg', ARRAY['{"id": 1}'::jsonb, '{"id": 2}'::jsonb, '{"id": 3}'::jsonb] ) GROUP BY queue_name; queue_name | msg_count ------------------+----------- dedup_test_queue | 3 (1 row) -- Should have exactly 3 messages (not 6) SELECT COUNT(*) = 3 FROM pgmq.q_dedup_test_queue; ?column? ---------- t (1 row) -- Verify matched_count is correct for send_topic with deduplication SELECT pgmq.purge_queue('dedup_test_queue'); purge_queue ------------- 3 (1 row) SELECT pgmq.send_topic('batch.test.msg', '{"msg": "count test"}'::jsonb, NULL, 0) = 1; ?column? ---------- t (1 row) -- Clean up SELECT pgmq.drop_queue('dedup_test_queue'); drop_queue ------------ t (1 row) DELETE FROM pgmq.topic_bindings; -- Clean up all test queues SELECT pgmq.drop_queue('topic_queue_2'); drop_queue ------------ t (1 row) SELECT pgmq.drop_queue('topic_queue_3'); drop_queue ------------ t (1 row) -- Verify all queues were dropped SELECT COUNT(*) = 0 FROM pgmq.list_queues() WHERE queue_name IN ('topic_queue_1', 'topic_queue_2', 'topic_queue_3', 'validation_queue'); ?column? ---------- t (1 row)