/* */
/* This file is auto generated by pgrx. The ordering of items is not stable, it is driven by a dependency graph. */
/* */

/* */
-- src/lib.rs:102
-- bootstrap
-- Extension schemas
CREATE SCHEMA IF NOT EXISTS pgtrickle;
CREATE SCHEMA IF NOT EXISTS pgtrickle_changes;

-- F51: Restrict change buffer schema access to prevent unauthorized
-- injection of bogus changes that would be applied on next refresh.
REVOKE ALL ON SCHEMA pgtrickle_changes FROM PUBLIC;

-- Core ST metadata
-- One row per stream table; (pgt_schema, pgt_name) and pgt_relid are each unique.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_stream_tables (
    pgt_id                   BIGSERIAL PRIMARY KEY,
    pgt_relid                OID NOT NULL UNIQUE,
    pgt_name                 TEXT NOT NULL,
    pgt_schema               TEXT NOT NULL,
    defining_query           TEXT NOT NULL,
    original_query           TEXT,
    schedule                 TEXT,
    refresh_mode             TEXT NOT NULL DEFAULT 'DIFFERENTIAL'
                             CHECK (refresh_mode IN ('FULL', 'DIFFERENTIAL', 'IMMEDIATE')),
    status                   TEXT NOT NULL DEFAULT 'INITIALIZING'
                             CHECK (status IN ('INITIALIZING', 'ACTIVE', 'SUSPENDED', 'ERROR')),
    is_populated             BOOLEAN NOT NULL DEFAULT FALSE,
    data_timestamp           TIMESTAMPTZ,
    frontier                 JSONB,
    last_refresh_at          TIMESTAMPTZ,
    consecutive_errors       INT NOT NULL DEFAULT 0,
    needs_reinit             BOOLEAN NOT NULL DEFAULT FALSE,
    auto_threshold           DOUBLE PRECISION,
    last_full_ms             DOUBLE PRECISION,
    functions_used           TEXT[],
    topk_limit               INT,
    topk_order_by            TEXT,
    topk_offset              INT,
    diamond_consistency      TEXT NOT NULL DEFAULT 'none'
                             CHECK (diamond_consistency IN ('none', 'atomic')),
    diamond_schedule_policy  TEXT NOT NULL DEFAULT 'fastest'
                             CHECK (diamond_schedule_policy IN ('fastest', 'slowest')),
    has_keyless_source       BOOLEAN NOT NULL DEFAULT FALSE,
    function_hashes          TEXT,
    requested_cdc_mode       TEXT
                             CHECK (requested_cdc_mode IN ('auto', 'trigger', 'wal')),
    is_append_only           BOOLEAN NOT NULL DEFAULT FALSE,
    scc_id                   INT,
    last_fixpoint_iterations INT,
    created_at               TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at               TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_pgt_status
    ON pgtrickle.pgt_stream_tables (status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_pgt_name
    ON pgtrickle.pgt_stream_tables (pgt_schema, pgt_name);

-- DAG edges
-- Rows disappear automatically with their stream table (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_dependencies (
    pgt_id                BIGINT NOT NULL
                          REFERENCES pgtrickle.pgt_stream_tables(pgt_id) ON DELETE CASCADE,
    source_relid          OID NOT NULL,
    source_type           TEXT NOT NULL
                          CHECK (source_type IN ('TABLE', 'STREAM_TABLE', 'VIEW', 'MATVIEW', 'FOREIGN_TABLE')),
    columns_used          TEXT[],
    column_snapshot       JSONB,
    schema_fingerprint    TEXT,
    cdc_mode              TEXT NOT NULL DEFAULT 'TRIGGER'
                          CHECK (cdc_mode IN ('TRIGGER', 'TRANSITIONING', 'WAL')),
    slot_name             TEXT,
    decoder_confirmed_lsn PG_LSN,
    transition_started_at TIMESTAMPTZ,
    PRIMARY KEY (pgt_id, source_relid)
);

CREATE INDEX IF NOT EXISTS idx_deps_source
    ON pgtrickle.pgt_dependencies (source_relid);

-- Refresh history / audit log
-- NOTE(review): pgt_id carries no foreign key here, unlike pgt_dependencies;
-- presumably so history outlives a dropped stream table -- confirm.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_refresh_history (
    refresh_id          BIGSERIAL PRIMARY KEY,
    pgt_id              BIGINT NOT NULL,
    data_timestamp      TIMESTAMPTZ NOT NULL,
    start_time          TIMESTAMPTZ NOT NULL,
    end_time            TIMESTAMPTZ,
    -- Each allowed action is listed exactly once.
    action              TEXT NOT NULL
                        CHECK (action IN ('NO_DATA', 'FULL', 'DIFFERENTIAL', 'REINITIALIZE', 'SKIP')),
    rows_inserted       BIGINT DEFAULT 0,
    rows_deleted        BIGINT DEFAULT 0,
    delta_row_count     BIGINT DEFAULT 0,
    merge_strategy_used TEXT,
    was_full_fallback   BOOLEAN NOT NULL DEFAULT FALSE,
    error_message       TEXT,
    status              TEXT NOT NULL
                        CHECK (status IN ('RUNNING', 'COMPLETED', 'FAILED', 'SKIPPED')),
    initiated_by        TEXT
                        CHECK (initiated_by IN ('SCHEDULER', 'MANUAL', 'INITIAL')),
    freshness_deadline  TIMESTAMPTZ,
    tick_watermark_lsn  PG_LSN,
    fixpoint_iteration  INT
);

CREATE INDEX IF NOT EXISTS idx_hist_pgt_ts
    ON pgtrickle.pgt_refresh_history (pgt_id, data_timestamp);

-- Per-source CDC slot tracking
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_change_tracking (
    source_relid       OID PRIMARY KEY,
    slot_name          TEXT NOT NULL,
    last_consumed_lsn  PG_LSN,
    tracked_by_pgt_ids BIGINT[]
);

-- Scheduler job table for parallel refresh dispatch
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_scheduler_jobs (
    job_id         BIGSERIAL PRIMARY KEY,
    dag_version    BIGINT NOT NULL,
    unit_key       TEXT NOT NULL,
    unit_kind      TEXT NOT NULL
                   CHECK (unit_kind IN ('singleton', 'atomic_group', 'immediate_closure')),
    member_pgt_ids BIGINT[] NOT NULL,
    root_pgt_id    BIGINT NOT NULL,
    status         TEXT NOT NULL DEFAULT 'QUEUED'
                   CHECK (status IN ('QUEUED', 'RUNNING', 'SUCCEEDED',
                                     'RETRYABLE_FAILED', 'PERMANENT_FAILED', 'CANCELLED')),
    scheduler_pid  INT NOT NULL,
    worker_pid     INT,
    attempt_no     INT NOT NULL DEFAULT 1,
    enqueued_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at     TIMESTAMPTZ,
    finished_at    TIMESTAMPTZ,
    outcome_detail TEXT,
    retryable      BOOLEAN
);

CREATE INDEX IF NOT EXISTS idx_sched_jobs_status_enqueued
    ON pgtrickle.pgt_scheduler_jobs (status, enqueued_at);
CREATE INDEX IF NOT EXISTS idx_sched_jobs_unit_status
    ON pgtrickle.pgt_scheduler_jobs (unit_key, status);
-- Partial index: only jobs that have a finished_at value are indexed.
CREATE INDEX IF NOT EXISTS idx_sched_jobs_finished
    ON pgtrickle.pgt_scheduler_jobs (finished_at)
    WHERE finished_at IS NOT NULL;

-- Bootstrap source gates (v0.5.0, Phase 3)
-- Records which source tables are currently "gated" (bootstrapping in progress).
-- When a source is gated, all stream tables that depend on it are skipped by
-- the scheduler until pgtrickle.ungate_source() is called.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_source_gates (
    source_relid OID PRIMARY KEY,
    gated        BOOLEAN NOT NULL DEFAULT true,
    gated_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    ungated_at   TIMESTAMPTZ,
    gated_by     TEXT
);

-- Per-source watermark state: tracks how far each external source has been loaded.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermarks (
    source_relid       OID PRIMARY KEY,
    watermark          TIMESTAMPTZ NOT NULL,
    updated_at         TIMESTAMPTZ NOT NULL DEFAULT now(),
    advanced_by        TEXT,
    wal_lsn_at_advance TEXT
);

-- Watermark groups: declare that a set of sources must be temporally aligned.
CREATE TABLE IF NOT EXISTS pgtrickle.pgt_watermark_groups (
    group_id       SERIAL PRIMARY KEY,
    group_name     TEXT UNIQUE NOT NULL,
    source_relids  OID[] NOT NULL,
    tolerance_secs DOUBLE PRECISION NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);
/* */

/* */
-- src/lib.rs:295
-- Create event trigger functions with correct RETURNS event_trigger type.
-- pgrx's #[pg_extern] generates RETURNS void, which PostgreSQL rejects for
-- event triggers. We create them manually here with the correct return type.
CREATE FUNCTION pgtrickle."_on_ddl_end"()
    RETURNS event_trigger
    LANGUAGE c
    AS 'MODULE_PATHNAME', 'pg_trickle_on_ddl_end_wrapper';

CREATE FUNCTION pgtrickle."_on_sql_drop"()
    RETURNS event_trigger
    LANGUAGE c
    AS 'MODULE_PATHNAME', 'pg_trickle_on_sql_drop_wrapper';

-- Event trigger: track ALTER TABLE on upstream sources
CREATE EVENT TRIGGER pg_trickle_ddl_tracker
    ON ddl_command_end
    EXECUTE FUNCTION pgtrickle._on_ddl_end();

-- Event trigger: track DROP TABLE on upstream sources / ST storage tables
CREATE EVENT TRIGGER pg_trickle_drop_tracker
    ON sql_drop
    EXECUTE FUNCTION pgtrickle._on_sql_drop();
/* */

/* */
-- src/lib.rs:44
CREATE SCHEMA IF NOT EXISTS pgtrickle; /* pg_trickle::pgtrickle */
/* */

/* */
-- src/monitor.rs:1797
-- pg_trickle::monitor::trigger_inventory
CREATE FUNCTION pgtrickle."trigger_inventory"()
RETURNS TABLE (
    "source_table" TEXT,  /* alloc::string::String */
    "source_oid"   bigint,  /* i64 */
    "trigger_name" TEXT,  /* alloc::string::String */
    "trigger_type" TEXT,  /* alloc::string::String */
    "present"      bool,  /* bool */
    "enabled"      bool  /* bool */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'trigger_inventory_wrapper';
/* */

/* */
-- src/api.rs:147
-- pg_trickle::api::create_or_replace_stream_table
-- Not STRICT: NULL arguments (including the NULL defaults) reach the wrapper.
CREATE FUNCTION pgtrickle."create_or_replace_stream_table"(
    "name"                    TEXT,  /* &str */
    "query"                   TEXT,  /* &str */
    "schedule"                TEXT DEFAULT 'calculated',  /* core::option::Option<&str> */
    "refresh_mode"            TEXT DEFAULT 'AUTO',  /* &str */
    "initialize"              bool DEFAULT true,  /* bool */
    "diamond_consistency"     TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "diamond_schedule_policy" TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "cdc_mode"                TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "append_only"             bool DEFAULT false  /* bool */
)
RETURNS void
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'create_or_replace_stream_table_wrapper';
/* */

/* */
-- src/ivm.rs:535
-- pg_trickle::ivm::pgt_ivm_apply_delta
CREATE FUNCTION pgtrickle."pgt_ivm_apply_delta"(
    "pgt_id"     bigint,  /* i64 */
    "source_oid" INT,  /* i32 */
    "has_new"    bool,  /* bool */
    "has_old"    bool  /* bool */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_ivm_apply_delta_wrapper';
/* */

/* */
-- src/api.rs:2790
-- pg_trickle::api::_signal_launcher_rescan
CREATE FUNCTION pgtrickle."_signal_launcher_rescan"()
RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', '_signal_launcher_rescan_wrapper';
/* */

/* */
-- src/ivm.rs:840
-- pg_trickle::ivm::pgt_ivm_handle_truncate
CREATE FUNCTION pgtrickle."pgt_ivm_handle_truncate"(
    "pgt_id" bigint  /* i64 */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_ivm_handle_truncate_wrapper';
/* */

/* */
-- src/monitor.rs:1025
-- pg_trickle::monitor::change_buffer_sizes
CREATE FUNCTION pgtrickle."change_buffer_sizes"()
RETURNS TABLE (
    "stream_table" TEXT,  /* alloc::string::String */
    "source_table" TEXT,  /* alloc::string::String */
    "source_oid"   bigint,  /* i64 */
    "cdc_mode"     TEXT,  /* alloc::string::String */
    "pending_rows" bigint,  /* i64 */
    "buffer_bytes" bigint  /* i64 */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'change_buffer_sizes_wrapper';
/* */

/* */
-- src/api.rs:2858
-- pg_trickle::api::pgt_status
CREATE FUNCTION pgtrickle."pgt_status"()
RETURNS TABLE (
    "name"               TEXT,  /* alloc::string::String */
    "status"             TEXT,  /* alloc::string::String */
    "refresh_mode"       TEXT,  /* alloc::string::String */
    "is_populated"       bool,  /* bool */
    "consecutive_errors" INT,  /* i32 */
    "schedule"           TEXT,  /* core::option::Option */
    "data_timestamp"     timestamp with time zone,  /* core::option::Option */
    "staleness"          interval,  /* core::option::Option */
    "scc_id"             INT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_status_wrapper';
/* */

/* */
-- src/hooks.rs:1000
-- pg_trickle::hooks::pg_trickle_on_sql_drop
-- Skipped due to `#[pgrx(sql = false)]`
/* */

/* */
-- src/api.rs:2849
-- pg_trickle::api::parse_duration_seconds
CREATE FUNCTION pgtrickle."parse_duration_seconds"(
    "input" TEXT  /* &str */
)
RETURNS bigint /* core::option::Option */
IMMUTABLE STRICT PARALLEL SAFE
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'parse_duration_seconds_wrapper';
/* */

/* */
-- src/lib.rs:325
-- requires:
--   parse_duration_seconds
-- Convenience view: pg_stat_stream_tables
-- Combines catalog metadata with aggregate refresh statistics.
CREATE OR REPLACE VIEW pgtrickle.pg_stat_stream_tables AS
SELECT
    st.pgt_id,
    st.pgt_schema,
    st.pgt_name,
    st.status,
    st.refresh_mode,
    st.is_populated,
    st.data_timestamp,
    st.schedule,
    now() - st.data_timestamp AS staleness,
    -- "stale" is only evaluated for schedules without whitespace or '@';
    -- every other row yields NULL.
    CASE
        WHEN st.schedule IS NOT NULL
             AND st.data_timestamp IS NOT NULL
             AND st.schedule !~ '[\s@]'
        THEN EXTRACT(EPOCH FROM (now() - st.data_timestamp))
             > pgtrickle.parse_duration_seconds(st.schedule)
        ELSE NULL::boolean
    END AS stale,
    st.consecutive_errors,
    st.needs_reinit,
    st.last_refresh_at,
    COALESCE(stats.total_refreshes, 0) AS total_refreshes,
    COALESCE(stats.successful_refreshes, 0) AS successful_refreshes,
    COALESCE(stats.failed_refreshes, 0) AS failed_refreshes,
    COALESCE(stats.total_rows_inserted, 0) AS total_rows_inserted,
    COALESCE(stats.total_rows_deleted, 0) AS total_rows_deleted,
    stats.avg_duration_ms,
    stats.last_action,
    stats.last_status,
    -- Distinct CDC modes across this stream table's TABLE-type sources.
    (
        SELECT array_agg(DISTINCT dep.cdc_mode ORDER BY dep.cdc_mode)
        FROM pgtrickle.pgt_dependencies AS dep
        WHERE dep.pgt_id = st.pgt_id
          AND dep.source_type = 'TABLE'
    ) AS cdc_modes,
    st.scc_id,
    st.last_fixpoint_iterations
FROM pgtrickle.pgt_stream_tables AS st
LEFT JOIN LATERAL (
    -- Aggregates over all history rows of the current stream table, plus
    -- attributes of its newest history row (highest refresh_id).
    -- NOTE(review): last_initiated_by and freshness_deadline are computed
    -- here but are not projected by the outer SELECT.
    SELECT
        count(*)::bigint AS total_refreshes,
        count(*) FILTER (WHERE hist.status = 'COMPLETED')::bigint AS successful_refreshes,
        count(*) FILTER (WHERE hist.status = 'FAILED')::bigint AS failed_refreshes,
        COALESCE(sum(hist.rows_inserted), 0)::bigint AS total_rows_inserted,
        COALESCE(sum(hist.rows_deleted), 0)::bigint AS total_rows_deleted,
        CASE
            WHEN count(*) FILTER (WHERE hist.end_time IS NOT NULL) > 0
            THEN avg(EXTRACT(EPOCH FROM (hist.end_time - hist.start_time)) * 1000)
                 FILTER (WHERE hist.end_time IS NOT NULL)
            ELSE NULL
        END::float8 AS avg_duration_ms,
        (
            SELECT latest.action
            FROM pgtrickle.pgt_refresh_history AS latest
            WHERE latest.pgt_id = st.pgt_id
            ORDER BY latest.refresh_id DESC
            LIMIT 1
        ) AS last_action,
        (
            SELECT latest.status
            FROM pgtrickle.pgt_refresh_history AS latest
            WHERE latest.pgt_id = st.pgt_id
            ORDER BY latest.refresh_id DESC
            LIMIT 1
        ) AS last_status,
        (
            SELECT latest.initiated_by
            FROM pgtrickle.pgt_refresh_history AS latest
            WHERE latest.pgt_id = st.pgt_id
            ORDER BY latest.refresh_id DESC
            LIMIT 1
        ) AS last_initiated_by,
        (
            SELECT latest.freshness_deadline
            FROM pgtrickle.pgt_refresh_history AS latest
            WHERE latest.pgt_id = st.pgt_id
            ORDER BY latest.refresh_id DESC
            LIMIT 1
        ) AS freshness_deadline
    FROM pgtrickle.pgt_refresh_history AS hist
    WHERE hist.pgt_id = st.pgt_id
) AS stats ON true;

-- Per-source CDC status view (G5): exposes cdc_mode, slot names, and
-- transition timestamps for every TABLE dependency of a stream table.
CREATE OR REPLACE VIEW pgtrickle.pgt_cdc_status AS
SELECT
    st.pgt_schema,
    st.pgt_name,
    d.source_relid,
    c.relname AS source_name,
    n.nspname AS source_schema,
    d.cdc_mode,
    d.slot_name,
    d.decoder_confirmed_lsn,
    d.transition_started_at
FROM pgtrickle.pgt_dependencies AS d
INNER JOIN pgtrickle.pgt_stream_tables AS st
    ON st.pgt_id = d.pgt_id
-- Inner joins to the system catalogs: a dependency whose source_relid has no
-- pg_class row does not appear in this view.
INNER JOIN pg_class AS c
    ON c.oid = d.source_relid
INNER JOIN pg_namespace AS n
    ON n.oid = c.relnamespace
WHERE d.source_type = 'TABLE';
/* */

/* */
-- src/lib.rs:412
-- requires:
--   parse_duration_seconds
-- ERG-E: One-row health summary for dashboards and alerting.
-- The "status" column is the first matching rule of:
--   EMPTY    - no stream tables exist
--   CRITICAL - at least one stream table is SUSPENDED
--   WARNING  - at least one is in ERROR / has consecutive errors, or is stale
--   OK       - otherwise
-- Existence checks use EXISTS so they can stop at the first matching row
-- instead of counting every row.
CREATE OR REPLACE VIEW pgtrickle.quick_health AS
SELECT
    (SELECT count(*) FROM pgtrickle.pgt_stream_tables)::bigint AS total_stream_tables,
    (
        SELECT count(*)
        FROM pgtrickle.pgt_stream_tables
        WHERE status = 'ERROR'
           OR consecutive_errors > 0
    )::bigint AS error_tables,
    -- Staleness is only evaluated for schedules without whitespace or '@'.
    (
        SELECT count(*)
        FROM pgtrickle.pgt_stream_tables
        WHERE schedule IS NOT NULL
          AND schedule !~ '[\s@]'
          AND data_timestamp IS NOT NULL
          AND EXTRACT(EPOCH FROM (now() - data_timestamp))
              > pgtrickle.parse_duration_seconds(schedule)
    )::bigint AS stale_tables,
    EXISTS (
        SELECT 1
        FROM pg_stat_activity
        WHERE backend_type = 'pg_trickle scheduler'
    ) AS scheduler_running,
    CASE
        WHEN NOT EXISTS (
            SELECT 1
            FROM pgtrickle.pgt_stream_tables
        ) THEN 'EMPTY'
        WHEN EXISTS (
            SELECT 1
            FROM pgtrickle.pgt_stream_tables
            WHERE status = 'SUSPENDED'
        ) THEN 'CRITICAL'
        WHEN EXISTS (
            SELECT 1
            FROM pgtrickle.pgt_stream_tables
            WHERE status = 'ERROR'
               OR consecutive_errors > 0
        ) THEN 'WARNING'
        WHEN EXISTS (
            SELECT 1
            FROM pgtrickle.pgt_stream_tables
            WHERE schedule IS NOT NULL
              AND schedule !~ '[\s@]'
              AND data_timestamp IS NOT NULL
              AND EXTRACT(EPOCH FROM (now() - data_timestamp))
                  > pgtrickle.parse_duration_seconds(schedule)
        ) THEN 'WARNING'
        ELSE 'OK'
    END AS status;
/* */

/* */
-- src/lib.rs:274
-- requires:
--   parse_duration_seconds
-- Status overview view
-- Every catalog column (st.*) plus derived staleness, stale and is_topk.
-- The explicit data_timestamp guard mirrors the other staleness views; the
-- result for a NULL data_timestamp is NULL either way.
CREATE OR REPLACE VIEW pgtrickle.stream_tables_info AS
SELECT
    st.*,
    now() - st.data_timestamp AS staleness,
    CASE
        WHEN st.schedule IS NOT NULL
             AND st.data_timestamp IS NOT NULL
             AND st.schedule !~ '[\s@]'
        THEN EXTRACT(EPOCH FROM (now() - st.data_timestamp))
             > pgtrickle.parse_duration_seconds(st.schedule)
        ELSE NULL::boolean
    END AS stale,
    CASE
        WHEN st.topk_limit IS NOT NULL THEN TRUE
        ELSE FALSE
    END AS is_topk
FROM pgtrickle.pgt_stream_tables AS st;
/* */

/* */
-- src/hash.rs:24
-- pg_trickle::hash::pg_trickle_hash_multi
-- Not STRICT: a NULL "inputs" argument is passed through to the wrapper.
CREATE FUNCTION pgtrickle."pg_trickle_hash_multi"(
    "inputs" TEXT[]  /* alloc::vec::Vec */
)
RETURNS bigint /* i64 */
IMMUTABLE PARALLEL SAFE
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pg_trickle_hash_multi_wrapper';
/* */

/* */
-- src/api.rs:2272
-- pg_trickle::api::refresh_stream_table
CREATE FUNCTION pgtrickle."refresh_stream_table"(
    "name" TEXT  /* &str */
)
RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'refresh_stream_table_wrapper';
/* */

/* */
-- src/monitor.rs:1897
-- pg_trickle::monitor::worker_pool_status
CREATE FUNCTION pgtrickle."worker_pool_status"()
RETURNS TABLE (
    "active_workers" INT,  /* i32 */
    "max_workers"    INT,  /* i32 */
    "per_db_cap"     INT,  /* i32 */
    "parallel_mode"  TEXT  /* alloc::string::String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'worker_pool_status_wrapper';
/* */

/* */
-- src/api.rs:2912
-- pg_trickle::api::pgt_scc_status
CREATE FUNCTION pgtrickle."pgt_scc_status"()
RETURNS TABLE (
    "scc_id"            INT,  /* i32 */
    "member_count"      INT,  /* i32 */
    "members"           TEXT[],  /* alloc::vec::Vec */
    "last_iterations"   INT,  /* core::option::Option */
    "last_converged_at" timestamp with time zone  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgt_scc_status_wrapper';
/* */

/* */
-- src/api.rs:3313
-- pg_trickle::api::drop_watermark_group
CREATE FUNCTION pgtrickle."drop_watermark_group"(
    "group_name" TEXT  /* &str */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'drop_watermark_group_wrapper';
/* */

/* */
-- src/api.rs:3090
-- pg_trickle::api::ungate_source
CREATE FUNCTION pgtrickle."ungate_source"(
    "source" TEXT  /* &str */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'ungate_source_wrapper';
/* */

/* */
-- src/api.rs:3321
-- pg_trickle::api::watermarks
CREATE FUNCTION pgtrickle."watermarks"()
RETURNS TABLE (
    "source_table" TEXT,  /* alloc::string::String */
    "schema_name"  TEXT,  /* alloc::string::String */
    "watermark"    timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "updated_at"   timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "advanced_by"  TEXT,  /* core::option::Option */
    "wal_lsn"      TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'watermarks_fn_wrapper';
/* */

/* */
-- src/api.rs:3115
-- pg_trickle::api::source_gates
CREATE FUNCTION pgtrickle."source_gates"()
RETURNS TABLE (
    "source_table" TEXT,  /* alloc::string::String */
    "schema_name"  TEXT,  /* alloc::string::String */
    "gated"        bool,  /* bool */
    "gated_at"     timestamp with time zone,  /* core::option::Option */
    "ungated_at"   timestamp with time zone,  /* core::option::Option */
    "gated_by"     TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'source_gates_fn_wrapper';
/* */

/* */
-- src/monitor.rs:1921
-- pg_trickle::monitor::parallel_job_status
-- STRICT: an explicit NULL for "max_age_seconds" produces no rows without
-- calling the wrapper; omit the argument to get the default of 300.
CREATE FUNCTION pgtrickle."parallel_job_status"(
    "max_age_seconds" INT DEFAULT 300  /* i32 */
)
RETURNS TABLE (
    "job_id"        bigint,  /* i64 */
    "unit_key"      TEXT,  /* alloc::string::String */
    "unit_kind"     TEXT,  /* alloc::string::String */
    "status"        TEXT,  /* alloc::string::String */
    "member_count"  INT,  /* i32 */
    "attempt_no"    INT,  /* i32 */
    "scheduler_pid" INT,  /* i32 */
    "worker_pid"    INT,  /* core::option::Option */
    "enqueued_at"   timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "started_at"    timestamp with time zone,  /* core::option::Option */
    "finished_at"   timestamp with time zone,  /* core::option::Option */
    "duration_ms"   double precision  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'parallel_job_status_wrapper';
/* */

/* */
-- src/monitor.rs:589
-- pg_trickle::monitor::get_staleness
CREATE FUNCTION pgtrickle."get_staleness"(
    "name" TEXT  /* &str */
)
RETURNS double precision /* core::option::Option */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'get_staleness_wrapper';
/* */

/* */
-- src/monitor.rs:1430
-- pg_trickle::monitor::health_check
CREATE FUNCTION pgtrickle."health_check"()
RETURNS TABLE (
    "check_name" TEXT,  /* alloc::string::String */
    "severity"   TEXT,  /* alloc::string::String */
    "detail"     TEXT  /* alloc::string::String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'health_check_wrapper';
/* */

/* */
-- src/api.rs:3065
-- pg_trickle::api::gate_source
CREATE FUNCTION pgtrickle."gate_source"(
    "source" TEXT  /* &str */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'gate_source_wrapper';
/* */

/* */
-- src/hooks.rs:51
-- pg_trickle::hooks::pg_trickle_on_ddl_end
-- Skipped due to `#[pgrx(sql = false)]`
/* */

/* */
-- src/monitor.rs:1173
-- pg_trickle::monitor::dependency_tree
CREATE FUNCTION pgtrickle."dependency_tree"()
RETURNS TABLE (
    "tree_line"    TEXT,  /* alloc::string::String */
    "node"         TEXT,  /* alloc::string::String */
    "node_type"    TEXT,  /* alloc::string::String */
    "depth"        INT,  /* i32 */
    "status"       TEXT,  /* core::option::Option */
    "refresh_mode" TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'dependency_tree_wrapper';
/* */

/* */
-- src/hash.rs:13
-- pg_trickle::hash::pg_trickle_hash
CREATE FUNCTION pgtrickle."pg_trickle_hash"(
    "input" TEXT  /* &str */
)
RETURNS bigint /* i64 */
IMMUTABLE STRICT PARALLEL SAFE
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pg_trickle_hash_wrapper';
/* */

/* */
-- src/api.rs:33
-- pg_trickle::api::create_stream_table
-- Not STRICT: NULL arguments (including the NULL defaults) reach the wrapper.
CREATE FUNCTION pgtrickle."create_stream_table"(
    "name"                    TEXT,  /* &str */
    "query"                   TEXT,  /* &str */
    "schedule"                TEXT DEFAULT 'calculated',  /* core::option::Option<&str> */
    "refresh_mode"            TEXT DEFAULT 'AUTO',  /* &str */
    "initialize"              bool DEFAULT true,  /* bool */
    "diamond_consistency"     TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "diamond_schedule_policy" TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "cdc_mode"                TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "append_only"             bool DEFAULT false  /* bool */
)
RETURNS void
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'create_stream_table_wrapper';
/* */

/* */
-- src/api.rs:2136
-- pg_trickle::api::drop_stream_table
CREATE FUNCTION pgtrickle."drop_stream_table"(
    "name" TEXT  /* &str */
)
RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'drop_stream_table_wrapper';
/* */

/* */
-- src/api.rs:2972
-- pg_trickle::api::diamond_groups
CREATE FUNCTION pgtrickle."diamond_groups"()
RETURNS TABLE (
    "group_id"        INT,  /* i32 */
    "member_name"     TEXT,  /* alloc::string::String */
    "member_schema"   TEXT,  /* alloc::string::String */
    "is_convergence"  bool,  /* bool */
    "epoch"           bigint,  /* i64 */
    "schedule_policy" TEXT  /* alloc::string::String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'diamond_groups_wrapper';
/* */

/* */
-- src/monitor.rs:734
-- pg_trickle::monitor::check_cdc_health
CREATE FUNCTION pgtrickle."check_cdc_health"()
RETURNS TABLE (
    "source_relid"  bigint,  /* i64 */
    "source_table"  TEXT,  /* alloc::string::String */
    "cdc_mode"      TEXT,  /* alloc::string::String */
    "slot_name"     TEXT,  /* core::option::Option */
    "lag_bytes"     bigint,  /* core::option::Option */
    "confirmed_lsn" TEXT,  /* core::option::Option */
    "alert"         TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'check_cdc_health_wrapper';
/* */

/* */
-- src/api.rs:3367
-- pg_trickle::api::watermark_groups
CREATE FUNCTION pgtrickle."watermark_groups"()
RETURNS TABLE (
    "group_name"     TEXT,  /* alloc::string::String */
    "source_count"   INT,  /* i32 */
    "tolerance_secs" double precision,  /* f64 */
    "created_at"     timestamp with time zone  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'watermark_groups_fn_wrapper';
/* */

/* */
-- src/api.rs:3402
-- pg_trickle::api::watermark_status
CREATE FUNCTION pgtrickle."watermark_status"()
RETURNS TABLE (
    "group_name"             TEXT,  /* alloc::string::String */
    "min_watermark"          timestamp with time zone,  /* core::option::Option */
    "max_watermark"          timestamp with time zone,  /* core::option::Option */
    "lag_secs"               double precision,  /* core::option::Option */
    "aligned"                bool,  /* bool */
    "sources_with_watermark" INT,  /* i32 */
    "sources_total"          INT  /* i32 */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'watermark_status_fn_wrapper';
/* */

/* */
-- src/api.rs:1799
-- pg_trickle::api::alter_stream_table
-- Not STRICT: every argument after "name" defaults to NULL, and NULLs are
-- passed through to the wrapper.
CREATE FUNCTION pgtrickle."alter_stream_table"(
    "name"                    TEXT,  /* &str */
    "query"                   TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "schedule"                TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "refresh_mode"            TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "status"                  TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "diamond_consistency"     TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "diamond_schedule_policy" TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "cdc_mode"                TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "append_only"             bool DEFAULT NULL  /* core::option::Option */
)
RETURNS void
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'alter_stream_table_wrapper';
/* */

/* */
-- src/api.rs:3245
-- pg_trickle::api::advance_watermark
CREATE FUNCTION pgtrickle."advance_watermark"(
    "source"    TEXT,  /* &str */
    "watermark" timestamp with time zone  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
)
RETURNS VOID /* core::result::Result<(), pg_trickle::error::PgTrickleError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'advance_watermark_wrapper';
/* */

/* */
-- src/api.rs:68
-- pg_trickle::api::create_stream_table_if_not_exists
-- Not STRICT: NULL arguments (including the NULL defaults) reach the wrapper.
CREATE FUNCTION pgtrickle."create_stream_table_if_not_exists"(
    "name"                    TEXT,  /* &str */
    "query"                   TEXT,  /* &str */
    "schedule"                TEXT DEFAULT 'calculated',  /* core::option::Option<&str> */
    "refresh_mode"            TEXT DEFAULT 'AUTO',  /* &str */
    "initialize"              bool DEFAULT true,  /* bool */
    "diamond_consistency"     TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "diamond_schedule_policy" TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "cdc_mode"                TEXT DEFAULT NULL,  /* core::option::Option<&str> */
    "append_only"             bool DEFAULT false  /* bool */
)
RETURNS void
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'create_stream_table_if_not_exists_wrapper';
/* */

/* */
-- src/monitor.rs:631
-- pg_trickle::monitor::explain_st
CREATE FUNCTION pgtrickle."explain_st"(
    "name" TEXT  /* &str */
)
RETURNS TABLE (
    "property" TEXT,  /* alloc::string::String */
    "value"    TEXT  /* alloc::string::String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'explain_st_wrapper';
/* */

/* */
-- src/monitor.rs:473
-- pg_trickle::monitor::get_refresh_history
CREATE FUNCTION pgtrickle."get_refresh_history"(
    "name"     TEXT,  /* &str */
    "max_rows" INT DEFAULT 20  /* i32 */
)
RETURNS TABLE (
    "refresh_id"     bigint,  /* i64 */
    "data_timestamp" timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "start_time"     timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "end_time"       timestamp with time zone,  /* core::option::Option */
    "action"         TEXT,  /* alloc::string::String */
    "status"         TEXT,  /* alloc::string::String */
    "rows_inserted"  bigint,  /* i64 */
    "rows_deleted"   bigint,  /* i64 */
    "duration_ms"    double precision,  /* core::option::Option */
    "error_message"  TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'get_refresh_history_wrapper';
/* */

/* */
-- src/api.rs:2231
-- pg_trickle::api::resume_stream_table
CREATE FUNCTION pgtrickle."resume_stream_table"(
    "name" TEXT  /* &str */
)
RETURNS void
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'resume_stream_table_wrapper';
/* */

/* */
-- src/api.rs:2804
-- pg_trickle::api::rebuild_cdc_triggers
CREATE FUNCTION pgtrickle."rebuild_cdc_triggers"()
RETURNS TEXT /* &str */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'rebuild_cdc_triggers_wrapper';
/* */

/* */
-- src/api.rs:2778
-- pg_trickle::api::version
CREATE FUNCTION pgtrickle."version"()
RETURNS TEXT /* &str */
IMMUTABLE STRICT PARALLEL SAFE
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'version_wrapper';
/* */

/* */
-- src/api.rs:3172
-- pg_trickle::api::bootstrap_gate_status
CREATE FUNCTION pgtrickle."bootstrap_gate_status"()
RETURNS TABLE (
    "source_table"           TEXT,  /* alloc::string::String */
    "schema_name"            TEXT,  /* alloc::string::String */
    "gated"                  bool,  /* bool */
    "gated_at"               timestamp with time zone,  /* core::option::Option */
    "ungated_at"             timestamp with time zone,  /* core::option::Option */
    "gated_by"               TEXT,  /* core::option::Option */
    "gate_duration"          interval,  /* core::option::Option */
    "affected_stream_tables" TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'bootstrap_gate_status_fn_wrapper';
/* */

/* */
-- src/monitor.rs:346
-- pg_trickle::monitor::st_refresh_stats
CREATE FUNCTION pgtrickle."st_refresh_stats"()
RETURNS TABLE (
    "pgt_name"             TEXT,  /* alloc::string::String */
    "pgt_schema"           TEXT,  /* alloc::string::String */
    "status"               TEXT,  /* alloc::string::String */
    "refresh_mode"         TEXT,  /* alloc::string::String */
    "is_populated"         bool,  /* bool */
    "total_refreshes"      bigint,  /* i64 */
    "successful_refreshes" bigint,  /* i64 */
    "failed_refreshes"     bigint,  /* i64 */
    "total_rows_inserted"  bigint,  /* i64 */
    "total_rows_deleted"   bigint,  /* i64 */
    "avg_duration_ms"      double precision,  /* f64 */
    "last_refresh_action"  TEXT,  /* core::option::Option */
    "last_refresh_status"  TEXT,  /* core::option::Option */
    "last_refresh_at"      timestamp with time zone,  /* core::option::Option */
    "staleness_secs"       double precision,  /* core::option::Option */
    "stale"                bool  /* bool */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'st_refresh_stats_wrapper';
/* */

/* */
-- src/monitor.rs:612
-- pg_trickle::monitor::slot_health
CREATE FUNCTION pgtrickle."slot_health"()
RETURNS TABLE (
    "slot_name"          TEXT,  /* alloc::string::String */
    "source_relid"       bigint,  /* i64 */
    "active"             bool,  /* bool */
    "retained_wal_bytes" bigint,  /* i64 */
    "wal_status"         TEXT  /* alloc::string::String */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'slot_health_wrapper';
/* */

/* */
-- src/monitor.rs:1097
-- pg_trickle::monitor::list_sources
CREATE FUNCTION pgtrickle."list_sources"(
    "name" TEXT  /* &str */
)
RETURNS TABLE (
    "source_table" TEXT,  /* alloc::string::String */
    "source_oid"   bigint,  /* i64 */
    "source_type"  TEXT,  /* alloc::string::String */
    "cdc_mode"     TEXT,  /* alloc::string::String */
    "columns_used" TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'list_sources_wrapper';
/* */

/* */
-- src/monitor.rs:570
-- pg_trickle::monitor::st_auto_threshold
CREATE FUNCTION pgtrickle."st_auto_threshold"(
    "name" TEXT  /* &str */
)
RETURNS double precision /* core::option::Option */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'st_auto_threshold_wrapper';
/* */

/* */
-- src/api.rs:3276
-- pg_trickle::api::create_watermark_group
CREATE FUNCTION pgtrickle."create_watermark_group"(
    "group_name"     TEXT,  /* &str */
    "sources"        TEXT[],  /* alloc::vec::Vec */
    "tolerance_secs" double precision DEFAULT 0.0  /* f64 */
)
RETURNS INT /* core::result::Result */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'create_watermark_group_wrapper';
/* */

/* */
-- src/monitor.rs:1723
-- pg_trickle::monitor::refresh_timeline
CREATE FUNCTION pgtrickle."refresh_timeline"(
    "max_rows" INT DEFAULT 50  /* i32 */
)
RETURNS TABLE (
    "start_time"    timestamp with time zone,  /* pgrx::datetime::time_stamp_with_timezone::TimestampWithTimeZone */
    "stream_table"  TEXT,  /* alloc::string::String */
    "action"        TEXT,  /* alloc::string::String */
    "status"        TEXT,  /* alloc::string::String */
    "rows_inserted" bigint,  /* i64 */
    "rows_deleted"  bigint,  /* i64 */
    "duration_ms"   double precision,  /* core::option::Option */
    "error_message" TEXT  /* core::option::Option */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'refresh_timeline_wrapper';
/* */

/* */
-- src/lib.rs:457
-- requires:
--   _signal_launcher_rescan
-- finalize
-- Runs once, as the last statement of this install script.
SELECT pgtrickle._signal_launcher_rescan();
/* */