-- 22_mqtt_config.sql: MQTT endpoint configuration validation -- pg_regress test for ulak -- NOTE: Only runs when built with ENABLE_MQTT=1 -- ============================================================================ -- VALID MQTT CONFIGS -- ============================================================================ -- Minimal valid MQTT config (broker + topic required) SELECT ulak.create_endpoint( 'mqtt_minimal', 'mqtt', '{"broker": "localhost", "topic": "test/topic"}'::jsonb ) IS NOT NULL AS minimal_created; INFO: [ulak] Created endpoint with ID 57 minimal_created ----------------- t (1 row) -- MQTT config with all optional fields SELECT ulak.create_endpoint( 'mqtt_full', 'mqtt', '{"broker": "mqtt.example.com", "port": 8883, "topic": "events/orders", "qos": 1, "retain": true, "client_id": "pgx-test-01", "username": "user", "password": "secret", "clean_session": false}'::jsonb ) IS NOT NULL AS full_created; INFO: [ulak] Created endpoint with ID 58 full_created -------------- t (1 row) -- MQTT config with TLS SELECT ulak.create_endpoint( 'mqtt_tls', 'mqtt', '{"broker": "mqtt-tls", "topic": "secure/topic", "tls": true, "tls_ca_cert": "/certs/ca.pem", "tls_cert": "/certs/client.pem", "tls_key": "/certs/client-key.pem"}'::jsonb ) IS NOT NULL AS tls_created; INFO: [ulak] Created endpoint with ID 59 tls_created ------------- t (1 row) -- MQTT config with Last Will and Testament SELECT ulak.create_endpoint( 'mqtt_lwt', 'mqtt', '{"broker": "localhost", "topic": "events/data", "will_topic": "events/status", "will_payload": "{\"status\": \"offline\"}", "will_qos": 1, "will_retain": true}'::jsonb ) IS NOT NULL AS lwt_created; INFO: [ulak] Created endpoint with ID 60 lwt_created ------------- t (1 row) -- Verify configs stored correctly SELECT name, protocol, config->>'broker' AS broker, config->>'topic' AS topic FROM ulak.endpoints WHERE name LIKE 'mqtt_%' ORDER BY name; name | protocol | broker | topic --------------+----------+------------------+--------------- mqtt_full | mqtt | mqtt.example.com | events/orders mqtt_lwt | mqtt | localhost | events/data mqtt_minimal | mqtt | localhost | test/topic mqtt_tls | mqtt | mqtt-tls | secure/topic (4 rows) -- ============================================================================ -- VALIDATE_ENDPOINT_CONFIG FOR MQTT -- ============================================================================ -- Valid minimal config SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test"}'::jsonb ) AS valid_minimal; valid_minimal --------------- t (1 row) -- Valid with QoS levels SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "qos": 0}'::jsonb ) AS valid_qos0; valid_qos0 ------------ t (1 row) SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "qos": 1}'::jsonb ) AS valid_qos1; valid_qos1 ------------ t (1 row) SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "qos": 2}'::jsonb ) AS valid_qos2; valid_qos2 ------------ t (1 row) -- Valid with nested options object SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "options": {"keepalive": 60}}'::jsonb ) AS valid_options_object; valid_options_object ---------------------- t (1 row) -- ============================================================================ -- INVALID MQTT CONFIGS -- ============================================================================ -- Topic with single-level wildcard (invalid for publish) SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "sensor/+/data"}'::jsonb ) AS wildcard_plus; WARNING: [ulak] ERROR: MQTT config: 'topic' must not contain wildcard characters ('+' or '#') wildcard_plus --------------- f (1 row) -- Topic with multi-level wildcard (invalid for publish) SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "sensor/#"}'::jsonb ) AS wildcard_hash; WARNING: [ulak] ERROR: MQTT config: 'topic' must not contain wildcard characters ('+' or '#') wildcard_hash --------------- f (1 row) -- Missing broker SELECT ulak.validate_endpoint_config( 'mqtt', '{"topic": "no-broker"}'::jsonb ) AS missing_broker; WARNING: [ulak] ERROR: MQTT config: 'broker' is required and must be non-empty string missing_broker ---------------- f (1 row) -- Missing topic SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost"}'::jsonb ) AS missing_topic; WARNING: [ulak] ERROR: MQTT config: 'topic' is required and must be non-empty string missing_topic --------------- f (1 row) -- Empty broker SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "", "topic": "test"}'::jsonb ) AS empty_broker; WARNING: [ulak] ERROR: MQTT config: 'broker' is required and must be non-empty string empty_broker -------------- f (1 row) -- Invalid qos range SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "qos": 3}'::jsonb ) AS invalid_qos_range; WARNING: [ulak] ERROR: MQTT config: 'qos' must be between 0 and 2 invalid_qos_range ------------------- f (1 row) -- Invalid qos type SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "qos": "1"}'::jsonb ) AS invalid_qos_type; WARNING: [ulak] ERROR: MQTT config: 'qos' must be a number invalid_qos_type ------------------ f (1 row) -- Invalid port range SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "port": 70000}'::jsonb ) AS invalid_port; WARNING: [ulak] ERROR: MQTT config: 'port' must be between 1 and 65535 invalid_port -------------- f (1 row) -- Invalid tls type SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "tls": "true"}'::jsonb ) AS invalid_tls_type; WARNING: [ulak] ERROR: MQTT config: 'tls' must be a boolean invalid_tls_type ------------------ f (1 row) -- Invalid options type SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "options": "bad"}'::jsonb ) AS invalid_options_type; WARNING: [ulak] ERROR: MQTT config: 'options' must be an object invalid_options_type ---------------------- f (1 row) -- Invalid will_qos range SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "will_qos": 9}'::jsonb ) AS invalid_will_qos; WARNING: [ulak] ERROR: MQTT config: 'will_qos' must be between 0 and 2 invalid_will_qos ------------------ f (1 row) -- Unknown config key SELECT ulak.validate_endpoint_config( 'mqtt', '{"broker": "localhost", "topic": "test", "unknown_key": "value"}'::jsonb ) AS unknown_key; WARNING: [ulak] ERROR: MQTT config contains unknown key 'unknown_key'. Allowed keys: broker, topic, qos, port, client_id, retain, username, password, options, tls, tls_ca_cert, tls_cert, tls_key, tls_insecure, will_topic, will_payload, will_qos, will_retain, clean_session unknown_key ------------- f (1 row) -- ============================================================================ -- MQTT ENDPOINT CRUD OPERATIONS -- ============================================================================ -- Alter endpoint config SELECT ulak.alter_endpoint( 'mqtt_minimal', '{"broker": "new-broker", "topic": "new/topic", "qos": 1}'::jsonb ) AS altered; INFO: [ulak] Altered endpoint 'mqtt_minimal' altered --------- t (1 row) SELECT config->>'broker' AS new_broker, config->>'topic' AS new_topic FROM ulak.endpoints WHERE name = 'mqtt_minimal'; new_broker | new_topic ------------+----------- new-broker | new/topic (1 row) -- Enable/disable SELECT ulak.disable_endpoint('mqtt_minimal') AS disabled; disabled ---------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'mqtt_minimal'; enabled --------- f (1 row) SELECT ulak.enable_endpoint('mqtt_minimal') AS enabled; enabled --------- t (1 row) SELECT enabled FROM ulak.endpoints WHERE name = 'mqtt_minimal'; enabled --------- t (1 row) -- ============================================================================ -- MQTT MESSAGE SEND (queue insertion) -- ============================================================================ SELECT ulak.send('mqtt_full', '{"event": "temperature", "value": 23.5}'::jsonb) AS sent; sent ------ t (1 row) SELECT count(*) AS mqtt_queue_count FROM ulak.queue q JOIN ulak.endpoints e ON q.endpoint_id = e.id WHERE e.protocol = 'mqtt'; mqtt_queue_count ------------------ 1 (1 row) -- ============================================================================ -- CLEANUP -- ============================================================================ DELETE FROM ulak.queue WHERE endpoint_id IN ( SELECT id FROM ulak.endpoints WHERE name LIKE 'mqtt_%' ); SELECT ulak.drop_endpoint('mqtt_minimal'); INFO: [ulak] Dropped endpoint 'mqtt_minimal' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('mqtt_full'); INFO: [ulak] Dropped endpoint 'mqtt_full' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('mqtt_tls'); INFO: [ulak] Dropped endpoint 'mqtt_tls' drop_endpoint --------------- t (1 row) SELECT ulak.drop_endpoint('mqtt_lwt'); INFO: [ulak] Dropped endpoint 'mqtt_lwt' drop_endpoint --------------- t (1 row)