/* */ /* This file is auto generated by pgrx. The ordering of items is not stable, it is driven by a dependency graph. */ /* */ /* */ -- src/lib.rs:213 -- bootstrap -- Extension schemas CREATE SCHEMA IF NOT EXISTS pgtrickle; CREATE SCHEMA IF NOT EXISTS pgtrickle_changes; -- F51: Restrict change buffer schema access to prevent unauthorized -- injection of bogus changes that would be applied on next refresh. REVOKE ALL ON SCHEMA pgtrickle_changes FROM PUBLIC; -- User-declared refresh groups for snapshot consistency CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_groups ( group_id SERIAL PRIMARY KEY, group_name TEXT NOT NULL UNIQUE, member_oids OID[] NOT NULL, isolation TEXT NOT NULL DEFAULT 'read_committed' CHECK (isolation IN ('read_committed', 'repeatable_read')), created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- Core ST metadata CREATE TABLE IF NOT EXISTS pgtrickle.pgt_stream_tables ( pgt_id BIGSERIAL PRIMARY KEY, pgt_relid OID NOT NULL UNIQUE, pgt_name TEXT NOT NULL, pgt_schema TEXT NOT NULL, defining_query TEXT NOT NULL, original_query TEXT, schedule TEXT, refresh_mode TEXT NOT NULL DEFAULT 'DIFFERENTIAL' CHECK (refresh_mode IN ('FULL', 'DIFFERENTIAL', 'IMMEDIATE')), status TEXT NOT NULL DEFAULT 'INITIALIZING' CHECK (status IN ('INITIALIZING', 'ACTIVE', 'SUSPENDED', 'ERROR')), is_populated BOOLEAN NOT NULL DEFAULT FALSE, data_timestamp TIMESTAMPTZ, frontier JSONB, last_refresh_at TIMESTAMPTZ, consecutive_errors INT NOT NULL DEFAULT 0, needs_reinit BOOLEAN NOT NULL DEFAULT FALSE, auto_threshold DOUBLE PRECISION, last_full_ms DOUBLE PRECISION, functions_used TEXT[], topk_limit INT, topk_order_by TEXT, topk_offset INT, diamond_consistency TEXT NOT NULL DEFAULT 'atomic' CHECK (diamond_consistency IN ('none', 'atomic')), diamond_schedule_policy TEXT NOT NULL DEFAULT 'fastest' CHECK (diamond_schedule_policy IN ('fastest', 'slowest')), has_keyless_source BOOLEAN NOT NULL DEFAULT FALSE, function_hashes TEXT, requested_cdc_mode TEXT CHECK (requested_cdc_mode IN ('auto', 'trigger', 'wal')), is_append_only BOOLEAN NOT NULL DEFAULT FALSE, scc_id INT, last_fixpoint_iterations INT, max_differential_joins INT, max_delta_fraction DOUBLE PRECISION, pooler_compatibility_mode BOOLEAN NOT NULL DEFAULT FALSE, refresh_tier TEXT NOT NULL DEFAULT 'hot' CHECK (refresh_tier IN ('hot', 'warm', 'cold', 'frozen')), effective_refresh_mode TEXT, fuse_mode TEXT NOT NULL DEFAULT 'off' CHECK (fuse_mode IN ('off', 'on', 'auto')), fuse_state TEXT NOT NULL DEFAULT 'armed' CHECK (fuse_state IN ('armed', 'blown', 'disabled')), fuse_ceiling BIGINT, fuse_sensitivity INT, blown_at TIMESTAMPTZ, blow_reason TEXT, last_error_message TEXT, last_error_at TIMESTAMPTZ, downstream_publication_name TEXT, freshness_deadline_ms BIGINT, st_partition_key TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_pgt_status ON pgtrickle.pgt_stream_tables (status); CREATE UNIQUE INDEX IF NOT EXISTS idx_pgt_name ON pgtrickle.pgt_stream_tables (pgt_schema, pgt_name); -- PERF-4: Scheduler hot‐path lookup by relation OID. CREATE INDEX IF NOT EXISTS idx_pgt_relid ON pgtrickle.pgt_stream_tables (pgt_relid); -- DAG edges CREATE TABLE IF NOT EXISTS pgtrickle.pgt_dependencies ( pgt_id BIGINT NOT NULL REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE, source_relid OID NOT NULL, source_type TEXT NOT NULL CHECK (source_type IN ('TABLE', 'STREAM_TABLE', 'VIEW', 'MATVIEW', 'FOREIGN_TABLE')), columns_used TEXT[], column_snapshot JSONB, schema_fingerprint TEXT, cdc_mode TEXT NOT NULL DEFAULT 'TRIGGER' CHECK (cdc_mode IN ('TRIGGER', 'TRANSITIONING', 'WAL')), slot_name TEXT, decoder_confirmed_lsn PG_LSN, transition_started_at TIMESTAMPTZ, PRIMARY KEY (pgt_id, source_relid) ); CREATE INDEX IF NOT EXISTS idx_deps_source ON pgtrickle.pgt_dependencies (source_relid); -- PERF-4: Fast lookup by pgt_id (non‐PK prefix for multi‐column PK). CREATE INDEX IF NOT EXISTS idx_deps_pgt_id ON pgtrickle.pgt_dependencies (pgt_id); -- Refresh history / audit log CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_history ( refresh_id BIGSERIAL PRIMARY KEY, pgt_id BIGINT NOT NULL REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE, data_timestamp TIMESTAMPTZ NOT NULL, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ, action TEXT NOT NULL CHECK (action IN ('NO_DATA', 'FULL', 'DIFFERENTIAL', 'REINITIALIZE', 'SKIP')), rows_inserted BIGINT DEFAULT 0, rows_deleted BIGINT DEFAULT 0, delta_row_count BIGINT DEFAULT 0, merge_strategy_used TEXT, was_full_fallback BOOLEAN NOT NULL DEFAULT FALSE, error_message TEXT, status TEXT NOT NULL CHECK (status IN ('RUNNING', 'COMPLETED', 'FAILED', 'SKIPPED')), initiated_by TEXT CHECK (initiated_by IN ('SCHEDULER', 'MANUAL', 'INITIAL', 'SELF_MONITOR')), freshness_deadline TIMESTAMPTZ, tick_watermark_lsn PG_LSN, fixpoint_iteration INT ); CREATE INDEX IF NOT EXISTS idx_hist_pgt_ts ON pgtrickle.pgt_refresh_history (pgt_id, data_timestamp); -- PERF-1: Fast lookup by (pgt_id, start_time) for self-monitoring and scheduler_overhead queries. CREATE INDEX IF NOT EXISTS idx_hist_pgt_start ON pgtrickle.pgt_refresh_history (pgt_id, start_time); -- Per-source CDC slot tracking CREATE TABLE IF NOT EXISTS pgtrickle.pgt_change_tracking ( source_relid OID PRIMARY KEY, slot_name TEXT NOT NULL, last_consumed_lsn PG_LSN, tracked_by_pgt_ids BIGINT[] ); -- Scheduler job table for parallel refresh dispatch CREATE TABLE IF NOT EXISTS pgtrickle.pgt_scheduler_jobs ( job_id BIGSERIAL PRIMARY KEY, dag_version BIGINT NOT NULL, unit_key TEXT NOT NULL, unit_kind TEXT NOT NULL CHECK (unit_kind IN ('singleton', 'atomic_group', 'immediate_closure', 'cyclic_scc', 'repeatable_read_group', 'fused_chain')), member_pgt_ids BIGINT[] NOT NULL, root_pgt_id BIGINT NOT NULL, status TEXT NOT NULL DEFAULT 'QUEUED' CHECK (status IN ('QUEUED', 'RUNNING', 'SUCCEEDED', 'RETRYABLE_FAILED', 'PERMANENT_FAILED', 'CANCELLED')), scheduler_pid INT NOT NULL, worker_pid INT, attempt_no INT NOT NULL DEFAULT 1, enqueued_at TIMESTAMPTZ NOT NULL DEFAULT now(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, outcome_detail TEXT, retryable BOOLEAN ); CREATE INDEX IF NOT EXISTS idx_sched_jobs_status_enqueued ON pgtrickle.pgt_scheduler_jobs (status, enqueued_at); CREATE INDEX IF NOT EXISTS idx_sched_jobs_unit_status ON pgtrickle.pgt_scheduler_jobs (unit_key, status); CREATE INDEX IF NOT EXISTS idx_sched_jobs_finished ON pgtrickle.pgt_scheduler_jobs (finished_at) WHERE finished_at IS NOT NULL; -- Bootstrap source gates (v0.5.0, Phase 3) -- Records which source tables are currently "gated" (bootstrapping in progress). -- When a source is gated, all stream tables that depend on it are skipped by -- the scheduler until pgtrickle.ungate_source() is called. CREATE TABLE IF NOT EXISTS pgtrickle.pgt_source_gates ( source_relid OID PRIMARY KEY, gated BOOLEAN NOT NULL DEFAULT true, gated_at TIMESTAMPTZ NOT NULL DEFAULT now(), ungated_at TIMESTAMPTZ, gated_by TEXT ); -- Per-source watermark state: tracks how far each external source has been loaded. CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermarks ( source_relid OID PRIMARY KEY, watermark TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), advanced_by TEXT, wal_lsn_at_advance TEXT ); -- Watermark groups: declare that a set of sources must be temporally aligned. CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermark_groups ( group_id SERIAL PRIMARY KEY, group_name TEXT UNIQUE NOT NULL, source_relids OID[] NOT NULL, tolerance_secs DOUBLE PRECISION NOT NULL DEFAULT 0, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); -- DB-3: Schema version tracking table. -- Records which schema migration versions have been applied to this database. CREATE TABLE IF NOT EXISTS pgtrickle.pgt_schema_version ( version TEXT PRIMARY KEY, applied_at TIMESTAMPTZ NOT NULL DEFAULT now(), description TEXT ); INSERT INTO pgtrickle.pgt_schema_version (version, description) VALUES ('0.19.0', 'Initial schema version tracking') ON CONFLICT (version) DO NOTHING; SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_stream_tables', ''); SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_dependencies', ''); SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_source_gates', ''); SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_watermarks', ''); SELECT pg_catalog.pg_extension_config_dump('pgtrickle.pgt_watermark_groups', ''); -- G14-SHC: Shared template cache (catalog-backed, UNLOGGED) CREATE UNLOGGED TABLE IF NOT EXISTS pgtrickle.pgt_template_cache ( pgt_id BIGINT PRIMARY KEY REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE, query_hash BIGINT NOT NULL, delta_sql TEXT NOT NULL, columns TEXT[] NOT NULL, source_oids INTEGER[] NOT NULL, is_dedup BOOLEAN NOT NULL DEFAULT FALSE, key_changed BOOLEAN NOT NULL DEFAULT FALSE, all_algebraic BOOLEAN NOT NULL DEFAULT FALSE, cached_at TIMESTAMPTZ NOT NULL DEFAULT now() ); /* */ /* */ -- src/lib.rs:475 -- Create event trigger functions with correct RETURNS event_trigger type. -- pgrx's #[pg_extern] generates RETURNS void, which PostgreSQL rejects for -- event triggers. We create them manually here with the correct return type. CREATE FUNCTION pgtrickle."_on_ddl_end"() RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pg_trickle_on_ddl_end_wrapper'; CREATE FUNCTION pgtrickle."_on_sql_drop"() RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pg_trickle_on_sql_drop_wrapper'; -- Event trigger: track ALTER TABLE on upstream sources CREATE EVENT TRIGGER pg_trickle_ddl_tracker ON ddl_command_end EXECUTE FUNCTION pgtrickle._on_ddl_end(); -- Event trigger: track DROP TABLE on upstream sources / ST storage tables CREATE EVENT TRIGGER pg_trickle_drop_tracker ON sql_drop EXECUTE FUNCTION pgtrickle._on_sql_drop(); /* */ /* */ -- src/lib.rs:632 CREATE OR REPLACE FUNCTION pgtrickle."pause_all"() RETURNS void LANGUAGE plpgsql AS $$ BEGIN UPDATE pgtrickle.pgt_stream_tables SET status = 'PAUSED' WHERE status = 'ACTIVE'; RAISE NOTICE 'pg_trickle: all stream tables paused.'; END; $$; COMMENT ON FUNCTION pgtrickle."pause_all"() IS 'Pause automatic refreshes for every ACTIVE stream table. ' 'Use pgtrickle.resume_all() to re-activate them.'; CREATE OR REPLACE FUNCTION pgtrickle."resume_all"() RETURNS void LANGUAGE plpgsql AS $$ BEGIN UPDATE pgtrickle.pgt_stream_tables SET status = 'ACTIVE' WHERE status = 'PAUSED'; RAISE NOTICE 'pg_trickle: all paused stream tables resumed.'; END; $$; COMMENT ON FUNCTION pgtrickle."resume_all"() IS 'Re-activate all stream tables that were paused with pgtrickle.pause_all().'; /* */ /* */ -- src/lib.rs:50 CREATE SCHEMA IF NOT EXISTS pgtrickle; /* pg_trickle::pgtrickle */ /* */ /* */ -- src/api/publication.rs:16 -- pg_trickle::api::publication::stream_table_to_publication CREATE FUNCTION pgtrickle."stream_table_to_publication"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'stream_table_to_publication_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:223 -- pg_trickle::api::diagnostics::pgt_scc_status CREATE FUNCTION pgtrickle."pgt_scc_status"() RETURNS TABLE ( "scc_id" INT, /* i32 */ "member_count" INT, /* i32 */ "members" TEXT[], /* alloc::vec::Vec */ "last_iterations" INT, /* core::option::Option */ "last_converged_at" timestamp with time zone /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_scc_status_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1214 -- pg_trickle::api::diagnostics::watermark_groups CREATE FUNCTION pgtrickle."watermark_groups"() RETURNS TABLE ( "group_name" TEXT, /* alloc::string::String */ "source_count" INT, /* i32 */ "tolerance_secs" double precision, /* f64 */ "created_at" timestamp with time zone /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'watermark_groups_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:61 -- pg_trickle::api::diagnostics::migrate CREATE FUNCTION pgtrickle."migrate"() RETURNS TEXT /* alloc::string::String */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'migrate_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1153 -- pg_trickle::api::diagnostics::drop_watermark_group CREATE FUNCTION pgtrickle."drop_watermark_group"( "group_name" TEXT /* &str */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_watermark_group_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1249 -- pg_trickle::api::diagnostics::watermark_status CREATE FUNCTION pgtrickle."watermark_status"() RETURNS TABLE ( "group_name" TEXT, /* alloc::string::String */ "min_watermark" timestamp with time zone, /* core::option::Option */ "max_watermark" timestamp with time zone, /* core::option::Option */ "lag_secs" double precision, /* core::option::Option */ "aligned" bool, /* bool */ "sources_with_watermark" INT, /* i32 */ "sources_total" INT /* i32 */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'watermark_status_fn_wrapper'; /* */ /* */ -- src/api/mod.rs:3259 -- pg_trickle::api::alter_stream_table CREATE FUNCTION pgtrickle."alter_stream_table"( "name" TEXT, /* &str */ "query" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "schedule" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "status" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "cdc_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "append_only" bool DEFAULT NULL, /* core::option::Option */ "pooler_compatibility_mode" bool DEFAULT NULL, /* core::option::Option */ "tier" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "fuse" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "fuse_ceiling" bigint DEFAULT NULL, /* core::option::Option */ "fuse_sensitivity" INT DEFAULT NULL, /* core::option::Option */ "partition_by" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "max_differential_joins" INT DEFAULT NULL, /* core::option::Option */ "max_delta_fraction" double precision DEFAULT NULL /* core::option::Option */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'alter_stream_table_wrapper'; /* */ /* */ -- src/api/helpers.rs:2633 -- pg_trickle::api::helpers::restore_stream_tables CREATE FUNCTION pgtrickle."restore_stream_tables"() RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'restore_stream_tables_wrapper'; /* */ /* */ -- src/api/helpers.rs:2272 -- pg_trickle::api::helpers::convert_buffers_to_unlogged CREATE FUNCTION pgtrickle."convert_buffers_to_unlogged"() RETURNS bigint /* core::result::Result */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'convert_buffers_to_unlogged_wrapper'; /* */ /* */ -- src/api/mod.rs:460 -- pg_trickle::api::create_stream_table CREATE FUNCTION pgtrickle."create_stream_table"( "name" TEXT, /* &str */ "query" TEXT, /* &str */ "schedule" TEXT DEFAULT 'calculated', /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT 'AUTO', /* &str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "cdc_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "append_only" bool DEFAULT false, /* bool */ "pooler_compatibility_mode" bool DEFAULT false, /* bool */ "partition_by" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "max_differential_joins" INT DEFAULT NULL, /* core::option::Option */ "max_delta_fraction" double precision DEFAULT NULL /* core::option::Option */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_stream_table_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:667 -- pg_trickle::api::diagnostics::reset_fuse CREATE FUNCTION pgtrickle."reset_fuse"( "name" TEXT, /* &str */ "action" TEXT DEFAULT 'apply' /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'reset_fuse_wrapper'; /* */ /* */ -- src/api/self_monitoring.rs:453 -- pg_trickle::api::self_monitoring::scheduler_overhead CREATE FUNCTION pgtrickle."scheduler_overhead"() RETURNS TABLE ( "total_refreshes_1h" bigint, /* i64 */ "df_refreshes_1h" bigint, /* i64 */ "df_refresh_fraction" double precision, /* core::option::Option */ "avg_refresh_ms" double precision, /* core::option::Option */ "avg_df_refresh_ms" double precision, /* core::option::Option */ "total_refresh_time_s" double precision, /* core::option::Option */ "df_refresh_time_s" double precision /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'scheduler_overhead_wrapper'; /* */ /* */ -- src/monitor.rs:2073 -- pg_trickle::monitor::change_buffer_sizes CREATE FUNCTION pgtrickle."change_buffer_sizes"() RETURNS TABLE ( "stream_table" TEXT, /* alloc::string::String */ "source_table" TEXT, /* alloc::string::String */ "source_oid" bigint, /* i64 */ "cdc_mode" TEXT, /* alloc::string::String */ "pending_rows" bigint, /* i64 */ "buffer_bytes" bigint /* i64 */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'change_buffer_sizes_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:812 -- pg_trickle::api::diagnostics::diamond_groups CREATE FUNCTION pgtrickle."diamond_groups"() RETURNS TABLE ( "group_id" INT, /* i32 */ "member_name" TEXT, /* alloc::string::String */ "member_schema" TEXT, /* alloc::string::String */ "is_convergence" bool, /* bool */ "epoch" bigint, /* i64 */ "schedule_policy" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'diamond_groups_wrapper'; /* */ /* */ -- src/monitor.rs:3013 -- pg_trickle::monitor::trigger_inventory CREATE FUNCTION pgtrickle."trigger_inventory"() RETURNS TABLE ( "source_table" TEXT, /* alloc::string::String */ "source_oid" bigint, /* i64 */ "trigger_name" TEXT, /* alloc::string::String */ "trigger_type" TEXT, /* alloc::string::String */ "present" bool, /* bool */ "enabled" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'trigger_inventory_wrapper'; /* */ /* */ -- src/monitor.rs:955 -- pg_trickle::monitor::pgtrickle_refresh_stats CREATE FUNCTION pgtrickle."pgtrickle_refresh_stats"() RETURNS TABLE ( "stream_table" TEXT, /* alloc::string::String */ "mode" TEXT, /* alloc::string::String */ "avg_ms" double precision, /* f64 */ "p95_ms" double precision, /* f64 */ "p99_ms" double precision, /* f64 */ "refresh_count" bigint, /* i64 */ "last_refresh_at" timestamp with time zone /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgtrickle_refresh_stats_wrapper'; /* */ /* */ -- src/monitor.rs:2151 -- pg_trickle::monitor::list_sources CREATE FUNCTION pgtrickle."list_sources"( "name" TEXT /* &str */ ) RETURNS TABLE ( "source_table" TEXT, /* alloc::string::String */ "source_oid" bigint, /* i64 */ "source_type" TEXT, /* alloc::string::String */ "cdc_mode" TEXT, /* alloc::string::String */ "columns_used" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'list_sources_wrapper'; /* */ /* */ -- src/api/self_monitoring.rs:358 -- pg_trickle::api::self_monitoring::self_monitoring_status CREATE FUNCTION pgtrickle."self_monitoring_status"() RETURNS TABLE ( "st_name" TEXT, /* alloc::string::String */ "exists" bool, /* bool */ "status" TEXT, /* core::option::Option */ "refresh_mode" TEXT, /* core::option::Option */ "last_refresh_at" TEXT, /* core::option::Option */ "total_refreshes" bigint /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'self_monitoring_status_wrapper'; /* */ /* */ -- src/monitor.rs:913 -- pg_trickle::monitor::cache_stats CREATE FUNCTION pgtrickle."cache_stats"() RETURNS TABLE ( "l1_hits" bigint, /* i64 */ "l2_hits" bigint, /* i64 */ "misses" bigint, /* i64 */ "evictions" bigint, /* i64 */ "l1_size" INT /* i32 */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'cache_stats_wrapper'; /* */ /* */ -- src/api/helpers.rs:2516 -- pg_trickle::api::helpers::export_definition CREATE FUNCTION pgtrickle."export_definition"( "st_name" TEXT /* &str */ ) RETURNS TEXT /* core::result::Result */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'export_definition_wrapper'; /* */ /* */ -- src/lib.rs:710 -- requires: -- export_definition CREATE OR REPLACE FUNCTION pgtrickle."stream_table_definition"( p_name text ) RETURNS text LANGUAGE sql STABLE AS $$ SELECT pgtrickle.export_definition(p_name); $$; COMMENT ON FUNCTION pgtrickle."stream_table_definition"(text) IS 'Return the CREATE STREAM TABLE DDL for the named stream table. ' 'Equivalent to pgtrickle.export_definition(name) — provided as a ' 'more discoverable alias.'; /* */ /* */ -- src/hooks.rs:51 -- pg_trickle::hooks::pg_trickle_on_ddl_end -- Skipped due to `#[pgrx(sql = false)]` /* */ /* */ -- src/monitor.rs:740 -- pg_trickle::monitor::get_refresh_history CREATE FUNCTION pgtrickle."get_refresh_history"( "name" TEXT, /* &str */ "max_rows" INT DEFAULT 20 /* i32 */ ) RETURNS TABLE ( "refresh_id" bigint, /* i64 */ "data_timestamp" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "start_time" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "end_time" timestamp with time zone, /* core::option::Option */ "action" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "rows_inserted" bigint, /* i64 */ "rows_deleted" bigint, /* i64 */ "duration_ms" double precision, /* core::option::Option */ "error_message" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'get_refresh_history_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1116 -- pg_trickle::api::diagnostics::create_watermark_group CREATE FUNCTION pgtrickle."create_watermark_group"( "group_name" TEXT, /* &str */ "sources" TEXT[], /* alloc::vec::Vec */ "tolerance_secs" double precision DEFAULT 0.0 /* f64 */ ) RETURNS INT /* core::result::Result */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_watermark_group_wrapper'; /* */ /* */ -- src/monitor.rs:2233 -- pg_trickle::monitor::dependency_tree CREATE FUNCTION pgtrickle."dependency_tree"() RETURNS TABLE ( "tree_line" TEXT, /* alloc::string::String */ "node" TEXT, /* alloc::string::String */ "node_type" TEXT, /* alloc::string::String */ "depth" INT, /* i32 */ "status" TEXT, /* core::option::Option */ "refresh_mode" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'dependency_tree_wrapper'; /* */ /* */ -- src/monitor.rs:2795 -- pg_trickle::monitor::health_summary CREATE FUNCTION pgtrickle."health_summary"() RETURNS TABLE ( "total_stream_tables" INT, /* i32 */ "active_count" INT, /* i32 */ "error_count" INT, /* i32 */ "suspended_count" INT, /* i32 */ "stale_count" INT, /* i32 */ "reinit_pending" INT, /* i32 */ "max_staleness_seconds" double precision, /* core::option::Option */ "scheduler_status" TEXT, /* alloc::string::String */ "cache_hit_rate" double precision /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'health_summary_wrapper'; /* */ /* */ -- src/ivm.rs:841 -- pg_trickle::ivm::pgt_ivm_handle_truncate CREATE FUNCTION pgtrickle."pgt_ivm_handle_truncate"( "pgt_id" bigint /* i64 */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_ivm_handle_truncate_wrapper'; /* */ /* */ -- src/hash.rs:33 -- pg_trickle::hash::pg_trickle_hash CREATE FUNCTION pgtrickle."pg_trickle_hash"( "input" TEXT /* core::option::Option<&str> */ ) RETURNS bigint /* i64 */ IMMUTABLE PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pg_trickle_hash_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:516 -- pg_trickle::api::diagnostics::dedup_stats CREATE FUNCTION pgtrickle."dedup_stats"() RETURNS TABLE ( "total_diff_refreshes" bigint, /* i64 */ "dedup_needed" bigint, /* i64 */ "dedup_ratio_pct" double precision /* f64 */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'dedup_stats_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:407 -- pg_trickle::api::diagnostics::explain_delta CREATE FUNCTION pgtrickle."explain_delta"( "name" TEXT, /* &str */ "format" TEXT DEFAULT 'text' /* &str */ ) RETURNS SETOF TEXT /* alloc::string::String */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_delta_text_wrapper'; /* */ /* */ -- src/monitor.rs:3119 -- pg_trickle::monitor::worker_pool_status CREATE FUNCTION pgtrickle."worker_pool_status"() RETURNS TABLE ( "active_workers" INT, /* i32 */ "max_workers" INT, /* i32 */ "per_db_cap" INT, /* i32 */ "parallel_mode" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'worker_pool_status_wrapper'; /* */ /* */ -- src/api/self_monitoring.rs:305 -- pg_trickle::api::self_monitoring::teardown_self_monitoring CREATE FUNCTION pgtrickle."teardown_self_monitoring"() RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'teardown_self_monitoring_wrapper'; /* */ /* */ -- src/ivm.rs:535 -- pg_trickle::ivm::pgt_ivm_apply_delta CREATE FUNCTION pgtrickle."pgt_ivm_apply_delta"( "pgt_id" bigint, /* i64 */ "source_oid" INT, /* i32 */ "has_new" bool, /* bool */ "has_old" bool /* bool */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_ivm_apply_delta_wrapper'; /* */ /* */ -- src/monitor.rs:1321 -- pg_trickle::monitor::explain_diff_sql CREATE FUNCTION pgtrickle."explain_diff_sql"( "name" TEXT /* &str */ ) RETURNS TEXT /* core::option::Option */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_diff_sql_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1161 -- pg_trickle::api::diagnostics::watermarks CREATE FUNCTION pgtrickle."watermarks"() RETURNS TABLE ( "source_table" TEXT, /* alloc::string::String */ "schema_name" TEXT, /* alloc::string::String */ "watermark" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "updated_at" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "advanced_by" TEXT, /* core::option::Option */ "wal_lsn" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'watermarks_fn_wrapper'; /* */ /* */ -- src/api/publication.rs:73 -- pg_trickle::api::publication::drop_stream_table_publication CREATE FUNCTION pgtrickle."drop_stream_table_publication"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_stream_table_publication_wrapper'; /* */ /* */ -- src/api/self_monitoring.rs:566 -- pg_trickle::api::self_monitoring::explain_dag CREATE FUNCTION pgtrickle."explain_dag"( "format" TEXT DEFAULT 'mermaid' /* core::option::Option<&str> */ ) RETURNS TEXT /* core::option::Option */ LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_dag_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:951 -- pg_trickle::api::diagnostics::source_gates CREATE FUNCTION pgtrickle."source_gates"() RETURNS TABLE ( "source_table" TEXT, /* alloc::string::String */ "schema_name" TEXT, /* alloc::string::String */ "gated" bool, /* bool */ "gated_at" timestamp with time zone, /* core::option::Option */ "ungated_at" timestamp with time zone, /* core::option::Option */ "gated_by" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'source_gates_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:154 -- pg_trickle::api::diagnostics::parse_duration_seconds CREATE FUNCTION pgtrickle."parse_duration_seconds"( "input" TEXT /* &str */ ) RETURNS bigint /* core::option::Option */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'parse_duration_seconds_wrapper'; /* */ /* */ -- src/lib.rs:593 -- requires: -- parse_duration_seconds -- ERG-E: One-row health summary for dashboards and alerting. CREATE OR REPLACE VIEW pgtrickle.quick_health AS SELECT (SELECT count(*) FROM pgtrickle.pgt_stream_tables)::bigint AS total_stream_tables, (SELECT count(*) FROM pgtrickle.pgt_stream_tables WHERE status = 'ERROR' OR consecutive_errors > 0)::bigint AS error_tables, (SELECT count(*) FROM pgtrickle.pgt_stream_tables WHERE schedule IS NOT NULL AND schedule !~ '[\s@]' AND last_refresh_at IS NOT NULL AND EXTRACT(EPOCH FROM (now() - last_refresh_at)) > pgtrickle.parse_duration_seconds(schedule))::bigint AS stale_tables, (SELECT count(*) > 0 FROM pg_stat_activity WHERE backend_type = 'pg_trickle scheduler') AS scheduler_running, CASE WHEN (SELECT count(*) FROM pgtrickle.pgt_stream_tables) = 0 THEN 'EMPTY' WHEN (SELECT count(*) FROM pgtrickle.pgt_stream_tables WHERE status = 'SUSPENDED') > 0 THEN 'CRITICAL' WHEN (SELECT count(*) FROM pgtrickle.pgt_stream_tables WHERE status = 'ERROR' OR consecutive_errors > 0) > 0 THEN 'WARNING' WHEN (SELECT count(*) FROM pgtrickle.pgt_stream_tables WHERE schedule IS NOT NULL AND schedule !~ '[\s@]' AND last_refresh_at IS NOT NULL AND EXTRACT(EPOCH FROM (now() - last_refresh_at)) > pgtrickle.parse_duration_seconds(schedule)) > 0 THEN 'WARNING' ELSE 'OK' END AS status; /* */ /* */ -- src/lib.rs:505 -- requires: -- parse_duration_seconds -- Convenience view: pg_stat_stream_tables -- Combines catalog metadata with aggregate refresh statistics. CREATE OR REPLACE VIEW pgtrickle.pg_stat_stream_tables AS SELECT st.pgt_id, st.pgt_schema, st.pgt_name, st.status, st.refresh_mode, st.is_populated, st.data_timestamp, st.schedule, now() - st.last_refresh_at AS staleness, CASE WHEN st.schedule IS NOT NULL AND st.last_refresh_at IS NOT NULL AND st.schedule !~ '[\s@]' THEN EXTRACT(EPOCH FROM (now() - st.last_refresh_at)) > pgtrickle.parse_duration_seconds(st.schedule) ELSE NULL::boolean END AS stale, st.consecutive_errors, st.needs_reinit, st.last_refresh_at, COALESCE(stats.total_refreshes, 0) AS total_refreshes, COALESCE(stats.successful_refreshes, 0) AS successful_refreshes, COALESCE(stats.failed_refreshes, 0) AS failed_refreshes, COALESCE(stats.total_rows_inserted, 0) AS total_rows_inserted, COALESCE(stats.total_rows_deleted, 0) AS total_rows_deleted, stats.avg_duration_ms, stats.last_action, stats.last_status, (SELECT array_agg(DISTINCT d.cdc_mode ORDER BY d.cdc_mode) FROM pgtrickle.pgt_dependencies d WHERE d.pgt_id = st.pgt_id AND d.source_type = 'TABLE') AS cdc_modes, st.scc_id, st.last_fixpoint_iterations, st.refresh_tier FROM pgtrickle.pgt_stream_tables st LEFT JOIN LATERAL ( SELECT count(*)::bigint AS total_refreshes, count(*) FILTER (WHERE h.status = 'COMPLETED')::bigint AS successful_refreshes, count(*) FILTER (WHERE h.status = 'FAILED')::bigint AS failed_refreshes, COALESCE(sum(h.rows_inserted), 0)::bigint AS total_rows_inserted, COALESCE(sum(h.rows_deleted), 0)::bigint AS total_rows_deleted, CASE WHEN count(*) FILTER (WHERE h.end_time IS NOT NULL) > 0 THEN avg(EXTRACT(EPOCH FROM (h.end_time - h.start_time)) * 1000) FILTER (WHERE h.end_time IS NOT NULL) ELSE NULL END::float8 AS avg_duration_ms, (SELECT h2.action FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_action, (SELECT h2.status FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_status, (SELECT h2.initiated_by FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_initiated_by, (SELECT h2.freshness_deadline FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS freshness_deadline FROM pgtrickle.pgt_refresh_history h WHERE h.pgt_id = st.pgt_id ) stats ON true; -- Per-source CDC status view (G5): exposes cdc_mode, slot names, and -- transition timestamps for every TABLE dependency of a stream table. CREATE OR REPLACE VIEW pgtrickle.pgt_cdc_status AS SELECT st.pgt_schema, st.pgt_name, d.source_relid, c.relname AS source_name, n.nspname AS source_schema, d.cdc_mode, d.slot_name, d.decoder_confirmed_lsn, d.transition_started_at FROM pgtrickle.pgt_dependencies d JOIN pgtrickle.pgt_stream_tables st ON st.pgt_id = d.pgt_id JOIN pg_class c ON c.oid = d.source_relid JOIN pg_namespace n ON n.oid = c.relnamespace WHERE d.source_type = 'TABLE'; /* */ /* */ -- src/lib.rs:453 -- requires: -- parse_duration_seconds -- Status overview view (ERR-1d: last_error_message and last_error_at are -- included via st.* from pgt_stream_tables) CREATE OR REPLACE VIEW pgtrickle.stream_tables_info AS SELECT st.*, now() - st.last_refresh_at AS staleness, CASE WHEN st.schedule IS NOT NULL AND st.schedule !~ '[\s@]' THEN EXTRACT(EPOCH FROM (now() - st.last_refresh_at)) > pgtrickle.parse_duration_seconds(st.schedule) ELSE NULL::boolean END AS stale, CASE WHEN st.topk_limit IS NOT NULL THEN TRUE ELSE FALSE END AS is_topk FROM pgtrickle.pgt_stream_tables st; /* */ /* */ -- src/api/diagnostics.rs:1594 -- pg_trickle::api::diagnostics::clear_caches CREATE FUNCTION pgtrickle."clear_caches"() RETURNS bigint /* i64 */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'clear_caches_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:296 -- pg_trickle::api::diagnostics::explain_refresh_mode CREATE FUNCTION pgtrickle."explain_refresh_mode"( "name" TEXT /* &str */ ) RETURNS TABLE ( "configured_mode" TEXT, /* alloc::string::String */ "effective_mode" TEXT, /* core::option::Option */ "downgrade_reason" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_refresh_mode_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:741 -- pg_trickle::api::diagnostics::fuse_status CREATE FUNCTION pgtrickle."fuse_status"() RETURNS TABLE ( "stream_table" TEXT, /* alloc::string::String */ "fuse_mode" TEXT, /* alloc::string::String */ "fuse_state" TEXT, /* alloc::string::String */ "fuse_ceiling" bigint, /* core::option::Option */ "effective_ceiling" bigint, /* core::option::Option */ "fuse_sensitivity" INT, /* core::option::Option */ "blown_at" timestamp with time zone, /* core::option::Option */ "blow_reason" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'fuse_status_wrapper'; /* */ /* */ -- src/monitor.rs:1032 -- pg_trickle::monitor::explain_st CREATE FUNCTION pgtrickle."explain_st"( "name" TEXT, /* &str */ "with_analyze" bool DEFAULT false /* bool */ ) RETURNS TABLE ( "property" TEXT, /* alloc::string::String */ "value" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_st_wrapper'; /* */ /* */ -- src/monitor.rs:867 -- pg_trickle::monitor::get_staleness CREATE FUNCTION pgtrickle."get_staleness"( "name" TEXT /* &str */ ) RETURNS double precision /* core::option::Option */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'get_staleness_wrapper'; /* */ /* */ -- src/diagnostics.rs:49 -- pg_trickle::diagnostics::explain_query_rewrite CREATE FUNCTION pgtrickle."explain_query_rewrite"( "query" TEXT /* &str */ ) RETURNS TABLE ( "pass_name" TEXT, /* alloc::string::String */ "changed" bool, /* bool */ "sql_after" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_query_rewrite_wrapper'; /* */ /* */ -- src/monitor.rs:2502 -- pg_trickle::monitor::health_check CREATE FUNCTION pgtrickle."health_check"() RETURNS TABLE ( "check_name" TEXT, /* alloc::string::String */ "severity" TEXT, /* alloc::string::String */ "detail" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'health_check_wrapper'; /* */ /* */ -- src/monitor.rs:3143 -- pg_trickle::monitor::parallel_job_status CREATE FUNCTION pgtrickle."parallel_job_status"( "max_age_seconds" INT DEFAULT 300 /* i32 */ ) RETURNS TABLE ( "job_id" bigint, /* i64 */ "unit_key" TEXT, /* alloc::string::String */ "unit_kind" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "member_count" INT, /* i32 */ "attempt_no" INT, /* i32 */ "scheduler_pid" INT, /* i32 */ "worker_pid" INT, /* core::option::Option */ "enqueued_at" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "started_at" timestamp with time zone, /* core::option::Option */ "finished_at" timestamp with time zone, /* core::option::Option */ "duration_ms" double precision /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'parallel_job_status_wrapper'; /* */ /* */ -- src/api/helpers.rs:2457 -- pg_trickle::api::helpers::refresh_efficiency CREATE FUNCTION pgtrickle."refresh_efficiency"() RETURNS TABLE ( "pgt_schema" TEXT, /* alloc::string::String */ "pgt_name" TEXT, /* alloc::string::String */ "refresh_mode" TEXT, /* alloc::string::String */ "total_refreshes" bigint, /* i64 */ "diff_count" bigint, /* i64 */ "full_count" bigint, /* i64 */ "avg_diff_ms" double precision, /* core::option::Option */ "avg_full_ms" double precision, /* core::option::Option */ "avg_change_ratio" double precision, /* core::option::Option */ "diff_speedup" TEXT, /* core::option::Option */ "last_refresh_at" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'refresh_efficiency_wrapper'; /* */ /* */ -- src/api/publication.rs:126 -- pg_trickle::api::publication::set_stream_table_sla CREATE FUNCTION pgtrickle."set_stream_table_sla"( "name" TEXT, /* &str */ "sla" interval /* pgrx::datetime::interval::Interval */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'set_stream_table_sla_wrapper'; /* */ /* */ -- src/hash.rs:54 -- pg_trickle::hash::pg_trickle_hash_multi CREATE FUNCTION pgtrickle."pg_trickle_hash_multi"( "inputs" TEXT[] /* alloc::vec::Vec> */ ) RETURNS bigint /* i64 */ IMMUTABLE PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pg_trickle_hash_multi_wrapper'; /* */ /* */ -- src/monitor.rs:890 -- pg_trickle::monitor::slot_health CREATE FUNCTION pgtrickle."slot_health"() RETURNS TABLE ( "slot_name" TEXT, /* alloc::string::String */ "source_relid" bigint, /* i64 */ "active" bool, /* bool */ "retained_wal_bytes" bigint, /* i64 */ "wal_status" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'slot_health_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:928 -- pg_trickle::api::diagnostics::ungate_source CREATE FUNCTION pgtrickle."ungate_source"( "source" TEXT /* &str */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'ungate_source_wrapper'; /* */ /* */ -- src/api/mod.rs:4062 -- pg_trickle::api::write_and_refresh CREATE FUNCTION pgtrickle."write_and_refresh"( "sql" TEXT, /* &str */ "stream_table_name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'write_and_refresh_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1440 -- pg_trickle::api::diagnostics::drop_refresh_group CREATE FUNCTION pgtrickle."drop_refresh_group"( "group_name" TEXT /* &str */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_refresh_group_wrapper'; /* */ /* */ -- src/api/mod.rs:503 -- pg_trickle::api::create_stream_table_if_not_exists CREATE FUNCTION pgtrickle."create_stream_table_if_not_exists"( "name" TEXT, /* &str */ "query" TEXT, /* &str */ "schedule" TEXT DEFAULT 'calculated', /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT 'AUTO', /* &str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "cdc_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "append_only" bool DEFAULT false, /* bool */ "pooler_compatibility_mode" bool DEFAULT false, /* bool */ "partition_by" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "max_differential_joins" INT DEFAULT NULL, /* core::option::Option */ "max_delta_fraction" double precision DEFAULT NULL /* core::option::Option */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_stream_table_if_not_exists_wrapper'; /* */ /* */ -- src/api/mod.rs:736 -- pg_trickle::api::create_or_replace_stream_table CREATE FUNCTION pgtrickle."create_or_replace_stream_table"( "name" TEXT, /* &str */ "query" TEXT, /* &str */ "schedule" TEXT DEFAULT 'calculated', /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT 'AUTO', /* &str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "cdc_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "append_only" bool DEFAULT false, /* bool */ "pooler_compatibility_mode" bool DEFAULT false, /* bool */ "partition_by" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "max_differential_joins" INT DEFAULT NULL, /* core::option::Option */ "max_delta_fraction" double precision DEFAULT NULL /* core::option::Option */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_or_replace_stream_table_wrapper'; /* */ /* */ -- src/api/mod.rs:601 -- pg_trickle::api::bulk_create CREATE FUNCTION pgtrickle."bulk_create"( "definitions" jsonb /* pgrx::datum::json::JsonB */ ) RETURNS jsonb /* pgrx::datum::json::JsonB */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'bulk_create_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:19 -- pg_trickle::api::diagnostics::version_check CREATE FUNCTION pgtrickle."version_check"() RETURNS TEXT /* alloc::string::String */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'version_check_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1501 -- pg_trickle::api::diagnostics::worker_allocation_status CREATE FUNCTION pgtrickle."worker_allocation_status"() RETURNS TABLE ( "db_name" TEXT, /* alloc::string::String */ "workers_used" bigint, /* i64 */ "workers_quota" bigint, /* i64 */ "workers_queued" bigint, /* i64 */ "cluster_active" bigint, /* i64 */ "cluster_max" bigint /* i64 */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'worker_allocation_status_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1370 -- pg_trickle::api::diagnostics::create_refresh_group CREATE FUNCTION pgtrickle."create_refresh_group"( "group_name" TEXT, /* &str */ "members" TEXT[], /* alloc::vec::Vec */ "isolation" TEXT DEFAULT 'read_committed' /* &str */ ) RETURNS INT /* core::result::Result */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_refresh_group_wrapper'; /* */ /* */ -- src/api/helpers.rs:2352 -- pg_trickle::api::helpers::recommend_refresh_mode CREATE FUNCTION pgtrickle."recommend_refresh_mode"( "st_name" TEXT DEFAULT NULL /* core::option::Option */ ) RETURNS TABLE ( "pgt_schema" TEXT, /* alloc::string::String */ "pgt_name" TEXT, /* alloc::string::String */ "current_mode" TEXT, /* alloc::string::String */ "effective_mode" TEXT, /* core::option::Option */ "recommended_mode" TEXT, /* alloc::string::String */ "confidence" TEXT, /* alloc::string::String */ "reason" TEXT, /* alloc::string::String */ "signals" jsonb /* pgrx::datum::json::JsonB */ ) LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'recommend_refresh_mode_wrapper'; /* */ /* */ -- src/api/self_monitoring.rs:240 -- pg_trickle::api::self_monitoring::setup_self_monitoring CREATE FUNCTION pgtrickle."setup_self_monitoring"() RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'setup_self_monitoring_wrapper'; /* */ /* */ -- src/diagnostics.rs:570 -- pg_trickle::diagnostics::list_auxiliary_columns CREATE FUNCTION pgtrickle."list_auxiliary_columns"( "name" TEXT /* &str */ ) RETURNS TABLE ( "column_name" TEXT, /* alloc::string::String */ "data_type" TEXT, /* alloc::string::String */ "purpose" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'list_auxiliary_columns_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1012 -- pg_trickle::api::diagnostics::bootstrap_gate_status CREATE FUNCTION pgtrickle."bootstrap_gate_status"() RETURNS TABLE ( "source_table" TEXT, /* alloc::string::String */ "schema_name" TEXT, /* alloc::string::String */ "gated" bool, /* bool */ "gated_at" timestamp with time zone, /* core::option::Option */ "ungated_at" timestamp with time zone, /* core::option::Option */ "gated_by" TEXT, /* core::option::Option */ "gate_duration" interval, /* core::option::Option */ "affected_stream_tables" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'bootstrap_gate_status_fn_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:163 -- pg_trickle::api::diagnostics::pgt_status CREATE FUNCTION pgtrickle."pgt_status"() RETURNS TABLE ( "name" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "refresh_mode" TEXT, /* alloc::string::String */ "is_populated" bool, /* bool */ "consecutive_errors" INT, /* i32 */ "schedule" TEXT, /* core::option::Option */ "data_timestamp" timestamp with time zone, /* core::option::Option */ "staleness" interval, /* core::option::Option */ "scc_id" INT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_status_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:545 -- pg_trickle::api::diagnostics::shared_buffer_stats CREATE FUNCTION pgtrickle."shared_buffer_stats"() RETURNS TABLE ( "source_oid" bigint, /* i64 */ "source_table" TEXT, /* alloc::string::String */ "consumer_count" INT, /* i32 */ "consumers" TEXT, /* alloc::string::String */ "columns_tracked" INT, /* i32 */ "safe_frontier_lsn" TEXT, /* core::option::Option */ "buffer_rows" bigint, /* i64 */ "is_partitioned" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'shared_buffer_stats_fn_wrapper'; /* */ /* */ -- src/monitor.rs:2927 -- pg_trickle::monitor::refresh_timeline CREATE FUNCTION pgtrickle."refresh_timeline"( "max_rows" INT DEFAULT 50 /* i32 */ ) RETURNS TABLE ( "start_time" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "stream_table" TEXT, /* alloc::string::String */ "action" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "rows_inserted" bigint, /* i64 */ "rows_deleted" bigint, /* i64 */ "duration_ms" double precision, /* core::option::Option */ "error_message" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'refresh_timeline_wrapper'; /* */ /* */ -- src/hooks.rs:1028 -- pg_trickle::hooks::pg_trickle_on_sql_drop -- Skipped due to `#[pgrx(sql = false)]` /* */ /* */ -- src/api/mod.rs:3809 -- pg_trickle::api::drop_stream_table CREATE FUNCTION pgtrickle."drop_stream_table"( "name" TEXT, /* &str */ "cascade" bool DEFAULT false /* bool */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_stream_table_wrapper'; /* */ /* */ -- src/lib.rs:733 -- requires: -- create_stream_table -- drop_stream_table -- alter_stream_table CREATE OR REPLACE FUNCTION pgtrickle."canary_begin"( p_name text, p_new_query text ) RETURNS text LANGUAGE plpgsql AS $$ DECLARE v_schema text; v_table text; v_canary text; v_dot int; BEGIN v_dot := strpos(p_name, '.'); IF v_dot > 0 THEN v_schema := substr(p_name, 1, v_dot - 1); v_table := substr(p_name, v_dot + 1); ELSE v_schema := current_schema(); v_table := p_name; END IF; v_canary := '__pgt_canary_' || v_table; -- Drop any stale canary table from a previous run. BEGIN PERFORM pgtrickle.drop_stream_table(v_schema || '.' || v_canary); EXCEPTION WHEN OTHERS THEN NULL; -- ignore if it does not exist END; -- Create the canary stream table with the new query. PERFORM pgtrickle.create_stream_table( v_schema || '.' || v_canary, p_new_query ); RETURN format( 'Canary stream table %I.%I created. Run pgtrickle.canary_diff(%L) to compare.', v_schema, v_canary, p_name ); END; $$; COMMENT ON FUNCTION pgtrickle."canary_begin"(text, text) IS 'Start a shadow/canary test for the named stream table. ' 'Creates __pgt_canary_ with p_new_query and starts refreshing it. ' 'Use canary_diff(name) to inspect differences and canary_promote(name) to ' 'swap canary into production.'; CREATE OR REPLACE FUNCTION pgtrickle."canary_diff"( p_name text ) RETURNS TABLE( row_source text, diff_row text ) LANGUAGE plpgsql AS $$ DECLARE v_schema text; v_table text; v_canary text; v_dot int; v_sql text; BEGIN v_dot := strpos(p_name, '.'); IF v_dot > 0 THEN v_schema := substr(p_name, 1, v_dot - 1); v_table := substr(p_name, v_dot + 1); ELSE v_schema := current_schema(); v_table := p_name; END IF; v_canary := '__pgt_canary_' || v_table; -- Return rows in live-only vs canary-only using EXCEPT (symmetric difference). v_sql := format( '(SELECT %L AS row_source, t::text AS diff_row FROM %I.%I t EXCEPT SELECT %L, c::text FROM %I.%I c) UNION ALL (SELECT %L, c::text FROM %I.%I c EXCEPT SELECT %L, t::text FROM %I.%I t)', 'live_only', v_schema, v_table, 'canary_only', v_schema, v_canary, 'canary_only', v_schema, v_canary, 'live_only', v_schema, v_table ); RETURN QUERY EXECUTE v_sql; END; $$; COMMENT ON FUNCTION pgtrickle."canary_diff"(text) IS 'Compare the live stream table with its canary counterpart. ' 'Returns rows that exist in only one of the two tables. ' 'An empty result set indicates the new query produces the same output.'; CREATE OR REPLACE FUNCTION pgtrickle."canary_promote"( p_name text ) RETURNS text LANGUAGE plpgsql AS $$ DECLARE v_schema text; v_table text; v_canary text; v_dot int; v_new_query text; BEGIN v_dot := strpos(p_name, '.'); IF v_dot > 0 THEN v_schema := substr(p_name, 1, v_dot - 1); v_table := substr(p_name, v_dot + 1); ELSE v_schema := current_schema(); v_table := p_name; END IF; v_canary := '__pgt_canary_' || v_table; -- Read the defining query from the canary table. SELECT defining_query INTO v_new_query FROM pgtrickle.pgt_stream_tables WHERE pgt_schema = v_schema AND pgt_name = v_canary; IF v_new_query IS NULL THEN RAISE EXCEPTION 'No canary found for %. Run pgtrickle.canary_begin() first.', p_name; END IF; -- Promote: alter the live table to use the new query, then drop the canary. PERFORM pgtrickle.alter_stream_table(v_schema || '.' || v_table, query => v_new_query); BEGIN PERFORM pgtrickle.drop_stream_table(v_schema || '.' || v_canary); EXCEPTION WHEN OTHERS THEN NULL; END; RETURN format( 'Canary promoted: %I.%I now uses the canary query. Canary table dropped.', v_schema, v_table ); END; $$; COMMENT ON FUNCTION pgtrickle."canary_promote"(text) IS 'Promote the canary stream table to production. ' 'Calls ALTER STREAM TABLE with the canary query, then drops the canary table. ' 'Run pgtrickle.canary_diff(name) first to confirm the result set matches.'; /* */ /* */ -- src/api/diagnostics.rs:905 -- pg_trickle::api::diagnostics::gate_source CREATE FUNCTION pgtrickle."gate_source"( "source" TEXT /* &str */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'gate_source_wrapper'; /* */ /* */ -- src/api/mod.rs:4034 -- pg_trickle::api::refresh_stream_table CREATE FUNCTION pgtrickle."refresh_stream_table"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'refresh_stream_table_wrapper'; /* */ /* */ -- src/lib.rs:670 -- requires: -- refresh_stream_table CREATE OR REPLACE FUNCTION pgtrickle."refresh_if_stale"( p_name text, p_max_age interval DEFAULT '5 minutes'::interval ) RETURNS boolean LANGUAGE plpgsql AS $$ DECLARE v_last_end timestamp with time zone; v_refreshed boolean := false; BEGIN SELECT MAX(end_time) INTO v_last_end FROM pgtrickle.pgt_refresh_history h JOIN pgtrickle.pgt_stream_tables s USING (pgt_id) WHERE s.pgt_name = p_name AND h.status = 'COMPLETED'; IF v_last_end IS NULL OR (now() - v_last_end) > p_max_age THEN PERFORM pgtrickle.refresh_stream_table(p_name); v_refreshed := true; END IF; RETURN v_refreshed; END; $$; COMMENT ON FUNCTION pgtrickle."refresh_if_stale"(text, interval) IS 'Refresh the named stream table only when the most recent completed ' 'refresh is older than max_age. Returns TRUE when a refresh was ' 'triggered, FALSE when the table was fresh enough.'; /* */ /* */ -- src/api/diagnostics.rs:1084 -- pg_trickle::api::diagnostics::advance_watermark CREATE FUNCTION pgtrickle."advance_watermark"( "source" TEXT, /* &str */ "watermark" timestamp with time zone /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ ) RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'advance_watermark_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:109 -- pg_trickle::api::diagnostics::rebuild_cdc_triggers CREATE FUNCTION pgtrickle."rebuild_cdc_triggers"() RETURNS TEXT /* &str */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'rebuild_cdc_triggers_wrapper'; /* */ /* */ -- src/monitor.rs:458 -- pg_trickle::monitor::st_refresh_stats CREATE FUNCTION pgtrickle."st_refresh_stats"() RETURNS TABLE ( "pgt_name" TEXT, /* alloc::string::String */ "pgt_schema" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "refresh_mode" TEXT, /* alloc::string::String */ "is_populated" bool, /* bool */ "total_refreshes" bigint, /* i64 */ "successful_refreshes" bigint, /* i64 */ "failed_refreshes" bigint, /* i64 */ "total_rows_inserted" bigint, /* i64 */ "total_rows_deleted" bigint, /* i64 */ "avg_duration_ms" double precision, /* f64 */ "last_refresh_action" TEXT, /* core::option::Option */ "last_refresh_status" TEXT, /* core::option::Option */ "last_refresh_at" timestamp with time zone, /* core::option::Option */ "staleness_secs" double precision, /* core::option::Option */ "stale" bool, /* bool */ "consecutive_errors" INT, /* i32 */ "schedule" TEXT, /* core::option::Option */ "refresh_tier" TEXT, /* alloc::string::String */ "last_error_message" TEXT, /* core::option::Option */ "downstream_publication" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'st_refresh_stats_wrapper'; /* */ /* */ -- src/monitor.rs:848 -- pg_trickle::monitor::st_auto_threshold CREATE FUNCTION pgtrickle."st_auto_threshold"( "name" TEXT /* &str */ ) RETURNS double precision /* core::option::Option */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'st_auto_threshold_wrapper'; /* */ /* */ -- src/api/mod.rs:3992 -- pg_trickle::api::resume_stream_table CREATE FUNCTION pgtrickle."resume_stream_table"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'resume_stream_table_wrapper'; /* */ /* */ -- src/monitor.rs:1582 -- pg_trickle::monitor::check_cdc_health CREATE FUNCTION pgtrickle."check_cdc_health"() RETURNS TABLE ( "source_relid" bigint, /* i64 */ "source_table" TEXT, /* alloc::string::String */ "cdc_mode" TEXT, /* alloc::string::String */ "slot_name" TEXT, /* core::option::Option */ "lag_bytes" bigint, /* core::option::Option */ "confirmed_lsn" TEXT, /* core::option::Option */ "alert" TEXT, /* core::option::Option */ "selective_capture" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'check_cdc_health_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:8 -- pg_trickle::api::diagnostics::version CREATE FUNCTION pgtrickle."version"() RETURNS TEXT /* &str */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'version_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:95 -- pg_trickle::api::diagnostics::_signal_launcher_rescan CREATE FUNCTION pgtrickle."_signal_launcher_rescan"() RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', '_signal_launcher_rescan_wrapper'; /* */ /* */ -- src/diagnostics.rs:395 -- pg_trickle::diagnostics::diagnose_errors CREATE FUNCTION pgtrickle."diagnose_errors"( "name" TEXT /* &str */ ) RETURNS TABLE ( "event_time" timestamp with time zone, /* core::option::Option */ "error_type" TEXT, /* alloc::string::String */ "error_message" TEXT, /* alloc::string::String */ "remediation" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'diagnose_errors_wrapper'; /* */ /* */ -- src/diagnostics.rs:767 -- pg_trickle::diagnostics::validate_query CREATE FUNCTION pgtrickle."validate_query"( "query" TEXT /* &str */ ) RETURNS TABLE ( "check_name" TEXT, /* alloc::string::String */ "result" TEXT, /* alloc::string::String */ "severity" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'validate_query_wrapper'; /* */ /* */ -- src/api/diagnostics.rs:1452 -- pg_trickle::api::diagnostics::refresh_groups CREATE FUNCTION pgtrickle."refresh_groups"() RETURNS TABLE ( "group_id" INT, /* i32 */ "group_name" TEXT, /* alloc::string::String */ "member_count" INT, /* i32 */ "isolation" TEXT, /* alloc::string::String */ "created_at" timestamp with time zone /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'refresh_groups_fn_wrapper'; /* */ /* */ -- src/lib.rs:901 -- requires: -- _signal_launcher_rescan -- finalize SELECT pgtrickle._signal_launcher_rescan(); /* */