-- 24_nats_config.sql: NATS endpoint configuration validation -- pg_regress test for ulak -- NOTE: Only runs when built with ENABLE_NATS=1 -- ============================================================================ -- VALID NATS CONFIGS -- ============================================================================ -- Minimal valid NATS config (JetStream default) SELECT ulak.create_endpoint( 'nats_minimal', 'nats', '{"url": "nats://localhost:4222", "subject": "orders.created"}'::jsonb ) IS NOT NULL AS minimal_created; INFO: [ulak] Created endpoint with ID 64 minimal_created ----------------- t (1 row) -- Core NATS (fire-and-forget) SELECT ulak.create_endpoint( 'nats_core', 'nats', '{"url": "nats://localhost:4222", "subject": "events.raw", "jetstream": false}'::jsonb ) IS NOT NULL AS core_created; INFO: [ulak] Created endpoint with ID 65 core_created -------------- t (1 row) -- Full JetStream config SELECT ulak.create_endpoint( 'nats_full', 'nats', '{"url": "nats://nats:4222", "subject": "orders.new", "jetstream": true, "stream": "ORDERS", "username": "user", "password": "pass", "headers": {"X-Source": "ulak", "X-Version": "1"}}'::jsonb ) IS NOT NULL AS full_created; INFO: [ulak] Created endpoint with ID 66 full_created -------------- t (1 row) -- Multi-server URL SELECT ulak.create_endpoint( 'nats_cluster', 'nats', '{"url": "nats://s1:4222,nats://s2:4222,nats://s3:4222", "subject": "cluster.test"}'::jsonb ) IS NOT NULL AS cluster_created; INFO: [ulak] Created endpoint with ID 67 cluster_created ----------------- t (1 row) -- Verify configs stored correctly SELECT name, protocol, config->>'url' AS url, config->>'subject' AS subject FROM ulak.endpoints WHERE name LIKE 'nats_%' ORDER BY name; name | protocol | url | subject --------------+----------+----------------------------------------------+---------------- nats_cluster | nats | nats://s1:4222,nats://s2:4222,nats://s3:4222 | cluster.test nats_core | nats | nats://localhost:4222 | events.raw nats_full | nats | nats://nats:4222 | orders.new nats_minimal | nats | nats://localhost:4222 | orders.created (4 rows) -- ============================================================================ -- VALIDATE_ENDPOINT_CONFIG FOR NATS -- ============================================================================ -- Valid minimal SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test"}'::jsonb ) AS valid_minimal; valid_minimal --------------- t (1 row) -- Valid with all options SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "jetstream": true, "stream": "TEST", "token": "secret", "tls": true, "headers": {"X-Key": "val"}, "options": {"max_reconnect": "60"}}'::jsonb ) AS valid_full; valid_full ------------ t (1 row) -- Valid core NATS SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "jetstream": false}'::jsonb ) AS valid_core; valid_core ------------ t (1 row) -- ============================================================================ -- INVALID NATS CONFIGS -- ============================================================================ -- Subject with token wildcard (invalid for publish) SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "orders.*.created"}'::jsonb ) AS wildcard_star; WARNING: [ulak] ERROR: NATS 'subject' must not contain wildcard characters ('*' or '>') wildcard_star --------------- f (1 row) -- Subject with full wildcard (invalid for publish) SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "orders.>"}'::jsonb ) AS wildcard_gt; WARNING: [ulak] ERROR: NATS 'subject' must not contain wildcard characters ('*' or '>') wildcard_gt ------------- f (1 row) -- Credentials file with directory traversal SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "credentials_file": "/tmp/../etc/passwd"}'::jsonb ) AS creds_traversal; WARNING: [ulak] ERROR: NATS 'credentials_file' must not contain '..' creds_traversal ----------------- f (1 row) -- Credentials file with relative path SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "credentials_file": "my.creds"}'::jsonb ) AS creds_relative; WARNING: [ulak] ERROR: NATS 'credentials_file' must be an absolute path creds_relative ---------------- f (1 row) -- Credentials file with wrong extension SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "credentials_file": "/etc/passwd"}'::jsonb ) AS creds_wrong_ext; WARNING: [ulak] ERROR: NATS 'credentials_file' must have .creds extension creds_wrong_ext ----------------- f (1 row) -- Valid credentials file SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "credentials_file": "/opt/nats/user.creds"}'::jsonb ) AS creds_valid; creds_valid ------------- t (1 row) -- Missing url SELECT ulak.validate_endpoint_config( 'nats', '{"subject": "test"}'::jsonb ) AS missing_url; WARNING: [ulak] ERROR: NATS config missing required key 'url' missing_url ------------- f (1 row) -- Missing subject SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222"}'::jsonb ) AS missing_subject; WARNING: [ulak] ERROR: NATS config missing required key 'subject' missing_subject ----------------- f (1 row) -- Empty subject SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": ""}'::jsonb ) AS empty_subject; WARNING: [ulak] ERROR: NATS config missing required key 'subject' empty_subject --------------- f (1 row) -- Invalid jetstream type SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "jetstream": "true"}'::jsonb ) AS invalid_jetstream_type; WARNING: [ulak] ERROR: NATS 'jetstream' must be a boolean invalid_jetstream_type ------------------------ f (1 row) -- Invalid tls type SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "tls": "true"}'::jsonb ) AS invalid_tls_type; WARNING: [ulak] ERROR: NATS 'tls' must be a boolean invalid_tls_type ------------------ f (1 row) -- Invalid headers type SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "headers": "bad"}'::jsonb ) AS invalid_headers_type; WARNING: [ulak] ERROR: NATS 'headers' must be an object invalid_headers_type ---------------------- f (1 row) -- Invalid options type SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "options": "bad"}'::jsonb ) AS invalid_options_type; WARNING: [ulak] ERROR: NATS 'options' must be an object invalid_options_type ---------------------- f (1 row) -- Unsupported option key SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "options": {"bad_option": "1"}}'::jsonb ) AS invalid_option_key; WARNING: [ulak] ERROR: NATS options contains unsupported key invalid_option_key -------------------- f (1 row) -- Unsupported option value type SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "options": {"max_reconnect": 10}}'::jsonb ) AS invalid_option_value_type; WARNING: [ulak] ERROR: NATS option values must be strings invalid_option_value_type --------------------------- f (1 row) -- Invalid option integer string SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "options": {"max_reconnect": "abc"}}'::jsonb ) AS invalid_option_integer_string; WARNING: [ulak] ERROR: NATS option values must be valid integer strings invalid_option_integer_string ------------------------------- f (1 row) -- Unknown config key SELECT ulak.validate_endpoint_config( 'nats', '{"url": "nats://localhost:4222", "subject": "test", "unknown_key": "value"}'::jsonb ) AS unknown_key; WARNING: [ulak] ERROR: NATS config contains unknown key 'unknown_key'. Allowed keys: url, subject, jetstream, stream, token, username, password, nkey_seed, credentials_file, tls, tls_ca_cert, tls_cert, tls_key, headers, options unknown_key ------------- f (1 row) -- ============================================================================ -- ENDPOINT MANAGEMENT -- ============================================================================ -- Alter endpoint SELECT ulak.alter_endpoint( 'nats_minimal', '{"url": "nats://newhost:4222", "subject": "orders.updated"}'::jsonb ) AS altered; INFO: [ulak] Altered endpoint 'nats_minimal' altered --------- t (1 row) SELECT config->>'url' AS new_url, config->>'subject' AS new_subject FROM ulak.endpoints WHERE name = 'nats_minimal'; new_url | new_subject ---------------------+---------------- nats://newhost:4222 | orders.updated (1 row) -- Disable/Enable SELECT ulak.disable_endpoint('nats_minimal') AS disabled; disabled ---------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'nats_minimal'; enabled --------- f (1 row) SELECT ulak.enable_endpoint('nats_minimal') AS enabled; enabled --------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'nats_minimal'; enabled --------- t (1 row) -- Send message (will be pending — no actual NATS server for regression tests) SELECT ulak.send('nats_minimal', '{"event": "test.nats"}'::jsonb) AS sent; sent ------ t (1 row) SELECT count(*) AS nats_queue_count FROM ulak.queue q JOIN ulak.endpoints e ON q.endpoint_id = e.id WHERE e.name = 'nats_minimal'; nats_queue_count ------------------ 1 (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue WHERE endpoint_id IN ( SELECT id FROM ulak.endpoints WHERE name LIKE 'nats_%' ); SELECT ulak.drop_endpoint('nats_minimal') AS drop1; INFO: [ulak] Dropped endpoint 'nats_minimal' drop1 ------- t (1 row) SELECT ulak.drop_endpoint('nats_core') AS drop2; INFO: [ulak] Dropped endpoint 'nats_core' drop2 ------- t (1 row) SELECT ulak.drop_endpoint('nats_full') AS drop3; INFO: [ulak] Dropped endpoint 'nats_full' drop3 ------- t (1 row) SELECT ulak.drop_endpoint('nats_cluster') AS drop4; INFO: [ulak] Dropped endpoint 'nats_cluster' drop4 ------- t (1 row)