-- 20_kafka_config.sql: Kafka endpoint configuration validation -- pg_regress test for ulak -- NOTE: Only runs when built with ENABLE_KAFKA=1 -- ============================================================================ -- VALID KAFKA CONFIGS -- ============================================================================ -- Minimal valid Kafka config (broker + topic required) SELECT ulak.create_endpoint( 'kafka_minimal', 'kafka', '{"broker": "localhost:9092", "topic": "test-topic"}'::jsonb ) IS NOT NULL AS minimal_created; INFO: [ulak] Created endpoint with ID 51 minimal_created ----------------- t (1 row) -- Kafka config with all optional fields SELECT ulak.create_endpoint( 'kafka_full', 'kafka', '{"broker": "broker1:9092,broker2:9092", "topic": "events", "key": "static-key", "partition": 0, "headers": {"ce_type": "test.event", "x-source": "ulak"}}'::jsonb ) IS NOT NULL AS full_created; INFO: [ulak] Created endpoint with ID 52 full_created -------------- t (1 row) -- Kafka config with librdkafka options SELECT ulak.create_endpoint( 'kafka_options', 'kafka', '{"broker": "localhost:9092", "topic": "opts-topic", "options": {"compression.codec": "lz4", "linger.ms": "10", "batch.num.messages": "1000"}}'::jsonb ) IS NOT NULL AS options_created; INFO: [ulak] Created endpoint with ID 53 options_created ----------------- t (1 row) -- Verify configs stored correctly SELECT name, protocol, config->>'broker' AS broker, config->>'topic' AS topic FROM ulak.endpoints WHERE name LIKE 'kafka_%' ORDER BY name; name | protocol | broker | topic ---------------+----------+---------------------------+------------ kafka_full | kafka | broker1:9092,broker2:9092 | events kafka_minimal | kafka | localhost:9092 | test-topic kafka_options | kafka | localhost:9092 | opts-topic (3 rows) -- ============================================================================ -- VALIDATE_ENDPOINT_CONFIG FOR KAFKA -- ============================================================================ -- Valid Kafka config passes validation SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "localhost:9092", "topic": "test"}'::jsonb ) AS valid_minimal; valid_minimal --------------- t (1 row) -- Multiple brokers SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "b1:9092,b2:9092,b3:9092", "topic": "multi-broker"}'::jsonb ) AS valid_multi_broker; valid_multi_broker -------------------- t (1 row) -- With partition and key SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "localhost:9092", "topic": "keyed", "key": "my-key", "partition": 2}'::jsonb ) AS valid_with_key; valid_with_key ---------------- t (1 row) -- ============================================================================ -- INVALID KAFKA CONFIGS -- ============================================================================ -- Missing broker SELECT ulak.validate_endpoint_config( 'kafka', '{"topic": "no-broker"}'::jsonb ) AS missing_broker; missing_broker ---------------- f (1 row) -- Missing topic SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "localhost:9092"}'::jsonb ) AS missing_topic; missing_topic --------------- f (1 row) -- Empty broker SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "", "topic": "test"}'::jsonb ) AS empty_broker; empty_broker -------------- f (1 row) -- Empty topic SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "localhost:9092", "topic": ""}'::jsonb ) AS empty_topic; empty_topic ------------- f (1 row) -- Unknown config key (strict validation) SELECT ulak.validate_endpoint_config( 'kafka', '{"broker": "localhost:9092", "topic": "test", "unknown_key": "value"}'::jsonb ) AS unknown_key; WARNING: [ulak] ERROR: Kafka config contains unknown key 'unknown_key'. Allowed keys: broker, topic, options, key, partition unknown_key ------------- f (1 row) -- ============================================================================ -- KAFKA ENDPOINT CRUD OPERATIONS -- ============================================================================ -- Alter endpoint config SELECT ulak.alter_endpoint( 'kafka_minimal', '{"broker": "new-broker:9092", "topic": "new-topic"}'::jsonb ) AS altered; INFO: [ulak] Altered endpoint 'kafka_minimal' altered --------- t (1 row) -- Verify alter took effect SELECT config->>'broker' AS new_broker, config->>'topic' AS new_topic FROM ulak.endpoints WHERE name = 'kafka_minimal'; new_broker | new_topic -----------------+----------- new-broker:9092 | new-topic (1 row) -- Enable/disable SELECT ulak.disable_endpoint('kafka_minimal') AS disabled; disabled ---------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'kafka_minimal'; enabled --------- f (1 row) SELECT ulak.enable_endpoint('kafka_minimal') AS enabled; enabled --------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'kafka_minimal'; enabled --------- t (1 row) -- ============================================================================ -- KAFKA MESSAGE SEND (queue insertion only — actual dispatch needs running broker) -- ============================================================================ -- Send single message to Kafka endpoint SELECT ulak.send('kafka_full', '{"event": "order.created", "order_id": 42}'::jsonb) AS sent; sent ------ t (1 row) -- Send with options (priority, ordering key) SELECT ulak.send_with_options( 'kafka_full', '{"event": "order.updated", "order_id": 42}'::jsonb, p_priority := 5::smallint, p_ordering_key := 'order-42' ) IS NOT NULL AS sent_with_options; sent_with_options ------------------- t (1 row) -- Verify messages in queue SELECT count(*) AS kafka_queue_count FROM ulak.queue q JOIN ulak.endpoints e ON q.endpoint_id = e.id WHERE e.protocol = 'kafka'; kafka_queue_count ------------------- 2 (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue WHERE endpoint_id IN ( SELECT id FROM ulak.endpoints WHERE name LIKE 'kafka_%' ); SELECT ulak.drop_endpoint('kafka_minimal'); INFO: [ulak] Dropped endpoint 'kafka_minimal' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('kafka_full'); INFO: [ulak] Dropped endpoint 'kafka_full' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('kafka_options'); INFO: [ulak] Dropped endpoint 'kafka_options' drop_endpoint --------------- t (1 row)