-- 12_publish.sql: Fan-out publishing tests -- pg_regress test for ulak pub/sub -- ============================================================================ -- SETUP -- ============================================================================ SELECT ulak.create_event_type('evt.alpha') IS NOT NULL AS et_alpha; et_alpha ---------- t (1 row) SELECT ulak.create_event_type('evt.beta') IS NOT NULL AS et_beta; et_beta --------- t (1 row) SELECT ulak.create_event_type('evt.empty') IS NOT NULL AS et_empty; et_empty ---------- t (1 row) SELECT ulak.create_endpoint('pub_ep_1', 'http', '{"url": "http://localhost:9999/1"}'::jsonb) IS NOT NULL AS ep1; INFO: [ulak] Created endpoint with ID 23 ep1 ----- t (1 row) SELECT ulak.create_endpoint('pub_ep_2', 'http', '{"url": "http://localhost:9999/2"}'::jsonb) IS NOT NULL AS ep2; INFO: [ulak] Created endpoint with ID 24 ep2 ----- t (1 row) SELECT ulak.create_endpoint('pub_ep_3', 'http', '{"url": "http://localhost:9999/3"}'::jsonb) IS NOT NULL AS ep3; INFO: [ulak] Created endpoint with ID 25 ep3 ----- t (1 row) -- Subscribe: alpha -> ep1, ep2 ; beta -> ep3 SELECT ulak.subscribe('evt.alpha', 'pub_ep_1') IS NOT NULL AS sub1; sub1 ------ t (1 row) SELECT ulak.subscribe('evt.alpha', 'pub_ep_2') IS NOT NULL AS sub2; sub2 ------ t (1 row) SELECT ulak.subscribe('evt.beta', 'pub_ep_3') IS NOT NULL AS sub3; sub3 ------ t (1 row) -- ============================================================================ -- PUBLISH: FAN-OUT TO 2 ENDPOINTS -- ============================================================================ DELETE FROM ulak.queue; SELECT ulak.publish('evt.alpha', '{"msg": "hello"}') AS fanout_2; fanout_2 ---------- 2 (1 row) SELECT e.name AS endpoint, q.status FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint | status ----------+--------- pub_ep_1 | pending pub_ep_2 | pending (2 rows) -- ============================================================================ -- PUBLISH: FAN-OUT TO 1 ENDPOINT -- ============================================================================ DELETE FROM ulak.queue; SELECT ulak.publish('evt.beta', '{"msg": "shipped"}') AS fanout_1; fanout_1 ---------- 1 (1 row) SELECT e.name AS endpoint FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint ---------- pub_ep_3 (1 row) -- ============================================================================ -- PUBLISH: 0 SUBSCRIBERS RETURNS 0 -- ============================================================================ SELECT ulak.publish('evt.empty', '{"msg": "nobody"}') AS fanout_0; fanout_0 ---------- 0 (1 row) -- ============================================================================ -- PUBLISH: NON-EXISTENT EVENT TYPE RAISES ERROR -- ============================================================================ DO $$ BEGIN PERFORM ulak.publish('nonexistent.event', '{"x": 1}'); EXCEPTION WHEN OTHERS THEN RAISE NOTICE 'publish to nonexistent event type rejected'; END $$; NOTICE: publish to nonexistent event type rejected -- ============================================================================ -- PUBLISH: DISABLED SUBSCRIPTION SKIPPED -- ============================================================================ DELETE FROM ulak.queue; UPDATE ulak.subscriptions SET enabled = false WHERE endpoint_id = (SELECT id FROM ulak.endpoints WHERE name = 'pub_ep_2'); SELECT ulak.publish('evt.alpha', '{"msg": "skip_disabled_sub"}') AS fanout_1; fanout_1 ---------- 1 (1 row) SELECT e.name AS endpoint FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint ---------- pub_ep_1 (1 row) -- Re-enable UPDATE ulak.subscriptions SET enabled = true WHERE endpoint_id = (SELECT id FROM ulak.endpoints WHERE name = 'pub_ep_2'); -- ============================================================================ -- PUBLISH: DISABLED ENDPOINT SKIPPED -- ============================================================================ DELETE FROM ulak.queue; UPDATE ulak.endpoints SET enabled = false WHERE name = 'pub_ep_1'; SELECT ulak.publish('evt.alpha', '{"msg": "skip_disabled_ep"}') AS fanout_1; fanout_1 ---------- 1 (1 row) SELECT e.name AS endpoint FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint ---------- pub_ep_2 (1 row) -- Re-enable UPDATE ulak.endpoints SET enabled = true WHERE name = 'pub_ep_1'; -- ============================================================================ -- PUBLISH: OPEN CIRCUIT BREAKER DOES NOT DROP FAN-OUT -- ============================================================================ DELETE FROM ulak.queue; UPDATE ulak.endpoints SET circuit_state = 'open' WHERE name = 'pub_ep_1'; SELECT ulak.publish('evt.alpha', '{"msg": "queue_open_cb"}') AS fanout_2; fanout_2 ---------- 2 (1 row) SELECT e.name AS endpoint FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint ---------- pub_ep_1 pub_ep_2 (2 rows) -- Reset circuit UPDATE ulak.endpoints SET circuit_state = 'closed' WHERE name = 'pub_ep_1'; -- ============================================================================ -- PUBLISH: JSONB FILTER (CONTAINMENT) -- ============================================================================ DELETE FROM ulak.queue; -- Add filtered subscription: ep3 also subscribes to alpha, but only VIP SELECT ulak.subscribe('evt.alpha', 'pub_ep_3', '{"vip": true}'::jsonb) IS NOT NULL AS filtered; filtered ---------- t (1 row) -- Matching payload: ep1 + ep2 (no filter) + ep3 (filter matches) = 3 SELECT ulak.publish('evt.alpha', '{"id": 1, "vip": true}') AS fanout_3; fanout_3 ---------- 3 (1 row) DELETE FROM ulak.queue; -- Non-matching payload: ep1 + ep2 = 2 (ep3 filtered out) SELECT ulak.publish('evt.alpha', '{"id": 2, "vip": false}') AS fanout_2; fanout_2 ---------- 2 (1 row) DELETE FROM ulak.queue; -- Missing key: ep1 + ep2 = 2 (ep3 filtered out, no "vip" key) SELECT ulak.publish('evt.alpha', '{"id": 3}') AS fanout_2_nokey; fanout_2_nokey ---------------- 2 (1 row) -- Remove filtered subscription for clean state DELETE FROM ulak.subscriptions WHERE endpoint_id = (SELECT id FROM ulak.endpoints WHERE name = 'pub_ep_3') AND event_type_id = (SELECT id FROM ulak.event_types WHERE name = 'evt.alpha'); -- ============================================================================ -- PUBLISH: ATOMICITY (all messages same created_at) -- ============================================================================ DELETE FROM ulak.queue; SELECT ulak.publish('evt.alpha', '{"atomic": true}') AS fanout_2_atomic; fanout_2_atomic ----------------- 2 (1 row) -- All messages should have the same created_at (atomic INSERT...SELECT) SELECT count(DISTINCT created_at) AS distinct_timestamps FROM ulak.queue; distinct_timestamps --------------------- 1 (1 row) -- ============================================================================ -- PUBLISH_BATCH: MULTIPLE EVENTS -- ============================================================================ DELETE FROM ulak.queue; SELECT ulak.publish_batch('[ {"event_type": "evt.alpha", "payload": {"batch": 1}}, {"event_type": "evt.alpha", "payload": {"batch": 2}}, {"event_type": "evt.beta", "payload": {"batch": 3}} ]'::jsonb) AS batch_total; batch_total ------------- 5 (1 row) -- alpha×2 = 2+2 = 4, beta×1 = 1, total = 5 SELECT count(*) AS batch_queue_count FROM ulak.queue WHERE status = 'pending'; batch_queue_count ------------------- 5 (1 row) -- Open circuit endpoints must still receive queued fan-out rows DELETE FROM ulak.queue; UPDATE ulak.endpoints SET circuit_state = 'open' WHERE name = 'pub_ep_1'; SELECT ulak.publish_batch('[ {"event_type": "evt.alpha", "payload": {"batch": "open_cb"}} ]'::jsonb) AS batch_open_circuit_total; batch_open_circuit_total -------------------------- 2 (1 row) SELECT e.name AS endpoint FROM ulak.queue q JOIN ulak.endpoints e ON e.id = q.endpoint_id ORDER BY e.name; endpoint ---------- pub_ep_1 pub_ep_2 (2 rows) UPDATE ulak.endpoints SET circuit_state = 'closed' WHERE name = 'pub_ep_1'; -- ============================================================================ -- PUBLISH_BATCH: NON-EXISTENT EVENT TYPE ABORTS ENTIRE BATCH -- ============================================================================ DELETE FROM ulak.queue; DO $$ BEGIN PERFORM ulak.publish_batch('[ {"event_type": "evt.alpha", "payload": {"ok": true}}, {"event_type": "nonexistent", "payload": {"fail": true}} ]'::jsonb); EXCEPTION WHEN OTHERS THEN RAISE NOTICE 'batch aborted on nonexistent event type'; END $$; NOTICE: batch aborted on nonexistent event type -- Queue should be empty (entire batch rolled back) SELECT count(*) AS should_be_0 FROM ulak.queue; should_be_0 ------------- 0 (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue; DELETE FROM ulak.subscriptions; DELETE FROM ulak.event_types; SELECT ulak.drop_endpoint('pub_ep_1'); INFO: [ulak] Dropped endpoint 'pub_ep_1' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('pub_ep_2'); INFO: [ulak] Dropped endpoint 'pub_ep_2' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('pub_ep_3'); INFO: [ulak] Dropped endpoint 'pub_ep_3' drop_endpoint --------------- t (1 row)