/*
This file is auto generated by pgrx.
The ordering of items is not stable, it is driven by a dependency graph.
*/

-- src/lib.rs:213
-- bootstrap

-- Extension schemas
CREATE SCHEMA IF NOT EXISTS pgtrickle;
CREATE SCHEMA IF NOT EXISTS pgtrickle_changes;

-- F51: Restrict change buffer schema access to prevent unauthorized
-- injection of bogus changes that would be applied on next refresh.
REVOKE ALL ON SCHEMA pgtrickle_changes FROM PUBLIC;

-- User-declared refresh groups for snapshot consistency
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_groups (
    group_id    SERIAL PRIMARY KEY,
    group_name  TEXT NOT NULL UNIQUE,
    member_oids OID[] NOT NULL,
    isolation   TEXT NOT NULL DEFAULT 'read_committed'
        CHECK (isolation IN ('read_committed', 'repeatable_read')),
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Core ST metadata: one row per stream table.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_stream_tables (
    pgt_id                      BIGSERIAL PRIMARY KEY,
    pgt_relid                   OID NOT NULL UNIQUE,
    pgt_name                    TEXT NOT NULL,
    pgt_schema                  TEXT NOT NULL,
    defining_query              TEXT NOT NULL,
    original_query              TEXT,
    schedule                    TEXT,
    refresh_mode                TEXT NOT NULL DEFAULT 'DIFFERENTIAL'
        CHECK (refresh_mode IN ('FULL', 'DIFFERENTIAL', 'IMMEDIATE')),
    -- 'PAUSED' is written by pgtrickle.pause_all() and matched by
    -- pgtrickle.resume_all(). It has to be listed here, otherwise the UPDATE
    -- in pause_all() fails with a check violation as soon as one ACTIVE row
    -- exists.
    status                      TEXT NOT NULL DEFAULT 'INITIALIZING'
        CHECK (status IN ('INITIALIZING', 'ACTIVE', 'PAUSED', 'SUSPENDED', 'ERROR')),
    is_populated                BOOLEAN NOT NULL DEFAULT FALSE,
    data_timestamp              TIMESTAMPTZ,
    frontier                    JSONB,
    last_refresh_at             TIMESTAMPTZ,
    consecutive_errors          INTEGER NOT NULL DEFAULT 0,
    needs_reinit                BOOLEAN NOT NULL DEFAULT FALSE,
    auto_threshold              DOUBLE PRECISION,
    last_full_ms                DOUBLE PRECISION,
    functions_used              TEXT[],
    topk_limit                  INTEGER,
    topk_order_by               TEXT,
    topk_offset                 INTEGER,
    diamond_consistency         TEXT NOT NULL DEFAULT 'atomic'
        CHECK (diamond_consistency IN ('none', 'atomic')),
    diamond_schedule_policy     TEXT NOT NULL DEFAULT 'fastest'
        CHECK (diamond_schedule_policy IN ('fastest', 'slowest')),
    has_keyless_source          BOOLEAN NOT NULL DEFAULT FALSE,
    function_hashes             TEXT,
    requested_cdc_mode          TEXT
        CHECK (requested_cdc_mode IN ('auto', 'trigger', 'wal')),
    is_append_only              BOOLEAN NOT NULL DEFAULT FALSE,
    scc_id                      INTEGER,
    last_fixpoint_iterations    INTEGER,
    max_differential_joins      INTEGER,
    max_delta_fraction          DOUBLE PRECISION,
    pooler_compatibility_mode   BOOLEAN NOT NULL DEFAULT FALSE,
    refresh_tier                TEXT NOT NULL DEFAULT 'hot'
        CHECK (refresh_tier IN ('hot', 'warm', 'cold', 'frozen')),
    effective_refresh_mode      TEXT,
    fuse_mode                   TEXT NOT NULL DEFAULT 'off'
        CHECK (fuse_mode IN ('off', 'on', 'auto')),
    fuse_state                  TEXT NOT NULL DEFAULT 'armed'
        CHECK (fuse_state IN ('armed', 'blown', 'disabled')),
    fuse_ceiling                BIGINT,
    fuse_sensitivity            INTEGER,
    blown_at                    TIMESTAMPTZ,
    blow_reason                 TEXT,
    last_error_message          TEXT,
    last_error_at               TIMESTAMPTZ,
    downstream_publication_name TEXT,
    freshness_deadline_ms       BIGINT,
    st_partition_key            TEXT,
    created_at                  TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at                  TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_pgt_status
    ON pgtrickle.pgt_stream_tables (status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_pgt_name
    ON pgtrickle.pgt_stream_tables (pgt_schema, pgt_name);
-- PERF-4: Scheduler hot-path lookup by relation OID.
-- NOTE(review): pgt_relid is declared UNIQUE above, which already creates an
-- index on the same column; this one looks redundant -- confirm before dropping.
CREATE INDEX IF NOT EXISTS idx_pgt_relid
    ON pgtrickle.pgt_stream_tables (pgt_relid);

-- DAG edges: one row per (stream table, upstream relation) pair.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_dependencies (
    pgt_id                BIGINT NOT NULL
        REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE,
    source_relid          OID NOT NULL,
    source_type           TEXT NOT NULL
        CHECK (source_type IN ('TABLE', 'STREAM_TABLE', 'VIEW', 'MATVIEW', 'FOREIGN_TABLE')),
    columns_used          TEXT[],
    column_snapshot       JSONB,
    schema_fingerprint    TEXT,
    cdc_mode              TEXT NOT NULL DEFAULT 'TRIGGER'
        CHECK (cdc_mode IN ('TRIGGER', 'TRANSITIONING', 'WAL')),
    slot_name             TEXT,
    decoder_confirmed_lsn PG_LSN,
    transition_started_at TIMESTAMPTZ,
    PRIMARY KEY (pgt_id, source_relid)
);

CREATE INDEX IF NOT EXISTS idx_deps_source
    ON pgtrickle.pgt_dependencies (source_relid);
-- PERF-4: Fast lookup by pgt_id (non-PK prefix for multi-column PK).
-- NOTE(review): the primary key of pgt_dependencies leads with pgt_id, so its
-- index can already serve pgt_id-only lookups -- confirm this extra index is needed.
CREATE INDEX IF NOT EXISTS idx_deps_pgt_id
    ON pgtrickle.pgt_dependencies (pgt_id);

-- Refresh history / audit log: one row per refresh attempt of a stream table.
-- Rows are removed together with their stream table (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_history (
    refresh_id          BIGSERIAL PRIMARY KEY,
    pgt_id              BIGINT NOT NULL
        REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE,
    data_timestamp      TIMESTAMPTZ NOT NULL,
    start_time          TIMESTAMPTZ NOT NULL,
    end_time            TIMESTAMPTZ,
    action              TEXT NOT NULL
        CHECK (action IN ('NO_DATA', 'FULL', 'DIFFERENTIAL', 'REINITIALIZE', 'SKIP')),
    rows_inserted       BIGINT DEFAULT 0,
    rows_deleted        BIGINT DEFAULT 0,
    delta_row_count     BIGINT DEFAULT 0,
    merge_strategy_used TEXT,
    was_full_fallback   BOOLEAN NOT NULL DEFAULT FALSE,
    error_message       TEXT,
    status              TEXT NOT NULL
        CHECK (status IN ('RUNNING', 'COMPLETED', 'FAILED', 'SKIPPED')),
    initiated_by        TEXT
        CHECK (initiated_by IN ('SCHEDULER', 'MANUAL', 'INITIAL', 'SELF_MONITOR')),
    freshness_deadline  TIMESTAMPTZ,
    tick_watermark_lsn  PG_LSN,
    fixpoint_iteration  INTEGER
);

CREATE INDEX IF NOT EXISTS idx_hist_pgt_ts
    ON pgtrickle.pgt_refresh_history (pgt_id, data_timestamp);
-- PERF-1: Fast lookup by (pgt_id, start_time) for self-monitoring and scheduler_overhead queries.
CREATE INDEX IF NOT EXISTS idx_hist_pgt_start
    ON pgtrickle.pgt_refresh_history (pgt_id, start_time);

-- Per-source CDC slot tracking
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_change_tracking (
    source_relid       OID PRIMARY KEY,
    slot_name          TEXT NOT NULL,
    last_consumed_lsn  PG_LSN,
    tracked_by_pgt_ids BIGINT[]
);

-- Scheduler job table for parallel refresh dispatch
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_scheduler_jobs (
    job_id         BIGSERIAL PRIMARY KEY,
    dag_version    BIGINT NOT NULL,
    unit_key       TEXT NOT NULL,
    unit_kind      TEXT NOT NULL
        CHECK (unit_kind IN (
            'singleton',
            'atomic_group',
            'immediate_closure',
            'cyclic_scc',
            'repeatable_read_group',
            'fused_chain'
        )),
    member_pgt_ids BIGINT[] NOT NULL,
    root_pgt_id    BIGINT NOT NULL,
    status         TEXT NOT NULL DEFAULT 'QUEUED'
        CHECK (status IN (
            'QUEUED',
            'RUNNING',
            'SUCCEEDED',
            'RETRYABLE_FAILED',
            'PERMANENT_FAILED',
            'CANCELLED'
        )),
    scheduler_pid  INTEGER NOT NULL,
    worker_pid     INTEGER,
    attempt_no     INTEGER NOT NULL DEFAULT 1,
    enqueued_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at     TIMESTAMPTZ,
    finished_at    TIMESTAMPTZ,
    outcome_detail TEXT,
    retryable      BOOLEAN
);

CREATE INDEX IF NOT EXISTS idx_sched_jobs_status_enqueued
    ON pgtrickle.pgt_scheduler_jobs (status, enqueued_at);
CREATE INDEX IF NOT EXISTS idx_sched_jobs_unit_status
    ON pgtrickle.pgt_scheduler_jobs (unit_key, status);
-- Partial index: only rows that have a finished_at value are indexed.
CREATE INDEX IF NOT EXISTS idx_sched_jobs_finished
    ON pgtrickle.pgt_scheduler_jobs (finished_at)
    WHERE finished_at IS NOT NULL;

-- Bootstrap source gates (v0.5.0, Phase 3)
-- Records which source tables are currently "gated" (bootstrapping in progress).
-- When a source is gated, all stream tables that depend on it are skipped by
-- the scheduler until pgtrickle.ungate_source() is called.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_source_gates (
    source_relid OID PRIMARY KEY,
    gated        BOOLEAN NOT NULL DEFAULT TRUE,
    gated_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    ungated_at   TIMESTAMPTZ,
    gated_by     TEXT
);

-- Per-source watermark state: tracks how far each external source has been loaded.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermarks (
    source_relid       OID PRIMARY KEY,
    watermark          TIMESTAMPTZ NOT NULL,
    updated_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    advanced_by        TEXT,
    wal_lsn_at_advance TEXT
);

-- Watermark groups: declare that a set of sources must be temporally aligned.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermark_groups (
    group_id       SERIAL PRIMARY KEY,
    group_name     TEXT UNIQUE NOT NULL,
    source_relids  OID[] NOT NULL,
    tolerance_secs DOUBLE PRECISION NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- DB-3: Schema version tracking table.
-- Records which schema migration versions have been applied to this database.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_schema_version (
    version     TEXT PRIMARY KEY,
    applied_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    description TEXT
);

-- Seed row; ON CONFLICT keeps the statement harmless if the row already exists.
INSERT INTO pgtrickle.pgt_schema_version (version, description)
VALUES ('0.19.0', 'Initial schema version tracking')
ON CONFLICT (version) DO NOTHING;

-- Mark catalog tables as extension configuration tables so that pg_dump
-- includes their contents (empty filter = all rows).
-- NOTE(review): other catalog tables in this script (e.g. pgt_refresh_groups)
-- are not registered here -- confirm that is intentional.
SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_stream_tables', '');
SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_dependencies', '');
SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_source_gates', '');
SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_watermarks', '');
SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_watermark_groups', '');

-- G14-SHC: Shared template cache (catalog-backed, UNLOGGED)
CREATE UNLOGGED TABLE IF NOT EXISTS pgtrickle.pgt_template_cache (
    pgt_id        BIGINT PRIMARY KEY
        REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE,
    query_hash    BIGINT NOT NULL,
    delta_sql     TEXT NOT NULL,
    columns       TEXT[] NOT NULL,
    source_oids   INTEGER[] NOT NULL,
    is_dedup      BOOLEAN NOT NULL DEFAULT FALSE,
    key_changed   BOOLEAN NOT NULL DEFAULT FALSE,
    all_algebraic BOOLEAN NOT NULL DEFAULT FALSE,
    cached_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- src/lib.rs:475
-- Create event trigger functions with correct RETURNS event_trigger type.
-- pgrx's #[pg_extern] generates RETURNS void, which PostgreSQL rejects for
-- event triggers. We create them manually here with the correct return type.
CREATE FUNCTION pgtrickle."_on_ddl_end"()
RETURNS event_trigger
LANGUAGE c
AS 'MODULE_PATHNAME', 'pg_trickle_on_ddl_end_wrapper';

CREATE FUNCTION pgtrickle."_on_sql_drop"()
RETURNS event_trigger
LANGUAGE c
AS 'MODULE_PATHNAME', 'pg_trickle_on_sql_drop_wrapper';

-- Event trigger: track ALTER TABLE on upstream sources
CREATE EVENT TRIGGER pg_trickle_ddl_tracker
    ON ddl_command_end
    EXECUTE FUNCTION pgtrickle._on_ddl_end();

-- Event trigger: track DROP TABLE on upstream sources / ST storage tables
CREATE EVENT TRIGGER pg_trickle_drop_tracker
    ON sql_drop
    EXECUTE FUNCTION pgtrickle._on_sql_drop();

-- src/lib.rs:632

-- pause_all(): flips every ACTIVE stream table to PAUSED and emits a NOTICE.
-- NOTE(review): the UPDATE only succeeds if 'PAUSED' is accepted by the CHECK
-- constraint on pgtrickle.pgt_stream_tables.status -- verify the catalog definition.
CREATE OR REPLACE FUNCTION pgtrickle."pause_all"()
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE pgtrickle.pgt_stream_tables
       SET status = 'PAUSED'
     WHERE status = 'ACTIVE';
    RAISE NOTICE 'pg_trickle: all stream tables paused.';
END;
$$;

-- The two literals below are concatenated by PostgreSQL; they must stay on
-- separate lines for that to be valid syntax.
COMMENT ON FUNCTION pgtrickle."pause_all"() IS
    'Pause automatic refreshes for every ACTIVE stream table. '
    'Use pgtrickle.resume_all() to re-activate them.';

-- resume_all(): flips every PAUSED stream table back to ACTIVE and emits a NOTICE.
CREATE OR REPLACE FUNCTION pgtrickle."resume_all"()
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE pgtrickle.pgt_stream_tables
       SET status = 'ACTIVE'
     WHERE status = 'PAUSED';
    RAISE NOTICE 'pg_trickle: all paused stream tables resumed.';
END;
$$;

COMMENT ON FUNCTION pgtrickle."resume_all"() IS
    'Re-activate all stream tables that were paused with pgtrickle.pause_all().';

-- src/lib.rs:895
-- OUTBOX-1 (v0.28.0): Per-stream-table outbox configuration.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_outbox_config (
    stream_table_oid   OID NOT NULL PRIMARY KEY,
    stream_table_name  TEXT NOT NULL,
    outbox_table_name  TEXT NOT NULL,
    retention_hours    INTEGER NOT NULL DEFAULT 24,
    created_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_drained_at    TIMESTAMPTZ,
    last_drained_count BIGINT NOT NULL DEFAULT 0,
    CONSTRAINT uq_outbox_table_name UNIQUE (outbox_table_name)
);

CREATE INDEX IF NOT EXISTS idx_pgt_outbox_config_name
    ON pgtrickle.pgt_outbox_config (stream_table_name);

COMMENT ON TABLE pgtrickle.pgt_outbox_config IS
    'OUTBOX-1 (v0.28.0): Catalog of stream tables with the transactional outbox pattern enabled.';

-- INBOX-1 (v0.28.0): Named inbox configurations.
-- The *_column fields hold column names as text, each with a default name.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_inbox_config (
    inbox_name          TEXT NOT NULL PRIMARY KEY,
    inbox_schema        TEXT NOT NULL DEFAULT 'pgtrickle',
    max_retries         INTEGER NOT NULL DEFAULT 3,
    schedule            TEXT NOT NULL DEFAULT '1s',
    with_dead_letter    BOOLEAN NOT NULL DEFAULT TRUE,
    with_stats          BOOLEAN NOT NULL DEFAULT TRUE,
    retention_hours     INTEGER NOT NULL DEFAULT 72,
    id_column           TEXT NOT NULL DEFAULT 'event_id',
    processed_at_column TEXT NOT NULL DEFAULT 'processed_at',
    retry_count_column  TEXT NOT NULL DEFAULT 'retry_count',
    error_column        TEXT NOT NULL DEFAULT 'error',
    received_at_column  TEXT NOT NULL DEFAULT 'received_at',
    event_type_column   TEXT NOT NULL DEFAULT 'event_type',
    is_managed          BOOLEAN NOT NULL DEFAULT TRUE,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE pgtrickle.pgt_inbox_config IS
    'INBOX-1 (v0.28.0): Catalog of named transactional inbox configurations.';

-- OUTBOX-B1 (v0.28.0): Consumer groups for the outbox pattern.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_consumer_groups (
    group_name        TEXT NOT NULL PRIMARY KEY,
    outbox_name       TEXT NOT NULL,
    auto_offset_reset TEXT NOT NULL DEFAULT 'latest',
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    CONSTRAINT chk_consumer_group_auto_offset_reset
        CHECK (auto_offset_reset IN ('earliest', 'latest'))
);

CREATE INDEX IF NOT EXISTS idx_pgt_consumer_groups_outbox
    ON pgtrickle.pgt_consumer_groups (outbox_name);

COMMENT ON TABLE pgtrickle.pgt_consumer_groups IS
    'OUTBOX-B1 (v0.28.0): Named consumer groups that track consumption progress on an outbox.';

-- OUTBOX-B2 (v0.28.0): Per-consumer committed offsets within a group.
-- Offsets are deleted with their consumer group (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_consumer_offsets (
    group_name        TEXT NOT NULL
        REFERENCES pgtrickle.pgt_consumer_groups(group_name) ON DELETE CASCADE,
    consumer_id       TEXT NOT NULL,
    committed_offset  BIGINT NOT NULL DEFAULT 0,
    last_committed_at TIMESTAMPTZ,
    last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (group_name, consumer_id)
);

COMMENT ON TABLE pgtrickle.pgt_consumer_offsets IS
    'OUTBOX-B2 (v0.28.0): Per-consumer committed offsets and heartbeat tracking.';

-- OUTBOX-B3 (v0.28.0): Visibility leases granted by poll_outbox().
-- At most one lease row per (group, consumer); removed with the offset row.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_consumer_leases (
    group_name    TEXT NOT NULL,
    consumer_id   TEXT NOT NULL,
    batch_start   BIGINT NOT NULL,
    batch_end     BIGINT NOT NULL,
    lease_expires TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (group_name, consumer_id),
    FOREIGN KEY (group_name, consumer_id)
        REFERENCES pgtrickle.pgt_consumer_offsets(group_name, consumer_id)
        ON DELETE CASCADE
);

COMMENT ON TABLE pgtrickle.pgt_consumer_leases IS
    'OUTBOX-B3 (v0.28.0): Visibility leases for in-flight outbox message batches.';

-- INBOX-B1 (v0.28.0): Per-inbox ordering configuration.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_inbox_ordering_config (
    inbox_name       TEXT NOT NULL PRIMARY KEY
        REFERENCES pgtrickle.pgt_inbox_config(inbox_name) ON DELETE CASCADE,
    aggregate_id_col TEXT NOT NULL,
    sequence_num_col TEXT NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE pgtrickle.pgt_inbox_ordering_config IS
    'INBOX-B1 (v0.28.0): Ordering configuration for per-aggregate sequenced inbox processing.';

-- INBOX-B2 (v0.28.0): Per-inbox priority tiers configuration.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_inbox_priority_config (
    inbox_name   TEXT NOT NULL PRIMARY KEY
        REFERENCES pgtrickle.pgt_inbox_config(inbox_name) ON DELETE CASCADE,
    priority_col TEXT NOT NULL,
    tiers        JSONB,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

COMMENT ON TABLE pgtrickle.pgt_inbox_priority_config IS
    'INBOX-B2 (v0.28.0): Priority tier configuration for inbox message processing.';

-- src/lib.rs:50
-- pg_trickle::pgtrickle
CREATE SCHEMA IF NOT EXISTS pgtrickle;

-- The LANGUAGE c functions below are pgrx (Rust) entry points resolved from
-- MODULE_PATHNAME; trailing comments give the Rust-side type of each item.

-- src/api/publication.rs:73
-- pg_trickle::api::publication::drop_stream_table_publication
CREATE FUNCTION pgtrickle."drop_stream_table_publication"(
    "name" TEXT -- &str
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'drop_stream_table_publication_wrapper';

-- src/api/diagnostics.rs:812
-- pg_trickle::api::diagnostics::diamond_groups
CREATE FUNCTION pgtrickle."diamond_groups"()
RETURNS TABLE (
    "group_id" INTEGER, -- i32
    "member_name" TEXT, -- String
    "member_schema" TEXT, -- String
    "is_convergence" BOOLEAN, -- bool
    "epoch" BIGINT, -- i64
    "schedule_policy" TEXT -- String
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'diamond_groups_wrapper';

-- src/api/snapshot.rs:360
-- pg_trickle::api::snapshot::drop_snapshot
CREATE FUNCTION pgtrickle."drop_snapshot"(
    "p_snapshot_table" TEXT -- &str
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'drop_snapshot_wrapper';

-- src/api/diagnostics.rs:1084
-- pg_trickle::api::diagnostics::advance_watermark
-- (LANGUAGE c functions here are pgrx/Rust entry points resolved from
-- MODULE_PATHNAME; trailing comments give the Rust-side type of each item.)
CREATE FUNCTION pgtrickle."advance_watermark"(
    "source" TEXT, -- &str
    "watermark" TIMESTAMPTZ -- TimestampWithTimeZone
)
RETURNS void -- Result<(), PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'advance_watermark_wrapper';

-- src/api/diagnostics.rs:1153
-- pg_trickle::api::diagnostics::drop_watermark_group
CREATE FUNCTION pgtrickle."drop_watermark_group"(
    "group_name" TEXT -- &str
)
RETURNS void -- Result<(), PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'drop_watermark_group_wrapper';

-- src/api/diagnostics.rs:1501
-- pg_trickle::api::diagnostics::worker_allocation_status
CREATE FUNCTION pgtrickle."worker_allocation_status"()
RETURNS TABLE (
    "db_name" TEXT, -- String
    "workers_used" BIGINT, -- i64
    "workers_quota" BIGINT, -- i64
    "workers_queued" BIGINT, -- i64
    "cluster_active" BIGINT, -- i64
    "cluster_max" BIGINT -- i64
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'worker_allocation_status_fn_wrapper';

-- src/api/helpers.rs:2457
-- pg_trickle::api::helpers::refresh_efficiency
CREATE FUNCTION pgtrickle."refresh_efficiency"()
RETURNS TABLE (
    "pgt_schema" TEXT, -- String
    "pgt_name" TEXT, -- String
    "refresh_mode" TEXT, -- String
    "total_refreshes" BIGINT, -- i64
    "diff_count" BIGINT, -- i64
    "full_count" BIGINT, -- i64
    "avg_diff_ms" DOUBLE PRECISION, -- Option<f64>
    "avg_full_ms" DOUBLE PRECISION, -- Option<f64>
    "avg_change_ratio" DOUBLE PRECISION, -- Option<f64>
    "diff_speedup" TEXT, -- Option<String>
    "last_refresh_at" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'refresh_efficiency_wrapper';

-- src/api/planner.rs:186
-- pg_trickle::api::planner::schedule_recommendations
CREATE FUNCTION pgtrickle."schedule_recommendations"()
RETURNS TABLE (
    "name" TEXT, -- Option<String>
    "current_interval_seconds" DOUBLE PRECISION, -- Option<f64>
    "recommended_interval_seconds" DOUBLE PRECISION, -- Option<f64>
    "delta_pct" DOUBLE PRECISION, -- Option<f64>
    "confidence" DOUBLE PRECISION, -- Option<f64>
    "reasoning" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'schedule_recommendations_wrapper';

-- src/monitor.rs:3029
-- pg_trickle::monitor::trigger_inventory
CREATE FUNCTION pgtrickle."trigger_inventory"()
RETURNS TABLE (
    "source_table" TEXT, -- String
    "source_oid" BIGINT, -- i64
    "trigger_name" TEXT, -- String
    "trigger_type" TEXT, -- String
    "present" BOOLEAN, -- bool
    "enabled" BOOLEAN -- bool
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'trigger_inventory_wrapper';

-- src/diagnostics.rs:49
-- pg_trickle::diagnostics::explain_query_rewrite
CREATE FUNCTION pgtrickle."explain_query_rewrite"(
    "query" TEXT -- &str
)
RETURNS TABLE (
    "pass_name" TEXT, -- String
    "changed" BOOLEAN, -- bool
    "sql_after" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'explain_query_rewrite_wrapper';

-- src/monitor.rs:906
-- pg_trickle::monitor::slot_health
CREATE FUNCTION pgtrickle."slot_health"()
RETURNS TABLE (
    "slot_name" TEXT, -- String
    "source_relid" BIGINT, -- i64
    "active" BOOLEAN, -- bool
    "retained_wal_bytes" BIGINT, -- i64
    "wal_status" TEXT -- String
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'slot_health_wrapper';

-- src/monitor.rs:2089
-- pg_trickle::monitor::change_buffer_sizes
CREATE FUNCTION pgtrickle."change_buffer_sizes"()
RETURNS TABLE (
    "stream_table" TEXT, -- String
    "source_table" TEXT, -- String
    "source_oid" BIGINT, -- i64
    "cdc_mode" TEXT, -- String
    "pending_rows" BIGINT, -- i64
    "buffer_bytes" BIGINT -- i64
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'change_buffer_sizes_wrapper';

-- src/api/diagnostics.rs:19
-- pg_trickle::api::diagnostics::version_check
-- (LANGUAGE c functions here are pgrx/Rust entry points resolved from
-- MODULE_PATHNAME; trailing comments give the Rust-side type of each item.)
CREATE FUNCTION pgtrickle."version_check"()
RETURNS TEXT -- String
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'version_check_wrapper';

-- src/api/diagnostics.rs:1214
-- pg_trickle::api::diagnostics::watermark_groups
CREATE FUNCTION pgtrickle."watermark_groups"()
RETURNS TABLE (
    "group_name" TEXT, -- String
    "source_count" INTEGER, -- i32
    "tolerance_secs" DOUBLE PRECISION, -- f64
    "created_at" TIMESTAMPTZ -- TimestampWithTimeZone
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'watermark_groups_fn_wrapper';

-- src/api/inbox.rs:121
-- pg_trickle::api::inbox::create_inbox
CREATE FUNCTION pgtrickle."create_inbox"(
    "p_name" TEXT, -- &str
    "p_schema" TEXT DEFAULT 'pgtrickle', -- &str
    "p_max_retries" INTEGER DEFAULT 3, -- i32
    "p_schedule" TEXT DEFAULT '1s', -- &str
    "p_with_dead_letter" BOOLEAN DEFAULT true, -- bool
    "p_with_stats" BOOLEAN DEFAULT true, -- bool
    "p_retention_hours" INTEGER DEFAULT 72 -- i32
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'create_inbox_wrapper';

-- src/api/self_monitoring.rs:566
-- pg_trickle::api::self_monitoring::explain_dag
-- Not STRICT: the argument is optional on the Rust side.
CREATE FUNCTION pgtrickle."explain_dag"(
    "format" TEXT DEFAULT 'mermaid' -- Option<&str>
)
RETURNS TEXT -- Option<String>
LANGUAGE c
AS 'MODULE_PATHNAME', 'explain_dag_wrapper';

-- src/hash.rs:33
-- pg_trickle::hash::pg_trickle_hash
-- Not STRICT: the argument is optional on the Rust side.
CREATE FUNCTION pgtrickle."pg_trickle_hash"(
    "input" TEXT -- Option<&str>
)
RETURNS BIGINT -- i64
LANGUAGE c IMMUTABLE PARALLEL SAFE
AS 'MODULE_PATHNAME', 'pg_trickle_hash_wrapper';

-- src/api/self_monitoring.rs:240
-- pg_trickle::api::self_monitoring::setup_self_monitoring
CREATE FUNCTION pgtrickle."setup_self_monitoring"()
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'setup_self_monitoring_wrapper';

-- src/api/outbox.rs:804
-- pg_trickle::api::outbox::consumer_heartbeat
-- (LANGUAGE c functions here are pgrx/Rust entry points resolved from
-- MODULE_PATHNAME; trailing comments give the Rust-side type of each item.)
CREATE FUNCTION pgtrickle."consumer_heartbeat"(
    "p_group" TEXT, -- &str
    "p_consumer" TEXT -- &str
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'consumer_heartbeat_wrapper';

-- src/api/inbox.rs:852
-- pg_trickle::api::inbox::disable_inbox_priority
CREATE FUNCTION pgtrickle."disable_inbox_priority"(
    "p_inbox" TEXT, -- &str
    "p_if_exists" BOOLEAN DEFAULT false -- bool
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'disable_inbox_priority_wrapper';

-- src/api/diagnostics.rs:1440
-- pg_trickle::api::diagnostics::drop_refresh_group
CREATE FUNCTION pgtrickle."drop_refresh_group"(
    "group_name" TEXT -- &str
)
RETURNS void -- Result<(), PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'drop_refresh_group_wrapper';

-- src/api/self_monitoring.rs:453
-- pg_trickle::api::self_monitoring::scheduler_overhead
CREATE FUNCTION pgtrickle."scheduler_overhead"()
RETURNS TABLE (
    "total_refreshes_1h" BIGINT, -- i64
    "df_refreshes_1h" BIGINT, -- i64
    "df_refresh_fraction" DOUBLE PRECISION, -- Option<f64>
    "avg_refresh_ms" DOUBLE PRECISION, -- Option<f64>
    "avg_df_refresh_ms" DOUBLE PRECISION, -- Option<f64>
    "total_refresh_time_s" DOUBLE PRECISION, -- Option<f64>
    "df_refresh_time_s" DOUBLE PRECISION -- Option<f64>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'scheduler_overhead_wrapper';

-- src/api/inbox.rs:538
-- pg_trickle::api::inbox::inbox_status
-- Not STRICT: p_name defaults to NULL.
CREATE FUNCTION pgtrickle."inbox_status"(
    "p_name" TEXT DEFAULT NULL -- Option<&str>
)
RETURNS TABLE (
    "inbox_name" TEXT, -- String
    "inbox_schema" TEXT, -- String
    "max_retries" INTEGER, -- i32
    "schedule" TEXT, -- String
    "with_dead_letter" BOOLEAN, -- bool
    "with_stats" BOOLEAN, -- bool
    "created_at" TIMESTAMPTZ -- Option<TimestampWithTimeZone>
)
LANGUAGE c
AS 'MODULE_PATHNAME', 'inbox_status_wrapper';

-- src/api/snapshot.rs:95
-- pg_trickle::api::snapshot::snapshot_stream_table
-- Not STRICT: p_target defaults to NULL.
CREATE FUNCTION pgtrickle."snapshot_stream_table"(
    "p_name" TEXT, -- &str
    "p_target" TEXT DEFAULT NULL -- Option<&str>
)
RETURNS TEXT -- String
LANGUAGE c
AS 'MODULE_PATHNAME', 'snapshot_stream_table_wrapper';

-- src/api/mod.rs:624
-- pg_trickle::api::create_stream_table
-- Not STRICT: several parameters default to NULL.
CREATE FUNCTION pgtrickle."create_stream_table"(
    "name" TEXT, -- &str
    "query" TEXT, -- &str
    "schedule" TEXT DEFAULT 'calculated', -- Option<&str>
    "refresh_mode" TEXT DEFAULT 'AUTO', -- &str
    "initialize" BOOLEAN DEFAULT true, -- bool
    "diamond_consistency" TEXT DEFAULT NULL, -- Option<&str>
    "diamond_schedule_policy" TEXT DEFAULT NULL, -- Option<&str>
    "cdc_mode" TEXT DEFAULT NULL, -- Option<&str>
    "append_only" BOOLEAN DEFAULT false, -- bool
    "pooler_compatibility_mode" BOOLEAN DEFAULT false, -- bool
    "partition_by" TEXT DEFAULT NULL, -- Option<&str>
    "max_differential_joins" INTEGER DEFAULT NULL, -- Option<i32>
    "max_delta_fraction" DOUBLE PRECISION DEFAULT NULL -- Option<f64>
)
RETURNS void
LANGUAGE c
AS 'MODULE_PATHNAME', 'create_stream_table_wrapper';

-- src/api/diagnostics.rs:8
-- pg_trickle::api::diagnostics::version
CREATE FUNCTION pgtrickle."version"()
RETURNS TEXT -- &'_ str
LANGUAGE c IMMUTABLE STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', 'version_wrapper';

-- src/monitor.rs:864
-- pg_trickle::monitor::st_auto_threshold
CREATE FUNCTION pgtrickle."st_auto_threshold"(
    "name" TEXT -- &str
)
RETURNS DOUBLE PRECISION -- Option<f64>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'st_auto_threshold_wrapper';

-- src/api/outbox.rs:498
-- pg_trickle::api::outbox::drop_consumer_group
CREATE FUNCTION pgtrickle."drop_consumer_group"(
    "p_name" TEXT, -- &str
    "p_if_exists" BOOLEAN DEFAULT false -- bool
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'drop_consumer_group_wrapper';

-- src/monitor.rs:2249
-- pg_trickle::monitor::dependency_tree
CREATE FUNCTION pgtrickle."dependency_tree"()
RETURNS TABLE (
    "tree_line" TEXT, -- String
    "node" TEXT, -- String
    "node_type" TEXT, -- String
    "depth" INTEGER, -- i32
    "status" TEXT, -- Option<String>
    "refresh_mode" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'dependency_tree_wrapper';

-- src/api/outbox.rs:748
-- pg_trickle::api::outbox::extend_lease
CREATE FUNCTION pgtrickle."extend_lease"(
    "p_group" TEXT, -- &str
    "p_consumer" TEXT, -- &str
    "p_extension_seconds" INTEGER DEFAULT 30 -- i32
)
RETURNS TIMESTAMPTZ -- Option<TimestampWithTimeZone>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'extend_lease_wrapper';

-- src/api/outbox.rs:387
-- pg_trickle::api::outbox::outbox_rows_consumed
CREATE FUNCTION pgtrickle."outbox_rows_consumed"(
    "p_stream_table" TEXT, -- &str
    "p_outbox_id" BIGINT -- i64
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'outbox_rows_consumed_wrapper';

-- src/api/diagnostics.rs:1594
-- pg_trickle::api::diagnostics::clear_caches
CREATE FUNCTION pgtrickle."clear_caches"()
RETURNS BIGINT -- i64
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'clear_caches_wrapper';

-- src/api/outbox.rs:529
-- pg_trickle::api::outbox::poll_outbox
-- NOTE(review): "pgt_id" is UUID here while pgt_id is BIGINT in the catalog
-- tables of this script -- confirm this is a different identifier.
CREATE FUNCTION pgtrickle."poll_outbox"(
    "p_group" TEXT, -- &str
    "p_consumer" TEXT, -- &str
    "p_batch_size" INTEGER DEFAULT 100, -- i32
    "p_visibility_seconds" INTEGER DEFAULT 30 -- i32
)
RETURNS TABLE (
    "outbox_id" BIGINT, -- i64
    "pgt_id" UUID, -- pgrx::Uuid
    "created_at" TIMESTAMPTZ, -- TimestampWithTimeZone
    "inserted_count" BIGINT, -- i64
    "deleted_count" BIGINT, -- i64
    "is_claim_check" BOOLEAN, -- bool
    "payload" JSONB -- Option<pgrx::JsonB>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'poll_outbox_wrapper';

-- src/api/outbox.rs:781
-- pg_trickle::api::outbox::seek_offset
CREATE FUNCTION pgtrickle."seek_offset"(
    "p_group" TEXT, -- &str
    "p_consumer" TEXT, -- &str
    "p_new_offset" BIGINT -- i64
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'seek_offset_wrapper';

-- src/hooks.rs:1028
-- pg_trickle::hooks::pg_trickle_on_sql_drop
-- Skipped due to `#[pgrx(sql = false)]`

-- src/api/inbox.rs:468
-- pg_trickle::api::inbox::inbox_health
CREATE FUNCTION pgtrickle."inbox_health"(
    "p_name" TEXT -- &str
)
RETURNS JSONB -- pgrx::JsonB
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'inbox_health_wrapper';

-- src/monitor.rs:756
-- pg_trickle::monitor::get_refresh_history
CREATE FUNCTION pgtrickle."get_refresh_history"(
    "name" TEXT, -- &str
    "max_rows" INTEGER DEFAULT 20 -- i32
)
RETURNS TABLE (
    "refresh_id" BIGINT, -- i64
    "data_timestamp" TIMESTAMPTZ, -- TimestampWithTimeZone
    "start_time" TIMESTAMPTZ, -- TimestampWithTimeZone
    "end_time" TIMESTAMPTZ, -- Option<TimestampWithTimeZone>
    "action" TEXT, -- String
    "status" TEXT, -- String
    "rows_inserted" BIGINT, -- i64
    "rows_deleted" BIGINT, -- i64
    "duration_ms" DOUBLE PRECISION, -- Option<f64>
    "error_message" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'get_refresh_history_wrapper';

-- src/api/metrics_ext.rs:22
-- pg_trickle::api::metrics_ext::metrics_summary
CREATE FUNCTION pgtrickle."metrics_summary"()
RETURNS TABLE (
    "db_name" TEXT, -- Option<String>
    "total_stream_tables" BIGINT, -- Option<i64>
    "active_stream_tables" BIGINT, -- Option<i64>
    "suspended_stream_tables" BIGINT, -- Option<i64>
    "total_refreshes" BIGINT, -- Option<i64>
    "successful_refreshes" BIGINT, -- Option<i64>
    "failed_refreshes" BIGINT, -- Option<i64>
    "total_rows_processed" BIGINT, -- Option<i64>
    "active_workers" INTEGER -- Option<i32>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'metrics_summary_wrapper';

-- src/api/inbox.rs:748
-- pg_trickle::api::inbox::disable_inbox_ordering
CREATE FUNCTION pgtrickle."disable_inbox_ordering"(
    "p_inbox" TEXT, -- &str
    "p_if_exists" BOOLEAN DEFAULT false -- bool
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'disable_inbox_ordering_wrapper';

-- src/api/outbox.rs:308
-- pg_trickle::api::outbox::outbox_status
CREATE FUNCTION pgtrickle."outbox_status"(
    "p_name" TEXT -- &str
)
RETURNS JSONB -- pgrx::JsonB
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'outbox_status_wrapper';

-- src/api/diagnostics.rs:1249
-- pg_trickle::api::diagnostics::watermark_status
CREATE FUNCTION pgtrickle."watermark_status"()
RETURNS TABLE (
    "group_name" TEXT, -- String
    "min_watermark" TIMESTAMPTZ, -- Option<TimestampWithTimeZone>
    "max_watermark" TIMESTAMPTZ, -- Option<TimestampWithTimeZone>
    "lag_secs" DOUBLE PRECISION, -- Option<f64>
    "aligned" BOOLEAN, -- bool
    "sources_with_watermark" INTEGER, -- i32
    "sources_total" INTEGER -- i32
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'watermark_status_fn_wrapper';

-- src/api/diagnostics.rs:951
-- pg_trickle::api::diagnostics::source_gates
CREATE FUNCTION pgtrickle."source_gates"()
RETURNS TABLE (
    "source_table" TEXT, -- String
    "schema_name" TEXT, -- String
    "gated" BOOLEAN, -- bool
    "gated_at" TIMESTAMPTZ, -- Option<TimestampWithTimeZone>
    "ungated_at" TIMESTAMPTZ, -- Option<TimestampWithTimeZone>
    "gated_by" TEXT -- Option<String>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'source_gates_fn_wrapper';

-- src/monitor.rs:883
-- pg_trickle::monitor::get_staleness
CREATE FUNCTION pgtrickle."get_staleness"(
    "name" TEXT -- &str
)
RETURNS DOUBLE PRECISION -- Option<f64>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'get_staleness_wrapper';

-- src/api/diagnostics.rs:61
-- pg_trickle::api::diagnostics::migrate
CREATE FUNCTION pgtrickle."migrate"()
RETURNS TEXT -- String
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'migrate_wrapper';

-- src/api/outbox.rs:448
-- pg_trickle::api::outbox::create_consumer_group
CREATE FUNCTION pgtrickle."create_consumer_group"(
    "p_name" TEXT, -- &str
    "p_outbox" TEXT, -- &str
    "p_auto_offset_reset" TEXT DEFAULT 'latest' -- &str
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'create_consumer_group_wrapper';

-- src/api/self_monitoring.rs:305
-- pg_trickle::api::self_monitoring::teardown_self_monitoring
CREATE FUNCTION pgtrickle."teardown_self_monitoring"()
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'teardown_self_monitoring_wrapper';

-- src/api/planner.rs:117
-- pg_trickle::api::planner::recommend_schedule
CREATE FUNCTION pgtrickle."recommend_schedule"(
    "p_name" TEXT -- &str
)
RETURNS JSONB -- pgrx::JsonB
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'recommend_schedule_wrapper';

-- src/monitor.rs:2811
-- pg_trickle::monitor::health_summary
CREATE FUNCTION pgtrickle."health_summary"()
RETURNS TABLE (
    "total_stream_tables" INTEGER, -- i32
    "active_count" INTEGER, -- i32
    "error_count" INTEGER, -- i32
    "suspended_count" INTEGER, -- i32
    "stale_count" INTEGER, -- i32
    "reinit_pending" INTEGER, -- i32
    "max_staleness_seconds" DOUBLE PRECISION, -- Option<f64>
    "scheduler_status" TEXT, -- String
    "cache_hit_rate" DOUBLE PRECISION -- Option<f64>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'health_summary_wrapper';

-- src/api/diagnostics.rs:1452
-- pg_trickle::api::diagnostics::refresh_groups
CREATE FUNCTION pgtrickle."refresh_groups"()
RETURNS TABLE (
    "group_id" INTEGER, -- i32
    "group_name" TEXT, -- String
    "member_count" INTEGER, -- i32
    "isolation" TEXT, -- String
    "created_at" TIMESTAMPTZ -- TimestampWithTimeZone
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'refresh_groups_fn_wrapper';

-- src/hash.rs:54
-- pg_trickle::hash::pg_trickle_hash_multi
CREATE FUNCTION pgtrickle."pg_trickle_hash_multi"(
    "inputs" TEXT[] -- Vec<Option<String>>
)
RETURNS BIGINT -- i64
LANGUAGE c IMMUTABLE STRICT PARALLEL SAFE
AS 'MODULE_PATHNAME', 'pg_trickle_hash_multi_wrapper';

-- src/api/self_monitoring.rs:358
-- pg_trickle::api::self_monitoring::self_monitoring_status
CREATE FUNCTION pgtrickle."self_monitoring_status"()
RETURNS TABLE (
    "st_name" TEXT, -- String
    "exists" BOOLEAN, -- bool
    "status" TEXT, -- Option<String>
    "refresh_mode" TEXT, -- Option<String>
    "last_refresh_at" TEXT, -- Option<String>
    "total_refreshes" BIGINT -- Option<i64>
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'self_monitoring_status_wrapper';

-- src/api/diagnostics.rs:1116
-- pg_trickle::api::diagnostics::create_watermark_group
CREATE FUNCTION pgtrickle."create_watermark_group"(
    "group_name" TEXT, -- &str
    "sources" TEXT[], -- Vec<String>
    "tolerance_secs" DOUBLE PRECISION DEFAULT 0.0 -- f64
)
RETURNS INTEGER -- Result<i32, PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'create_watermark_group_wrapper';

-- src/api/outbox.rs:238
-- pg_trickle::api::outbox::disable_outbox
CREATE FUNCTION pgtrickle."disable_outbox"(
    "p_name" TEXT, -- &str
    "p_if_exists" BOOLEAN DEFAULT false -- bool
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'disable_outbox_wrapper';

-- src/diagnostics.rs:570
-- pg_trickle::diagnostics::list_auxiliary_columns
CREATE FUNCTION pgtrickle."list_auxiliary_columns"(
    "name" TEXT -- &str
)
RETURNS TABLE (
    "column_name" TEXT, -- String
    "data_type" TEXT, -- String
    "purpose" TEXT -- String
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'list_auxiliary_columns_wrapper';

-- src/api/diagnostics.rs:905
-- pg_trickle::api::diagnostics::gate_source
CREATE FUNCTION pgtrickle."gate_source"(
    "source" TEXT -- &str
)
RETURNS void -- Result<(), PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'gate_source_wrapper';

-- src/api/inbox.rs:344
-- pg_trickle::api::inbox::enable_inbox_tracking
CREATE FUNCTION pgtrickle."enable_inbox_tracking"(
    "p_name" TEXT, -- &str
    "p_table_ref" TEXT, -- &str
    "p_id_column" TEXT DEFAULT 'event_id', -- &str
    "p_processed_at_column" TEXT DEFAULT 'processed_at', -- &str
    "p_retry_count_column" TEXT DEFAULT 'retry_count', -- &str
    "p_error_column" TEXT DEFAULT 'error', -- &str
    "p_received_at_column" TEXT DEFAULT 'received_at', -- &str
    "p_event_type_column" TEXT DEFAULT 'event_type', -- &str
    "p_max_retries" INTEGER DEFAULT 3, -- i32
    "p_schedule" TEXT DEFAULT '1s' -- &str
)
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'enable_inbox_tracking_wrapper';

-- src/monitor.rs:1048
-- pg_trickle::monitor::explain_st
CREATE FUNCTION pgtrickle."explain_st"(
    "name" TEXT, -- &str
    "with_analyze" BOOLEAN DEFAULT false -- bool
)
RETURNS TABLE (
    "property" TEXT, -- String
    "value" TEXT -- String
)
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'explain_st_wrapper';

-- src/api/diagnostics.rs:95
-- pg_trickle::api::diagnostics::_signal_launcher_rescan
CREATE FUNCTION pgtrickle."_signal_launcher_rescan"()
RETURNS void
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', '_signal_launcher_rescan_wrapper';

-- src/ivm.rs:535
-- pg_trickle::ivm::pgt_ivm_apply_delta
CREATE FUNCTION pgtrickle."pgt_ivm_apply_delta"(
    "pgt_id" BIGINT, -- i64
    "source_oid" INTEGER, -- i32
    "has_new" BOOLEAN, -- bool
    "has_old" BOOLEAN -- bool
)
RETURNS void -- Result<(), PgTrickleError>
LANGUAGE c STRICT
AS 'MODULE_PATHNAME', 'pgt_ivm_apply_delta_wrapper';

--
src/api/outbox.rs:91 -- pg_trickle::api::outbox::enable_outbox CREATE FUNCTION pgtrickle."enable_outbox"( "p_name" TEXT, /* & str */ "p_retention_hours" INT DEFAULT 24 /* i32 */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'enable_outbox_wrapper'; /* */ /* */ -- src/api/publication.rs:16 -- pg_trickle::api::publication::stream_table_to_publication CREATE FUNCTION pgtrickle."stream_table_to_publication"( "name" TEXT /* & str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'stream_table_to_publication_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:741 -- pg_trickle::api::diagnostics::fuse_status CREATE FUNCTION pgtrickle."fuse_status"() RETURNS TABLE ( "stream_table" TEXT, /* String */ "fuse_mode" TEXT, /* String */ "fuse_state" TEXT, /* String */ "fuse_ceiling" bigint, /* Option < i64 > */ "effective_ceiling" bigint, /* Option < i64 > */ "fuse_sensitivity" INT, /* Option < i32 > */ "blown_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "blow_reason" TEXT /* Option < String > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'fuse_status_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:223 -- pg_trickle::api::diagnostics::pgt_scc_status CREATE FUNCTION pgtrickle."pgt_scc_status"() RETURNS TABLE ( "scc_id" INT, /* i32 */ "member_count" INT, /* i32 */ "members" TEXT[], /* Vec < String > */ "last_iterations" INT, /* Option < i32 > */ "last_converged_at" timestamp with time zone /* Option < TimestampWithTimeZone > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_scc_status_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:154 -- pg_trickle::api::diagnostics::parse_duration_seconds CREATE FUNCTION pgtrickle."parse_duration_seconds"( "input" TEXT /* & str */ ) RETURNS bigint /* Option < i64 > */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'parse_duration_seconds_wrapper'; /* */ /* */ -- src/lib.rs:453 -- requires: -- parse_duration_seconds -- Status overview view 
(ERR-1d: last_error_message and last_error_at are
-- included via st.* from pgt_stream_tables)
-- Every catalog column of pgt_stream_tables plus three derived columns:
-- staleness, stale and is_topk.
CREATE OR REPLACE VIEW pgtrickle.stream_tables_info AS
SELECT
    st.*,
    now() - st.last_refresh_at AS staleness,
    -- Schedules containing whitespace or '@' are not compared and yield NULL
    -- (presumably cron-style schedules; confirm against the scheduler).
    CASE
        WHEN st.schedule IS NOT NULL AND st.schedule !~ '[\s@]'
            THEN EXTRACT(EPOCH FROM (now() - st.last_refresh_at))
                 > pgtrickle.parse_duration_seconds(st.schedule)
        ELSE NULL::boolean
    END AS stale,
    (st.topk_limit IS NOT NULL) AS is_topk
FROM pgtrickle.pgt_stream_tables AS st;
/* */
/* */
-- src/lib.rs:505
-- requires:
--   parse_duration_seconds
-- Convenience view: pg_stat_stream_tables
-- Combines catalog metadata with aggregate refresh statistics.
CREATE OR REPLACE VIEW pgtrickle.pg_stat_stream_tables AS
SELECT
    st.pgt_id,
    st.pgt_schema,
    st.pgt_name,
    st.status,
    st.refresh_mode,
    st.is_populated,
    st.data_timestamp,
    st.schedule,
    now() - st.last_refresh_at AS staleness,
    CASE
        WHEN st.schedule IS NOT NULL
             AND st.last_refresh_at IS NOT NULL
             AND st.schedule !~ '[\s@]'
            THEN EXTRACT(EPOCH FROM (now() - st.last_refresh_at))
                 > pgtrickle.parse_duration_seconds(st.schedule)
        ELSE NULL::boolean
    END AS stale,
    st.consecutive_errors,
    st.needs_reinit,
    st.last_refresh_at,
    COALESCE(stats.total_refreshes, 0) AS total_refreshes,
    COALESCE(stats.successful_refreshes, 0) AS successful_refreshes,
    COALESCE(stats.failed_refreshes, 0) AS failed_refreshes,
    COALESCE(stats.total_rows_inserted, 0) AS total_rows_inserted,
    COALESCE(stats.total_rows_deleted, 0) AS total_rows_deleted,
    stats.avg_duration_ms,
    latest.action AS last_action,
    latest.status AS last_status,
    (
        SELECT array_agg(DISTINCT d.cdc_mode ORDER BY d.cdc_mode)
        FROM pgtrickle.pgt_dependencies AS d
        WHERE d.pgt_id = st.pgt_id
          AND d.source_type = 'TABLE'
    ) AS cdc_modes,
    st.scc_id,
    st.last_fixpoint_iterations,
    st.refresh_tier
FROM pgtrickle.pgt_stream_tables AS st
-- Aggregates over the full refresh history of one stream table. An aggregate
-- without GROUP BY always yields exactly one row, so the join never drops
-- a stream table that has no history yet.
LEFT JOIN LATERAL (
    SELECT
        count(*)::bigint AS total_refreshes,
        count(*) FILTER (WHERE h.status = 'COMPLETED')::bigint AS successful_refreshes,
        count(*) FILTER (WHERE h.status = 'FAILED')::bigint AS failed_refreshes,
        COALESCE(sum(h.rows_inserted), 0)::bigint AS total_rows_inserted,
        COALESCE(sum(h.rows_deleted), 0)::bigint AS total_rows_deleted,
        -- avg() over zero finished refreshes is already NULL.
        (avg(EXTRACT(EPOCH FROM (h.end_time - h.start_time)) * 1000)
             FILTER (WHERE h.end_time IS NOT NULL))::float8 AS avg_duration_ms
    FROM pgtrickle.pgt_refresh_history AS h
    WHERE h.pgt_id = st.pgt_id
) AS stats ON true
-- Most recent history row (highest refresh_id) of the stream table.
-- NOTE(review): initiated_by and freshness_deadline are read here but not
-- projected by the outer SELECT; they are kept so the view references the
-- same history columns as before.
LEFT JOIN LATERAL (
    SELECT
        h2.action,
        h2.status,
        h2.initiated_by,
        h2.freshness_deadline
    FROM pgtrickle.pgt_refresh_history AS h2
    WHERE h2.pgt_id = st.pgt_id
    ORDER BY h2.refresh_id DESC
    LIMIT 1
) AS latest ON true;

-- Per-source CDC status view (G5): exposes cdc_mode, slot names, and
-- transition timestamps for every TABLE dependency of a stream table.
CREATE OR REPLACE VIEW pgtrickle.pgt_cdc_status AS
SELECT
    st.pgt_schema,
    st.pgt_name,
    d.source_relid,
    c.relname AS source_name,
    n.nspname AS source_schema,
    d.cdc_mode,
    d.slot_name,
    d.decoder_confirmed_lsn,
    d.transition_started_at
FROM pgtrickle.pgt_dependencies AS d
INNER JOIN pgtrickle.pgt_stream_tables AS st
    ON st.pgt_id = d.pgt_id
INNER JOIN pg_class AS c
    ON c.oid = d.source_relid
INNER JOIN pg_namespace AS n
    ON n.oid = c.relnamespace
WHERE d.source_type = 'TABLE';
/* */
/* */
-- src/lib.rs:593
-- requires:
--   parse_duration_seconds
-- ERG-E: One-row health summary for dashboards and alerting.
CREATE OR REPLACE VIEW pgtrickle.quick_health AS
WITH st_counts AS (
    -- Single pass over the catalog; every counter is a filtered count(*).
    SELECT
        count(*)::bigint AS total_stream_tables,
        count(*) FILTER (
            WHERE status = 'ERROR' OR consecutive_errors > 0
        )::bigint AS error_tables,
        count(*) FILTER (
            WHERE status = 'SUSPENDED'
        )::bigint AS suspended_tables,
        count(*) FILTER (
            WHERE schedule IS NOT NULL
              AND schedule !~ '[\s@]'
              AND last_refresh_at IS NOT NULL
              AND EXTRACT(EPOCH FROM (now() - last_refresh_at))
                  > pgtrickle.parse_duration_seconds(schedule)
        )::bigint AS stale_tables
    FROM pgtrickle.pgt_stream_tables
)
SELECT
    c.total_stream_tables,
    c.error_tables,
    c.stale_tables,
    EXISTS (
        SELECT 1
        FROM pg_stat_activity
        WHERE backend_type = 'pg_trickle scheduler'
    ) AS scheduler_running,
    -- Precedence: EMPTY, then CRITICAL (any suspended table), then WARNING
    -- (any errored or stale table), otherwise OK.
    CASE
        WHEN c.total_stream_tables = 0 THEN 'EMPTY'
        WHEN c.suspended_tables > 0 THEN 'CRITICAL'
        WHEN c.error_tables > 0 THEN 'WARNING'
        WHEN c.stale_tables > 0 THEN 'WARNING'
        ELSE 'OK'
    END AS status
FROM st_counts AS c;
/* */
/* */
-- src/api/inbox.rs:888
-- pg_trickle::api::inbox::inbox_ordering_gaps
CREATE FUNCTION pgtrickle."inbox_ordering_gaps"(
    "p_inbox_name" TEXT /* & str */
) RETURNS TABLE (
    "aggregate_id" TEXT, /* String */
    "expected_seq" bigint, /* i64 */
    "found_seq" bigint /* i64 */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'inbox_ordering_gaps_wrapper';
/* */
/* */
-- src/api/helpers.rs:2272
-- pg_trickle::api::helpers::convert_buffers_to_unlogged
CREATE FUNCTION pgtrickle."convert_buffers_to_unlogged"() RETURNS bigint /* Result < i64, PgTrickleError > */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'convert_buffers_to_unlogged_wrapper';
/* */
/* */
-- src/diagnostics.rs:395
-- pg_trickle::diagnostics::diagnose_errors
CREATE FUNCTION pgtrickle."diagnose_errors"(
"name" TEXT /* & str */ ) RETURNS TABLE ( "event_time" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "error_type" TEXT, /* String */ "error_message" TEXT, /* String */ "remediation" TEXT /* String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'diagnose_errors_wrapper'; /* */ /* */ -- src/api/mod.rs:900 -- pg_trickle::api::create_or_replace_stream_table CREATE FUNCTION pgtrickle."create_or_replace_stream_table"( "name" TEXT, /* & str */ "query" TEXT, /* & str */ "schedule" TEXT DEFAULT 'calculated', /* Option < & str > */ "refresh_mode" TEXT DEFAULT 'AUTO', /* & str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* Option < & str > */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* Option < & str > */ "cdc_mode" TEXT DEFAULT NULL, /* Option < & str > */ "append_only" bool DEFAULT false, /* bool */ "pooler_compatibility_mode" bool DEFAULT false, /* bool */ "partition_by" TEXT DEFAULT NULL, /* Option < & str > */ "max_differential_joins" INT DEFAULT NULL, /* Option < i32 > */ "max_delta_fraction" double precision DEFAULT NULL /* Option < f64 > */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_or_replace_stream_table_wrapper'; /* */ /* */ -- src/api/cluster.rs:18 -- pg_trickle::api::cluster::cluster_worker_summary CREATE FUNCTION pgtrickle."cluster_worker_summary"() RETURNS TABLE ( "db_oid" bigint, /* Option < i64 > */ "db_name" TEXT, /* Option < String > */ "active_workers" INT, /* Option < i32 > */ "scheduler_pid" INT, /* Option < i32 > */ "scheduler_running" bool, /* Option < bool > */ "total_active_workers" INT /* Option < i32 > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'cluster_worker_summary_wrapper'; /* */ /* */ -- src/api/mod.rs:3423 -- pg_trickle::api::alter_stream_table CREATE FUNCTION pgtrickle."alter_stream_table"( "name" TEXT, /* & str */ "query" TEXT DEFAULT NULL, /* Option < & str > */ "schedule" TEXT DEFAULT NULL, /* Option < & str > */ "refresh_mode" 
TEXT DEFAULT NULL, /* Option < & str > */ "status" TEXT DEFAULT NULL, /* Option < & str > */ "diamond_consistency" TEXT DEFAULT NULL, /* Option < & str > */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* Option < & str > */ "cdc_mode" TEXT DEFAULT NULL, /* Option < & str > */ "append_only" bool DEFAULT NULL, /* Option < bool > */ "pooler_compatibility_mode" bool DEFAULT NULL, /* Option < bool > */ "tier" TEXT DEFAULT NULL, /* Option < & str > */ "fuse" TEXT DEFAULT NULL, /* Option < & str > */ "fuse_ceiling" bigint DEFAULT NULL, /* Option < i64 > */ "fuse_sensitivity" INT DEFAULT NULL, /* Option < i32 > */ "partition_by" TEXT DEFAULT NULL, /* Option < & str > */ "max_differential_joins" INT DEFAULT NULL, /* Option < i32 > */ "max_delta_fraction" double precision DEFAULT NULL /* Option < f64 > */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'alter_stream_table_wrapper'; /* */ /* */ -- src/api/mod.rs:667 -- pg_trickle::api::create_stream_table_if_not_exists CREATE FUNCTION pgtrickle."create_stream_table_if_not_exists"( "name" TEXT, /* & str */ "query" TEXT, /* & str */ "schedule" TEXT DEFAULT 'calculated', /* Option < & str > */ "refresh_mode" TEXT DEFAULT 'AUTO', /* & str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* Option < & str > */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* Option < & str > */ "cdc_mode" TEXT DEFAULT NULL, /* Option < & str > */ "append_only" bool DEFAULT false, /* bool */ "pooler_compatibility_mode" bool DEFAULT false, /* bool */ "partition_by" TEXT DEFAULT NULL, /* Option < & str > */ "max_differential_joins" INT DEFAULT NULL, /* Option < i32 > */ "max_delta_fraction" double precision DEFAULT NULL /* Option < f64 > */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_stream_table_if_not_exists_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1161 -- pg_trickle::api::diagnostics::watermarks CREATE FUNCTION pgtrickle."watermarks"() RETURNS TABLE ( "source_table" 
TEXT, /* String */ "schema_name" TEXT, /* String */ "watermark" timestamp with time zone, /* TimestampWithTimeZone */ "updated_at" timestamp with time zone, /* TimestampWithTimeZone */ "advanced_by" TEXT, /* Option < String > */ "wal_lsn" TEXT /* Option < String > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'watermarks_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1370 -- pg_trickle::api::diagnostics::create_refresh_group CREATE FUNCTION pgtrickle."create_refresh_group"( "group_name" TEXT, /* & str */ "members" TEXT[], /* Vec < String > */ "isolation" TEXT DEFAULT 'read_committed' /* & str */ ) RETURNS INT /* Result < i32, PgTrickleError > */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_refresh_group_wrapper'; /* */ /* */ -- src/api/helpers.rs:2516 -- pg_trickle::api::helpers::export_definition CREATE FUNCTION pgtrickle."export_definition"( "st_name" TEXT /* & str */ ) RETURNS TEXT /* Result < String, PgTrickleError > */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'export_definition_wrapper'; /* */ /* */ -- src/lib.rs:710 -- requires: -- export_definition CREATE OR REPLACE FUNCTION pgtrickle."stream_table_definition"( p_name text ) RETURNS text LANGUAGE sql STABLE AS $$ SELECT pgtrickle.export_definition(p_name); $$; COMMENT ON FUNCTION pgtrickle."stream_table_definition"(text) IS 'Return the CREATE STREAM TABLE DDL for the named stream table. 
' 'Equivalent to pgtrickle.export_definition(name) — provided as a ' 'more discoverable alias.'; /* */ /* */ -- src/api/diagnostics.rs:928 -- pg_trickle::api::diagnostics::ungate_source CREATE FUNCTION pgtrickle."ungate_source"( "source" TEXT /* & str */ ) RETURNS VOID /* Result < (), PgTrickleError > */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'ungate_source_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1012 -- pg_trickle::api::diagnostics::bootstrap_gate_status CREATE FUNCTION pgtrickle."bootstrap_gate_status"() RETURNS TABLE ( "source_table" TEXT, /* String */ "schema_name" TEXT, /* String */ "gated" bool, /* bool */ "gated_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "ungated_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "gated_by" TEXT, /* Option < String > */ "gate_duration" interval, /* Option < pgrx :: datum :: Interval > */ "affected_stream_tables" TEXT /* Option < String > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'bootstrap_gate_status_fn_wrapper'; /* */ /* */ -- src/monitor.rs:1337 -- pg_trickle::monitor::explain_diff_sql CREATE FUNCTION pgtrickle."explain_diff_sql"( "name" TEXT /* & str */ ) RETURNS TEXT /* Option < String > */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_diff_sql_wrapper'; /* */ /* */ -- src/monitor.rs:462 -- pg_trickle::monitor::st_refresh_stats CREATE FUNCTION pgtrickle."st_refresh_stats"() RETURNS TABLE ( "pgt_name" TEXT, /* String */ "pgt_schema" TEXT, /* String */ "status" TEXT, /* String */ "refresh_mode" TEXT, /* String */ "is_populated" bool, /* bool */ "total_refreshes" bigint, /* i64 */ "successful_refreshes" bigint, /* i64 */ "failed_refreshes" bigint, /* i64 */ "total_rows_inserted" bigint, /* i64 */ "total_rows_deleted" bigint, /* i64 */ "avg_duration_ms" double precision, /* f64 */ "last_refresh_action" TEXT, /* Option < String > */ "last_refresh_status" TEXT, /* Option < String > */ "last_refresh_at" timestamp with time zone, /* 
Option < TimestampWithTimeZone > */
    "staleness_secs" double precision, /* Option < f64 > */
    "stale" bool, /* bool */
    "consecutive_errors" INT, /* i32 */
    "schedule" TEXT, /* Option < String > */
    "refresh_tier" TEXT, /* String */
    "last_error_message" TEXT, /* Option < String > */
    "downstream_publication" TEXT /* Option < String > */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'st_refresh_stats_wrapper';
/* */
/* */
-- src/api/mod.rs:4198
-- pg_trickle::api::refresh_stream_table
CREATE FUNCTION pgtrickle."refresh_stream_table"(
    "name" TEXT /* & str */
) RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'refresh_stream_table_wrapper';
/* */
/* */
-- src/lib.rs:670
-- requires:
--   refresh_stream_table
-- refresh_if_stale(p_name, p_max_age): refresh the stream table only when its
-- newest COMPLETED refresh finished more than p_max_age ago, or when it has
-- no completed refresh at all.
--   p_name     stream table name, either bare ("orders_st") or
--              schema-qualified ("sales.orders_st")
--   p_max_age  maximum tolerated age of the last completed refresh
-- Returns TRUE when refresh_stream_table() was invoked, FALSE otherwise.
CREATE OR REPLACE FUNCTION pgtrickle."refresh_if_stale"(
    p_name text,
    p_max_age interval DEFAULT '5 minutes'::interval
) RETURNS boolean
LANGUAGE plpgsql
AS $$
DECLARE
    v_dot int;
    v_schema text;  -- stays NULL for an unqualified p_name
    v_table text;   -- stays NULL for an unqualified p_name
    v_last_end timestamp with time zone;
    v_refreshed boolean := false;
BEGIN
    -- Split "schema.table" at the first dot, as the canary helpers do.
    v_dot := strpos(p_name, '.');
    IF v_dot > 0 THEN
        v_schema := substr(p_name, 1, v_dot - 1);
        v_table := substr(p_name, v_dot + 1);
    END IF;

    -- The catalog stores schema and name in separate columns, so comparing
    -- pgt_name with a qualified p_name alone can never match; that made every
    -- call with a qualified name refresh unconditionally. The bare-name
    -- comparison is kept so unqualified callers see unchanged behaviour.
    -- NOTE(review): a bare name still matches same-named stream tables in
    -- every schema; pass a qualified name to disambiguate.
    SELECT MAX(h.end_time)
    INTO v_last_end
    FROM pgtrickle.pgt_refresh_history h
    JOIN pgtrickle.pgt_stream_tables s USING (pgt_id)
    WHERE h.status = 'COMPLETED'
      AND (
          s.pgt_name = p_name
          OR (s.pgt_schema = v_schema AND s.pgt_name = v_table)
      );

    IF v_last_end IS NULL OR (now() - v_last_end) > p_max_age THEN
        PERFORM pgtrickle.refresh_stream_table(p_name);
        v_refreshed := true;
    END IF;

    RETURN v_refreshed;
END;
$$;

COMMENT ON FUNCTION pgtrickle."refresh_if_stale"(text, interval) IS
'Refresh the named stream table only when the most recent completed '
'refresh is older than max_age. Returns TRUE when a refresh was '
'triggered, FALSE when the table was fresh enough.';
/* */
/* */
-- src/api/mod.rs:765
-- pg_trickle::api::bulk_create
CREATE FUNCTION pgtrickle."bulk_create"(
    "definitions" jsonb /* pgrx :: JsonB */
) RETURNS jsonb /* pgrx :: JsonB */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'bulk_create_wrapper';
/* */
/* */
-- src/api/snapshot.rs:282
-- pg_trickle::api::snapshot::list_snapshots
CREATE FUNCTION pgtrickle."list_snapshots"(
    "p_name" TEXT /* & str */
) RETURNS TABLE (
    "snapshot_table" TEXT, /* Option < String > */
    "created_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */
    "row_count" bigint, /* Option < i64 > */
    "frontier" jsonb, /* Option < pgrx :: JsonB > */
    "size_bytes" bigint /* Option < i64 > */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'list_snapshots_wrapper';
/* */
/* */
-- src/monitor.rs:2167
-- pg_trickle::monitor::list_sources
CREATE FUNCTION pgtrickle."list_sources"(
    "name" TEXT /* & str */
) RETURNS TABLE (
    "source_table" TEXT, /* String */
    "source_oid" bigint, /* i64 */
    "source_type" TEXT, /* String */
    "cdc_mode" TEXT, /* String */
    "columns_used" TEXT /* Option < String > */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'list_sources_wrapper';
/* */
/* */
-- src/api/helpers.rs:2633
-- pg_trickle::api::helpers::restore_stream_tables
CREATE FUNCTION pgtrickle."restore_stream_tables"() RETURNS VOID /* Result < (), crate :: error :: PgTrickleError > */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'restore_stream_tables_wrapper';
/* */
/* */
-- src/monitor.rs:3135
-- pg_trickle::monitor::worker_pool_status
CREATE FUNCTION pgtrickle."worker_pool_status"() RETURNS TABLE (
    "active_workers" INT, /* i32 */
    "max_workers" INT, /* i32 */
    "per_db_cap" INT, /* i32 */
    "parallel_mode" TEXT /* String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'worker_pool_status_wrapper';
/* */
/* */
-- src/hooks.rs:51
-- pg_trickle::hooks::pg_trickle_on_ddl_end
-- Skipped due to
`#[pgrx(sql = false)]` /* */ /* */ -- src/api/outbox.rs:827 -- pg_trickle::api::outbox::consumer_lag CREATE FUNCTION pgtrickle."consumer_lag"( "p_group" TEXT /* & str */ ) RETURNS TABLE ( "consumer_id" TEXT, /* String */ "committed_offset" bigint, /* i64 */ "max_outbox_id" bigint, /* i64 */ "lag" bigint, /* i64 */ "last_heartbeat_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "is_alive" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'consumer_lag_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:545 -- pg_trickle::api::diagnostics::shared_buffer_stats CREATE FUNCTION pgtrickle."shared_buffer_stats"() RETURNS TABLE ( "source_oid" bigint, /* i64 */ "source_table" TEXT, /* String */ "consumer_count" INT, /* i32 */ "consumers" TEXT, /* String */ "columns_tracked" INT, /* i32 */ "safe_frontier_lsn" TEXT, /* Option < String > */ "buffer_rows" bigint, /* i64 */ "is_partitioned" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'shared_buffer_stats_fn_wrapper'; /* */ /* */ -- src/api/mod.rs:4226 -- pg_trickle::api::write_and_refresh CREATE FUNCTION pgtrickle."write_and_refresh"( "sql" TEXT, /* & str */ "stream_table_name" TEXT /* & str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'write_and_refresh_wrapper'; /* */ /* */ -- src/api/inbox.rs:970 -- pg_trickle::api::inbox::inbox_is_my_partition CREATE FUNCTION pgtrickle."inbox_is_my_partition"( "p_aggregate_id" TEXT, /* & str */ "p_worker_id" INT, /* i32 */ "p_total_workers" INT /* i32 */ ) RETURNS bool /* bool */ IMMUTABLE STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'inbox_is_my_partition_wrapper'; /* */ /* */ -- src/api/publication.rs:126 -- pg_trickle::api::publication::set_stream_table_sla CREATE FUNCTION pgtrickle."set_stream_table_sla"( "name" TEXT, /* & str */ "sla" interval /* Interval */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'set_stream_table_sla_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:163 
-- pg_trickle::api::diagnostics::pgt_status
CREATE FUNCTION pgtrickle."pgt_status"() RETURNS TABLE (
    "name" TEXT, /* String */
    "status" TEXT, /* String */
    "refresh_mode" TEXT, /* String */
    "is_populated" bool, /* bool */
    "consecutive_errors" INT, /* i32 */
    "schedule" TEXT, /* Option < String > */
    "data_timestamp" timestamp with time zone, /* Option < TimestampWithTimeZone > */
    "staleness" interval, /* Option < pgrx :: datum :: Interval > */
    "scc_id" INT /* Option < i32 > */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_status_wrapper';
/* */
/* */
-- src/api/mod.rs:3973
-- pg_trickle::api::drop_stream_table
CREATE FUNCTION pgtrickle."drop_stream_table"(
    "name" TEXT, /* & str */
    "cascade" bool DEFAULT false /* bool */
) RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'drop_stream_table_wrapper';
/* */
/* */
-- src/lib.rs:733
-- requires:
--   create_stream_table
--   drop_stream_table
--   alter_stream_table
-- canary_begin(p_name, p_new_query): create the shadow stream table
-- "__pgt_canary_<table>" in the same schema as p_name, defined by p_new_query.
-- An unqualified p_name is resolved against current_schema().
-- Returns a human-readable confirmation message.
CREATE OR REPLACE FUNCTION pgtrickle."canary_begin"(
    p_name text,
    p_new_query text
) RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
    v_schema text;
    v_table text;
    v_canary text;
    v_dot int;
BEGIN
    -- Split "schema.table" at the first dot.
    v_dot := strpos(p_name, '.');
    IF v_dot > 0 THEN
        v_schema := substr(p_name, 1, v_dot - 1);
        v_table := substr(p_name, v_dot + 1);
    ELSE
        v_schema := current_schema();
        v_table := p_name;
    END IF;
    v_canary := '__pgt_canary_' || v_table;

    -- Drop any stale canary table from a previous run.
    BEGIN
        PERFORM pgtrickle.drop_stream_table(v_schema || '.' || v_canary);
    EXCEPTION WHEN OTHERS THEN
        NULL; -- ignore if it does not exist
    END;

    -- Create the canary stream table with the new query.
    PERFORM pgtrickle.create_stream_table(
        v_schema || '.' || v_canary,
        p_new_query
    );

    RETURN format(
        'Canary stream table %I.%I created. Run pgtrickle.canary_diff(%L) to compare.',
        v_schema, v_canary, p_name
    );
END;
$$;

COMMENT ON FUNCTION pgtrickle."canary_begin"(text, text) IS
'Start a shadow/canary test for the named stream table. '
'Creates __pgt_canary_ with p_new_query and starts refreshing it. '
'Use canary_diff(name) to inspect differences and canary_promote(name) to '
'swap canary into production.';

-- canary_diff(p_name): symmetric difference between the live stream table and
-- its "__pgt_canary_<table>" counterpart, compared on the text form of each
-- whole row.
--   row_source  'live_only' or 'canary_only'
--   diff_row    text rendering of the row found on that side only
-- An empty result means both tables hold the same set of distinct rows.
-- EXCEPT is set-based, so differing duplicate counts are not reported.
CREATE OR REPLACE FUNCTION pgtrickle."canary_diff"(
    p_name text
) RETURNS TABLE(
    row_source text,
    diff_row text
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_schema text;
    v_table text;
    v_canary text;
    v_dot int;
    v_sql text;
BEGIN
    -- Split "schema.table" at the first dot.
    v_dot := strpos(p_name, '.');
    IF v_dot > 0 THEN
        v_schema := substr(p_name, 1, v_dot - 1);
        v_table := substr(p_name, v_dot + 1);
    ELSE
        v_schema := current_schema();
        v_table := p_name;
    END IF;
    v_canary := '__pgt_canary_' || v_table;

    -- EXCEPT must see only the row text. If the constant label takes part in
    -- the comparison, ('live_only', x) never equals ('canary_only', x), no row
    -- is ever removed, and identical tables are reported as fully different.
    -- The label is therefore attached after each one-sided difference.
    v_sql := format(
        'SELECT ''live_only''::text AS row_source, only_live.diff_row
           FROM (SELECT t::text AS diff_row FROM %I.%I t
                 EXCEPT
                 SELECT c::text FROM %I.%I c) AS only_live
         UNION ALL
         SELECT ''canary_only''::text AS row_source, only_canary.diff_row
           FROM (SELECT c::text AS diff_row FROM %I.%I c
                 EXCEPT
                 SELECT t::text FROM %I.%I t) AS only_canary',
        v_schema, v_table,
        v_schema, v_canary,
        v_schema, v_canary,
        v_schema, v_table
    );

    RETURN QUERY EXECUTE v_sql;
END;
$$;

COMMENT ON FUNCTION pgtrickle."canary_diff"(text) IS
'Compare the live stream table with its canary counterpart. '
'Returns rows that exist in only one of the two tables. '
'An empty result set indicates the new query produces the same output.';

-- canary_promote(p_name): copy the canary's defining_query onto the live
-- stream table via alter_stream_table(), then drop the canary.
-- Raises an exception when no canary row exists in pgt_stream_tables.
-- Returns a human-readable confirmation message.
CREATE OR REPLACE FUNCTION pgtrickle."canary_promote"(
    p_name text
) RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
    v_schema text;
    v_table text;
    v_canary text;
    v_dot int;
    v_new_query text;
BEGIN
    -- Split "schema.table" at the first dot.
    v_dot := strpos(p_name, '.');
    IF v_dot > 0 THEN
        v_schema := substr(p_name, 1, v_dot - 1);
        v_table := substr(p_name, v_dot + 1);
    ELSE
        v_schema := current_schema();
        v_table := p_name;
    END IF;
    v_canary := '__pgt_canary_' || v_table;

    -- Read the defining query from the canary table.
    SELECT defining_query
    INTO v_new_query
    FROM pgtrickle.pgt_stream_tables
    WHERE pgt_schema = v_schema
      AND pgt_name = v_canary;

    IF v_new_query IS NULL THEN
        RAISE EXCEPTION 'No canary found for %. Run pgtrickle.canary_begin() first.', p_name;
    END IF;

    -- Promote: alter the live table to use the new query, then drop the canary.
    PERFORM pgtrickle.alter_stream_table(v_schema || '.' || v_table, query => v_new_query);

    -- NOTE(review): a failed drop is swallowed here, yet the message below
    -- still reports the canary as dropped.
    BEGIN
        PERFORM pgtrickle.drop_stream_table(v_schema || '.' || v_canary);
    EXCEPTION WHEN OTHERS THEN
        NULL;
    END;

    RETURN format(
        'Canary promoted: %I.%I now uses the canary query. Canary table dropped.',
        v_schema, v_table
    );
END;
$$;

COMMENT ON FUNCTION pgtrickle."canary_promote"(text) IS
'Promote the canary stream table to production. '
'Calls ALTER STREAM TABLE with the canary query, then drops the canary table. '
'Run pgtrickle.canary_diff(name) first to confirm the result set matches.';
/* */
/* */
-- src/api/inbox.rs:615
-- pg_trickle::api::inbox::replay_inbox_messages
CREATE FUNCTION pgtrickle."replay_inbox_messages"(
    "p_name" TEXT, /* & str */
    "p_event_ids" TEXT[] /* Vec < String > */
) RETURNS bigint /* i64 */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'replay_inbox_messages_wrapper';
/* */
/* */
-- src/api/outbox.rs:718
-- pg_trickle::api::outbox::commit_offset
CREATE FUNCTION pgtrickle."commit_offset"(
    "p_group" TEXT, /* & str */
    "p_consumer" TEXT, /* & str */
    "p_last_offset" bigint /* i64 */
) RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'commit_offset_wrapper';
/* */
/* */
-- src/monitor.rs:2518
-- pg_trickle::monitor::health_check
CREATE FUNCTION pgtrickle."health_check"() RETURNS TABLE (
    "check_name" TEXT, /* String */
    "severity" TEXT, /* String */
    "detail" TEXT /* String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'health_check_wrapper';
/* */
/* */
-- src/ivm.rs:841
-- pg_trickle::ivm::pgt_ivm_handle_truncate
CREATE FUNCTION pgtrickle."pgt_ivm_handle_truncate"(
    "pgt_id" bigint /* i64 */
) RETURNS VOID /* Result < (), PgTrickleError > */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_ivm_handle_truncate_wrapper';
/* */
/* */
-- src/api/helpers.rs:2352
--
pg_trickle::api::helpers::recommend_refresh_mode CREATE FUNCTION pgtrickle."recommend_refresh_mode"( "st_name" TEXT DEFAULT NULL /* Option < String > */ ) RETURNS TABLE ( "pgt_schema" TEXT, /* String */ "pgt_name" TEXT, /* String */ "current_mode" TEXT, /* String */ "effective_mode" TEXT, /* Option < String > */ "recommended_mode" TEXT, /* String */ "confidence" TEXT, /* String */ "reason" TEXT, /* String */ "signals" jsonb /* pgrx :: JsonB */ ) LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'recommend_refresh_mode_wrapper'; /* */ /* */ -- src/api/inbox.rs:273 -- pg_trickle::api::inbox::drop_inbox CREATE FUNCTION pgtrickle."drop_inbox"( "p_name" TEXT, /* & str */ "p_if_exists" bool DEFAULT false, /* bool */ "p_cascade" bool DEFAULT false /* bool */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_inbox_wrapper'; /* */ /* */ -- src/monitor.rs:3159 -- pg_trickle::monitor::parallel_job_status CREATE FUNCTION pgtrickle."parallel_job_status"( "max_age_seconds" INT DEFAULT 300 /* i32 */ ) RETURNS TABLE ( "job_id" bigint, /* i64 */ "unit_key" TEXT, /* String */ "unit_kind" TEXT, /* String */ "status" TEXT, /* String */ "member_count" INT, /* i32 */ "attempt_no" INT, /* i32 */ "scheduler_pid" INT, /* i32 */ "worker_pid" INT, /* Option < i32 > */ "enqueued_at" timestamp with time zone, /* TimestampWithTimeZone */ "started_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "finished_at" timestamp with time zone, /* Option < TimestampWithTimeZone > */ "duration_ms" double precision /* Option < f64 > */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'parallel_job_status_wrapper'; /* */ /* */ -- src/api/inbox.rs:668 -- pg_trickle::api::inbox::enable_inbox_ordering CREATE FUNCTION pgtrickle."enable_inbox_ordering"( "p_inbox" TEXT, /* & str */ "p_aggregate_id_col" TEXT, /* & str */ "p_sequence_num_col" TEXT /* & str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'enable_inbox_ordering_wrapper'; /* */ /* */ -- 
-- src/api/diagnostics.rs:296
-- pg_trickle::api::diagnostics::explain_refresh_mode
--
-- SQL binding for the C symbol `explain_refresh_mode_wrapper`; returns a row set.
-- STRICT: a NULL name produces no rows without calling into the module.
CREATE FUNCTION pgtrickle."explain_refresh_mode"(
    "name" text                     -- Rust: &str
)
RETURNS TABLE (
    "configured_mode"  text,        -- Rust: String
    "effective_mode"   text,        -- Rust: Option<String>
    "downgrade_reason" text         -- Rust: Option<String>
)
LANGUAGE c                          -- Rust
STRICT
AS 'MODULE_PATHNAME', 'explain_refresh_mode_wrapper';

/* */
/* */
-- src/monitor.rs:971
-- pg_trickle::monitor::pgtrickle_refresh_stats
--
-- SQL binding for the C symbol `pgtrickle_refresh_stats_wrapper`; returns a row set.
-- STRICT has no practical effect here because the function takes no arguments.
CREATE FUNCTION pgtrickle."pgtrickle_refresh_stats"()
RETURNS TABLE (
    "stream_table"    text,                 -- Rust: String
    "mode"            text,                 -- Rust: String
    "avg_ms"          double precision,     -- Rust: f64
    "p95_ms"          double precision,     -- Rust: f64
    "p99_ms"          double precision,     -- Rust: f64
    "refresh_count"   bigint,               -- Rust: i64
    "last_refresh_at" timestamptz           -- Rust: Option<TimestampWithTimeZone>
)
LANGUAGE c                                  -- Rust
STRICT
AS 'MODULE_PATHNAME', 'pgtrickle_refresh_stats_wrapper';

/* */
/* */
-- src/diagnostics.rs:767
-- pg_trickle::diagnostics::validate_query
--
-- SQL binding for the C symbol `validate_query_wrapper`; returns a row set.
-- STRICT: a NULL query produces no rows without calling into the module.
CREATE FUNCTION pgtrickle."validate_query"(
    "query" text                -- Rust: &str
)
RETURNS TABLE (
    "check_name" text,          -- Rust: String
    "result"     text,          -- Rust: String
    "severity"   text           -- Rust: String
)
LANGUAGE c                      -- Rust
STRICT
AS 'MODULE_PATHNAME', 'validate_query_wrapper';

/* */
/* */
-- src/api/diagnostics.rs:516
-- pg_trickle::api::diagnostics::dedup_stats
--
-- SQL binding for the C symbol `dedup_stats_fn_wrapper` (note: the symbol
-- name differs from the SQL name); returns a row set.
-- STRICT has no practical effect here because the function takes no arguments.
CREATE FUNCTION pgtrickle."dedup_stats"()
RETURNS TABLE (
    "total_diff_refreshes" bigint,              -- Rust: i64
    "dedup_needed"         bigint,              -- Rust: i64
    "dedup_ratio_pct"      double precision     -- Rust: f64
)
LANGUAGE c                                      -- Rust
STRICT
AS 'MODULE_PATHNAME', 'dedup_stats_fn_wrapper';

/* */
/* */
-- src/api/diagnostics.rs:407
-- pg_trickle::api::diagnostics::explain_delta
--
-- SQL binding for the C symbol `explain_delta_text_wrapper` (note: the symbol
-- name differs from the SQL name); returns a set of text rows.
-- STRICT: a NULL argument produces no rows without calling into the module.
CREATE FUNCTION pgtrickle."explain_delta"(
    "name"   text,                  -- Rust: &str
    "format" text DEFAULT 'text'    -- Rust: &str
)
RETURNS SETOF text                  -- Rust: String
LANGUAGE c                          -- Rust
STRICT
AS 'MODULE_PATHNAME', 'explain_delta_text_wrapper';

/* */
/* */
-- src/api/diagnostics.rs:109 --
-- pg_trickle::api::diagnostics::rebuild_cdc_triggers
--
-- SQL binding for the C symbol `rebuild_cdc_triggers_wrapper`.
-- STRICT has no practical effect here because the function takes no arguments.
CREATE FUNCTION pgtrickle."rebuild_cdc_triggers"()
RETURNS text                -- Rust: &'_ str
LANGUAGE c                  -- Rust
STRICT
AS 'MODULE_PATHNAME', 'rebuild_cdc_triggers_wrapper';

/* */
/* */
-- src/api/snapshot.rs:184
-- pg_trickle::api::snapshot::restore_from_snapshot
--
-- SQL binding for the C symbol `restore_from_snapshot_wrapper`.
-- STRICT: a NULL argument yields NULL without calling into the module.
CREATE FUNCTION pgtrickle."restore_from_snapshot"(
    "p_name"   text,        -- Rust: &str
    "p_source" text         -- Rust: &str
)
RETURNS void
LANGUAGE c                  -- Rust
STRICT
AS 'MODULE_PATHNAME', 'restore_from_snapshot_wrapper';

/* */
/* */
-- src/api/inbox.rs:787
-- pg_trickle::api::inbox::enable_inbox_priority
--
-- SQL binding for the C symbol `enable_inbox_priority_wrapper`.
-- Not STRICT: the module is invoked even when an argument is NULL
-- (p_tiers defaults to NULL).
CREATE FUNCTION pgtrickle."enable_inbox_priority"(
    "p_inbox"        text,                  -- Rust: &str
    "p_priority_col" text,                  -- Rust: &str
    "p_tiers"        jsonb DEFAULT NULL     -- Rust: Option<pgrx::JsonB>
)
RETURNS void
LANGUAGE c                                  -- Rust
AS 'MODULE_PATHNAME', 'enable_inbox_priority_wrapper';

/* */
/* */
-- src/monitor.rs:2943
-- pg_trickle::monitor::refresh_timeline
--
-- SQL binding for the C symbol `refresh_timeline_wrapper`; returns a row set.
-- STRICT: a NULL max_rows produces no rows without calling into the module.
CREATE FUNCTION pgtrickle."refresh_timeline"(
    "max_rows" integer DEFAULT 50       -- Rust: i32
)
RETURNS TABLE (
    "start_time"    timestamptz,        -- Rust: TimestampWithTimeZone
    "stream_table"  text,               -- Rust: String
    "action"        text,               -- Rust: String
    "status"        text,               -- Rust: String
    "rows_inserted" bigint,             -- Rust: i64
    "rows_deleted"  bigint,             -- Rust: i64
    "duration_ms"   double precision,   -- Rust: Option<f64>
    "error_message" text                -- Rust: Option<String>
)
LANGUAGE c                              -- Rust
STRICT
AS 'MODULE_PATHNAME', 'refresh_timeline_wrapper';

/* */
/* */
-- src/api/mod.rs:4156
-- pg_trickle::api::resume_stream_table
--
-- SQL binding for the C symbol `resume_stream_table_wrapper`.
-- STRICT: a NULL name yields NULL without calling into the module.
CREATE FUNCTION pgtrickle."resume_stream_table"(
    "name" text             -- Rust: &str
)
RETURNS void
LANGUAGE c                  -- Rust
STRICT
AS 'MODULE_PATHNAME', 'resume_stream_table_wrapper';

/* */
/* */
-- src/monitor.rs:1598
-- pg_trickle::monitor::check_cdc_health
--
-- SQL binding for the C symbol `check_cdc_health_wrapper`; returns a row set.
-- STRICT has no practical effect here because the function takes no arguments.
CREATE FUNCTION pgtrickle."check_cdc_health"()
RETURNS TABLE (
    "source_relid"      bigint,     -- Rust: i64
    "source_table"      text,       -- Rust: String
    "cdc_mode"          text,       -- Rust: String
    "slot_name"         text,       -- Rust: Option<String>
    "lag_bytes"         bigint,     -- Rust: Option<i64>
    "confirmed_lsn"     text,       -- Rust: Option<String>
    "alert"             text,       -- Rust: Option<String>
    "selective_capture" boolean     -- Rust: bool
)
LANGUAGE c                          -- Rust
STRICT
AS 'MODULE_PATHNAME', 'check_cdc_health_wrapper';

/* */
/* */
-- src/api/diagnostics.rs:667
-- pg_trickle::api::diagnostics::reset_fuse
--
-- SQL binding for the C symbol `reset_fuse_wrapper`.
-- STRICT: a NULL argument yields NULL without calling into the module.
CREATE FUNCTION pgtrickle."reset_fuse"(
    "name"   text,                  -- Rust: &str
    "action" text DEFAULT 'apply'   -- Rust: &str
)
RETURNS void
LANGUAGE c                          -- Rust
STRICT
AS 'MODULE_PATHNAME', 'reset_fuse_wrapper';

/* */
/* */
-- src/monitor.rs:929
-- pg_trickle::monitor::cache_stats
--
-- SQL binding for the C symbol `cache_stats_wrapper`; returns a row set.
-- STRICT has no practical effect here because the function takes no arguments.
CREATE FUNCTION pgtrickle."cache_stats"()
RETURNS TABLE (
    "l1_hits"   bigint,     -- Rust: i64
    "l2_hits"   bigint,     -- Rust: i64
    "misses"    bigint,     -- Rust: i64
    "evictions" bigint,     -- Rust: i64
    "l1_size"   integer     -- Rust: i32
)
LANGUAGE c                  -- Rust
STRICT
AS 'MODULE_PATHNAME', 'cache_stats_wrapper';

/* */
/* */
-- src/lib.rs:1018
-- requires:
--   _signal_launcher_rescan
-- finalize
SELECT pgtrickle._signal_launcher_rescan();
/* */