-- 23_amqp_config.sql: AMQP endpoint configuration validation -- pg_regress test for ulak -- NOTE: Only runs when built with ENABLE_AMQP=1 -- ============================================================================ -- VALID AMQP CONFIGS -- ============================================================================ -- Minimal valid AMQP config SELECT ulak.create_endpoint( 'amqp_minimal', 'amqp', '{"host": "localhost", "exchange": "events", "routing_key": "orders"}'::jsonb ) IS NOT NULL AS minimal_created; INFO: [ulak] Created endpoint with ID 61 minimal_created ----------------- t (1 row) -- Default exchange (empty string) SELECT ulak.create_endpoint( 'amqp_default_exchange', 'amqp', '{"host": "localhost", "exchange": "", "routing_key": "my-queue"}'::jsonb ) IS NOT NULL AS default_exchange_created; INFO: [ulak] Created endpoint with ID 62 default_exchange_created -------------------------- t (1 row) -- Full config SELECT ulak.create_endpoint( 'amqp_full', 'amqp', '{"host": "rabbitmq", "port": 5672, "vhost": "/", "exchange": "events", "exchange_type": "topic", "routing_key": "orders.created", "username": "user", "password": "pass", "persistent": true, "mandatory": false}'::jsonb ) IS NOT NULL AS full_created; INFO: [ulak] Created endpoint with ID 63 full_created -------------- t (1 row) -- Verify configs stored correctly SELECT name, protocol, config->>'host' AS host, config->>'exchange' AS exchange FROM ulak.endpoints WHERE name LIKE 'amqp_%' ORDER BY name; name | protocol | host | exchange -----------------------+----------+-----------+---------- amqp_default_exchange | amqp | localhost | amqp_full | amqp | rabbitmq | events amqp_minimal | amqp | localhost | events (3 rows) -- ============================================================================ -- VALIDATE_ENDPOINT_CONFIG FOR AMQP -- ============================================================================ -- Valid config SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "events", "routing_key": "test"}'::jsonb ) AS valid_minimal; valid_minimal --------------- t (1 row) -- Valid exchange types SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "e", "routing_key": "r", "exchange_type": "direct"}'::jsonb ) AS valid_direct; valid_direct -------------- t (1 row) SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "e", "routing_key": "r", "exchange_type": "fanout"}'::jsonb ) AS valid_fanout; valid_fanout -------------- t (1 row) SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "e", "routing_key": "r", "exchange_type": "topic"}'::jsonb ) AS valid_topic; valid_topic ------------- t (1 row) -- ============================================================================ -- INVALID AMQP CONFIGS -- ============================================================================ -- Missing host SELECT ulak.validate_endpoint_config( 'amqp', '{"exchange": "events", "routing_key": "test"}'::jsonb ) AS missing_host; WARNING: [ulak] ERROR: AMQP config missing or invalid 'host' field missing_host -------------- f (1 row) -- Missing exchange SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "routing_key": "test"}'::jsonb ) AS missing_exchange; WARNING: [ulak] ERROR: AMQP config missing or invalid 'exchange' field missing_exchange ------------------ f (1 row) -- Missing routing_key SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "events"}'::jsonb ) AS missing_routing_key; WARNING: [ulak] ERROR: AMQP config missing or invalid 'routing_key' field missing_routing_key --------------------- f (1 row) -- Invalid exchange_type SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "e", "routing_key": "r", "exchange_type": "invalid"}'::jsonb ) AS invalid_exchange_type; WARNING: [ulak] ERROR: AMQP config 'exchange_type' must be one of: direct, fanout, topic, headers invalid_exchange_type ----------------------- f (1 row) -- Unknown config key SELECT ulak.validate_endpoint_config( 'amqp', '{"host": "localhost", "exchange": "e", "routing_key": "r", "unknown": "value"}'::jsonb ) AS unknown_key; WARNING: [ulak] ERROR: AMQP config contains unknown key 'unknown'. Allowed keys: host, port, vhost, username, password, exchange, routing_key, exchange_type, persistent, mandatory, heartbeat, frame_max, tls, tls_ca_cert, tls_cert, tls_key, tls_verify_peer unknown_key ------------- f (1 row) -- ============================================================================ -- AMQP ENDPOINT CRUD -- ============================================================================ SELECT ulak.alter_endpoint( 'amqp_minimal', '{"host": "new-host", "exchange": "new-exchange", "routing_key": "new-key"}'::jsonb ) AS altered; INFO: [ulak] Altered endpoint 'amqp_minimal' altered --------- t (1 row) SELECT config->>'host' AS new_host, config->>'exchange' AS new_exchange FROM ulak.endpoints WHERE name = 'amqp_minimal'; new_host | new_exchange ----------+-------------- new-host | new-exchange (1 row) SELECT ulak.disable_endpoint('amqp_minimal') AS disabled; disabled ---------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'amqp_minimal'; enabled --------- f (1 row) SELECT ulak.enable_endpoint('amqp_minimal') AS enabled; enabled --------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'amqp_minimal'; enabled --------- t (1 row) -- ============================================================================ -- AMQP MESSAGE SEND (queue insertion) -- ============================================================================ SELECT ulak.send('amqp_full', '{"event": "order.created"}'::jsonb) AS sent; sent ------ t (1 row) SELECT count(*) AS amqp_queue_count FROM ulak.queue q JOIN ulak.endpoints e ON q.endpoint_id = e.id WHERE e.protocol = 'amqp'; amqp_queue_count ------------------ 1 (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue WHERE endpoint_id IN ( SELECT id FROM ulak.endpoints WHERE name LIKE 'amqp_%' ); SELECT ulak.drop_endpoint('amqp_minimal'); INFO: [ulak] Dropped endpoint 'amqp_minimal' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('amqp_default_exchange'); INFO: [ulak] Dropped endpoint 'amqp_default_exchange' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('amqp_full'); INFO: [ulak] Dropped endpoint 'amqp_full' drop_endpoint --------------- t (1 row)