-- pg_trickle v0.1.3 — real installable SQL baseline for upgrade testing. -- -- This file is derived from the pgrx-generated v0.2.0 SQL with the 11 -- functions added in v0.2.0 removed. The catalog table structure is -- identical to v0.2.0 because upgrade E2E tests run against the v0.2.1 -- binary (which requires the current catalog schema). -- -- Functions present in v0.1.3 (18 total): -- create_stream_table, alter_stream_table, drop_stream_table, -- resume_stream_table, refresh_stream_table, parse_duration_seconds, -- pgt_status, st_refresh_stats, get_refresh_history, st_auto_threshold, -- get_staleness, slot_health, explain_st, check_cdc_health, -- pg_trickle_hash, pg_trickle_hash_multi, _on_ddl_end, _on_sql_drop -- -- Functions added in v0.2.0 (NOT present here): -- list_sources, change_buffer_sizes, _signal_launcher_rescan, -- refresh_timeline, pgt_ivm_handle_truncate, health_check, -- pgt_ivm_apply_delta, trigger_inventory, version, -- dependency_tree, diamond_groups -- -- Used by: tests/build_e2e_upgrade_image.sh (upgrade E2E image) -- Also used by: scripts/check_upgrade_completeness.sh (function diff baseline) /* */ /* This file is auto generated by pgrx. The ordering of items is not stable, it is driven by a dependency graph. */ /* */ /* */ -- src/lib.rs:84 -- bootstrap -- Extension schemas CREATE SCHEMA IF NOT EXISTS pgtrickle; CREATE SCHEMA IF NOT EXISTS pgtrickle_changes; -- F51: Restrict change buffer schema access to prevent unauthorized -- injection of bogus changes that would be applied on next refresh. REVOKE ALL ON SCHEMA pgtrickle_changes FROM PUBLIC; -- Core ST metadata CREATE TABLE IF NOT EXISTS pgtrickle.pgt_stream_tables ( pgt_id BIGSERIAL PRIMARY KEY, pgt_relid OID NOT NULL UNIQUE, pgt_name TEXT NOT NULL, pgt_schema TEXT NOT NULL, defining_query TEXT NOT NULL, original_query TEXT, schedule TEXT, refresh_mode TEXT NOT NULL DEFAULT 'DIFFERENTIAL' CHECK (refresh_mode IN ('FULL', 'DIFFERENTIAL', 'IMMEDIATE')), status TEXT NOT NULL DEFAULT 'INITIALIZING' CHECK (status IN ('INITIALIZING', 'ACTIVE', 'SUSPENDED', 'ERROR')), is_populated BOOLEAN NOT NULL DEFAULT FALSE, data_timestamp TIMESTAMPTZ, frontier JSONB, last_refresh_at TIMESTAMPTZ, consecutive_errors INT NOT NULL DEFAULT 0, needs_reinit BOOLEAN NOT NULL DEFAULT FALSE, auto_threshold DOUBLE PRECISION, last_full_ms DOUBLE PRECISION, functions_used TEXT[], topk_limit INT, topk_order_by TEXT, diamond_consistency TEXT NOT NULL DEFAULT 'none' CHECK (diamond_consistency IN ('none', 'atomic')), diamond_schedule_policy TEXT NOT NULL DEFAULT 'fastest' CHECK (diamond_schedule_policy IN ('fastest', 'slowest')), created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS idx_pgt_status ON pgtrickle.pgt_stream_tables (status); CREATE UNIQUE INDEX IF NOT EXISTS idx_pgt_name ON pgtrickle.pgt_stream_tables (pgt_schema, pgt_name); -- DAG edges CREATE TABLE IF NOT EXISTS pgtrickle.pgt_dependencies ( pgt_id BIGINT NOT NULL REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE, source_relid OID NOT NULL, source_type TEXT NOT NULL CHECK (source_type IN ('TABLE', 'STREAM_TABLE', 'VIEW', 'MATVIEW', 'FOREIGN_TABLE')), columns_used TEXT[], column_snapshot JSONB, schema_fingerprint TEXT, cdc_mode TEXT NOT NULL DEFAULT 'TRIGGER' CHECK (cdc_mode IN ('TRIGGER', 'TRANSITIONING', 'WAL')), slot_name TEXT, decoder_confirmed_lsn PG_LSN, transition_started_at TIMESTAMPTZ, PRIMARY KEY (pgt_id, source_relid) ); CREATE INDEX IF NOT EXISTS idx_deps_source ON pgtrickle.pgt_dependencies (source_relid); -- Refresh history / audit log CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_history ( refresh_id BIGSERIAL PRIMARY KEY, pgt_id BIGINT NOT NULL, data_timestamp TIMESTAMPTZ NOT NULL, start_time TIMESTAMPTZ NOT NULL, end_time TIMESTAMPTZ, action TEXT NOT NULL CHECK (action IN ('NO_DATA', 'FULL', 'DIFFERENTIAL', 'DIFFERENTIAL', 'REINITIALIZE', 'SKIP')), rows_inserted BIGINT DEFAULT 0, rows_deleted BIGINT DEFAULT 0, delta_row_count BIGINT DEFAULT 0, merge_strategy_used TEXT, was_full_fallback BOOLEAN NOT NULL DEFAULT FALSE, error_message TEXT, status TEXT NOT NULL CHECK (status IN ('RUNNING', 'COMPLETED', 'FAILED', 'SKIPPED')), initiated_by TEXT CHECK (initiated_by IN ('SCHEDULER', 'MANUAL', 'INITIAL')), freshness_deadline TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_hist_pgt_ts ON pgtrickle.pgt_refresh_history (pgt_id, data_timestamp); -- Per-source CDC slot tracking CREATE TABLE IF NOT EXISTS pgtrickle.pgt_change_tracking ( source_relid OID PRIMARY KEY, slot_name TEXT NOT NULL, last_consumed_lsn PG_LSN, tracked_by_pgt_ids BIGINT[] ); /* */ /* */ -- src/lib.rs:207 -- Create event trigger functions with correct RETURNS event_trigger type. -- pgrx's #[pg_extern] generates RETURNS void, which PostgreSQL rejects for -- event triggers. We create them manually here with the correct return type. CREATE FUNCTION pgtrickle."_on_ddl_end"() RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pg_trickle_on_ddl_end_wrapper'; CREATE FUNCTION pgtrickle."_on_sql_drop"() RETURNS event_trigger LANGUAGE c AS 'MODULE_PATHNAME', 'pg_trickle_on_sql_drop_wrapper'; -- Event trigger: track ALTER TABLE on upstream sources CREATE EVENT TRIGGER pg_trickle_ddl_tracker ON ddl_command_end EXECUTE FUNCTION pgtrickle._on_ddl_end(); -- Event trigger: track DROP TABLE on upstream sources / ST storage tables CREATE EVENT TRIGGER pg_trickle_drop_tracker ON sql_drop EXECUTE FUNCTION pgtrickle._on_sql_drop(); /* */ /* */ -- src/lib.rs:44 CREATE SCHEMA IF NOT EXISTS pgtrickle; /* pg_trickle::pgtrickle */ /* */ /* */ -- src/hash.rs:24 -- pg_trickle::hash::pg_trickle_hash_multi CREATE FUNCTION pgtrickle."pg_trickle_hash_multi"( "inputs" TEXT[] /* alloc::vec::Vec> */ ) RETURNS bigint /* i64 */ IMMUTABLE PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pg_trickle_hash_multi_wrapper'; /* */ /* */ -- src/monitor.rs:648 -- pg_trickle::monitor::check_cdc_health CREATE FUNCTION pgtrickle."check_cdc_health"() RETURNS TABLE ( "source_relid" bigint, /* i64 */ "source_table" TEXT, /* alloc::string::String */ "cdc_mode" TEXT, /* alloc::string::String */ "slot_name" TEXT, /* core::option::Option */ "lag_bytes" bigint, /* core::option::Option */ "confirmed_lsn" TEXT, /* core::option::Option */ "alert" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'check_cdc_health_wrapper'; /* */ /* */ -- src/hooks.rs:51 -- pg_trickle::hooks::pg_trickle_on_ddl_end -- Skipped due to `#[pgrx(sql = false)]` /* */ /* */ -- src/api.rs:872 -- pg_trickle::api::refresh_stream_table CREATE FUNCTION pgtrickle."refresh_stream_table"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'refresh_stream_table_wrapper'; /* */ /* */ -- src/api.rs:31 -- pg_trickle::api::create_stream_table CREATE FUNCTION pgtrickle."create_stream_table"( "name" TEXT, /* &str */ "query" TEXT, /* &str */ "schedule" TEXT DEFAULT '1m', /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT 'DIFFERENTIAL', /* &str */ "initialize" bool DEFAULT true, /* bool */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL /* core::option::Option<&str> */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'create_stream_table_wrapper'; /* */ /* */ -- src/monitor.rs:545 -- pg_trickle::monitor::explain_st CREATE FUNCTION pgtrickle."explain_st"( "name" TEXT /* &str */ ) RETURNS TABLE ( "property" TEXT, /* alloc::string::String */ "value" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'explain_st_wrapper'; /* */ /* */ -- src/monitor.rs:439 -- pg_trickle::monitor::get_staleness CREATE FUNCTION pgtrickle."get_staleness"( "name" TEXT /* &str */ ) RETURNS double precision /* core::option::Option */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'get_staleness_wrapper'; /* */ /* */ -- src/api.rs:758 -- pg_trickle::api::drop_stream_table CREATE FUNCTION pgtrickle."drop_stream_table"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'drop_stream_table_wrapper'; /* */ /* */ -- src/api.rs:1190 -- pg_trickle::api::pgt_status CREATE FUNCTION pgtrickle."pgt_status"() RETURNS TABLE ( "name" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "refresh_mode" TEXT, /* alloc::string::String */ "is_populated" bool, /* bool */ "consecutive_errors" INT, /* i32 */ "schedule" TEXT, /* core::option::Option */ "data_timestamp" timestamp with time zone, /* core::option::Option */ "staleness" interval /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pgt_status_wrapper'; /* */ /* */ -- src/monitor.rs:196 -- pg_trickle::monitor::st_refresh_stats CREATE FUNCTION pgtrickle."st_refresh_stats"() RETURNS TABLE ( "pgt_name" TEXT, /* alloc::string::String */ "pgt_schema" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "refresh_mode" TEXT, /* alloc::string::String */ "is_populated" bool, /* bool */ "total_refreshes" bigint, /* i64 */ "successful_refreshes" bigint, /* i64 */ "failed_refreshes" bigint, /* i64 */ "total_rows_inserted" bigint, /* i64 */ "total_rows_deleted" bigint, /* i64 */ "avg_duration_ms" double precision, /* f64 */ "last_refresh_action" TEXT, /* core::option::Option */ "last_refresh_status" TEXT, /* core::option::Option */ "last_refresh_at" timestamp with time zone, /* core::option::Option */ "staleness_secs" double precision, /* core::option::Option */ "stale" bool /* bool */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'st_refresh_stats_wrapper'; /* */ /* */ -- src/hash.rs:13 -- pg_trickle::hash::pg_trickle_hash CREATE FUNCTION pgtrickle."pg_trickle_hash"( "input" TEXT /* &str */ ) RETURNS bigint /* i64 */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'pg_trickle_hash_wrapper'; /* */ /* */ -- src/api.rs:831 -- pg_trickle::api::resume_stream_table CREATE FUNCTION pgtrickle."resume_stream_table"( "name" TEXT /* &str */ ) RETURNS void STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'resume_stream_table_wrapper'; /* */ /* */ -- src/api.rs:522 -- pg_trickle::api::alter_stream_table CREATE FUNCTION pgtrickle."alter_stream_table"( "name" TEXT, /* &str */ "schedule" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "refresh_mode" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "status" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_consistency" TEXT DEFAULT NULL, /* core::option::Option<&str> */ "diamond_schedule_policy" TEXT DEFAULT NULL /* core::option::Option<&str> */ ) RETURNS void LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'alter_stream_table_wrapper'; /* */ /* */ -- src/monitor.rs:420 -- pg_trickle::monitor::st_auto_threshold CREATE FUNCTION pgtrickle."st_auto_threshold"( "name" TEXT /* &str */ ) RETURNS double precision /* core::option::Option */ STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'st_auto_threshold_wrapper'; /* */ /* */ -- src/monitor.rs:323 -- pg_trickle::monitor::get_refresh_history CREATE FUNCTION pgtrickle."get_refresh_history"( "name" TEXT, /* &str */ "max_rows" INT DEFAULT 20 /* i32 */ ) RETURNS TABLE ( "refresh_id" bigint, /* i64 */ "data_timestamp" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "start_time" timestamp with time zone, /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */ "end_time" timestamp with time zone, /* core::option::Option */ "action" TEXT, /* alloc::string::String */ "status" TEXT, /* alloc::string::String */ "rows_inserted" bigint, /* i64 */ "rows_deleted" bigint, /* i64 */ "duration_ms" double precision, /* core::option::Option */ "error_message" TEXT /* core::option::Option */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'get_refresh_history_wrapper'; /* */ /* */ -- src/monitor.rs:462 -- pg_trickle::monitor::slot_health CREATE FUNCTION pgtrickle."slot_health"() RETURNS TABLE ( "slot_name" TEXT, /* alloc::string::String */ "source_relid" bigint, /* i64 */ "active" bool, /* bool */ "retained_wal_bytes" bigint, /* i64 */ "wal_status" TEXT /* alloc::string::String */ ) STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'slot_health_wrapper'; /* */ /* */ -- src/hooks.rs:910 -- pg_trickle::hooks::pg_trickle_on_sql_drop -- Skipped due to `#[pgrx(sql = false)]` /* */ /* */ -- src/api.rs:1181 -- pg_trickle::api::parse_duration_seconds CREATE FUNCTION pgtrickle."parse_duration_seconds"( "input" TEXT /* &str */ ) RETURNS bigint /* core::option::Option */ IMMUTABLE STRICT PARALLEL SAFE LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'parse_duration_seconds_wrapper'; /* */ /* */ -- src/lib.rs:237 -- requires: -- parse_duration_seconds -- Convenience view: pg_stat_stream_tables -- Combines catalog metadata with aggregate refresh statistics. CREATE OR REPLACE VIEW pgtrickle.pg_stat_stream_tables AS SELECT st.pgt_id, st.pgt_schema, st.pgt_name, st.status, st.refresh_mode, st.is_populated, st.data_timestamp, st.schedule, now() - st.data_timestamp AS staleness, CASE WHEN st.schedule IS NOT NULL AND st.data_timestamp IS NOT NULL AND st.schedule !~ '[\s@]' THEN EXTRACT(EPOCH FROM (now() - st.data_timestamp)) > pgtrickle.parse_duration_seconds(st.schedule) ELSE NULL::boolean END AS stale, st.consecutive_errors, st.needs_reinit, st.last_refresh_at, COALESCE(stats.total_refreshes, 0) AS total_refreshes, COALESCE(stats.successful_refreshes, 0) AS successful_refreshes, COALESCE(stats.failed_refreshes, 0) AS failed_refreshes, COALESCE(stats.total_rows_inserted, 0) AS total_rows_inserted, COALESCE(stats.total_rows_deleted, 0) AS total_rows_deleted, stats.avg_duration_ms, stats.last_action, stats.last_status FROM pgtrickle.pgt_stream_tables st LEFT JOIN LATERAL ( SELECT count(*)::bigint AS total_refreshes, count(*) FILTER (WHERE h.status = 'COMPLETED')::bigint AS successful_refreshes, count(*) FILTER (WHERE h.status = 'FAILED')::bigint AS failed_refreshes, COALESCE(sum(h.rows_inserted), 0)::bigint AS total_rows_inserted, COALESCE(sum(h.rows_deleted), 0)::bigint AS total_rows_deleted, CASE WHEN count(*) FILTER (WHERE h.end_time IS NOT NULL) > 0 THEN avg(EXTRACT(EPOCH FROM (h.end_time - h.start_time)) * 1000) FILTER (WHERE h.end_time IS NOT NULL) ELSE NULL END::float8 AS avg_duration_ms, (SELECT h2.action FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_action, (SELECT h2.status FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_status, (SELECT h2.initiated_by FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS last_initiated_by, (SELECT h2.freshness_deadline FROM pgtrickle.pgt_refresh_history h2 WHERE h2.pgt_id = st.pgt_id ORDER BY h2.refresh_id DESC LIMIT 1) AS freshness_deadline FROM pgtrickle.pgt_refresh_history h WHERE h.pgt_id = st.pgt_id ) stats ON true; /* */ /* */ -- src/lib.rs:186 -- requires: -- parse_duration_seconds -- Status overview view CREATE OR REPLACE VIEW pgtrickle.stream_tables_info AS SELECT st.*, now() - st.data_timestamp AS staleness, CASE WHEN st.schedule IS NOT NULL AND st.schedule !~ '[\s@]' THEN EXTRACT(EPOCH FROM (now() - st.data_timestamp)) > pgtrickle.parse_duration_seconds(st.schedule) ELSE NULL::boolean END AS stale, CASE WHEN st.topk_limit IS NOT NULL THEN TRUE ELSE FALSE END AS is_topk FROM pgtrickle.pgt_stream_tables st; /* */