# Plan: CDC Mode / Refresh Mode Interaction Gaps

Date: 2026-03-07
Status: DONE
Last Updated: 2026-03-08

> **G1–G6 implemented:** per-table `cdc_mode` override, explicit
> IMMEDIATE + `cdc_mode => 'wal'` rejection, WAL slot advancement,
> adaptive-fallback cleanup, `pgt_cdc_status`, and the defensive
> differential-baseline guard all shipped in v0.2.3.

---

## 1. Problem Statement

pg_trickle has four refresh modes (`AUTO`, `FULL`, `DIFFERENTIAL`,
`IMMEDIATE`) and three user-selectable CDC modes (`auto`, `trigger`, `wal`),
plus the internal `transitioning` state. Not all combinations are explicitly
validated, and several edge cases can produce surprising behavior: incorrect
results, resource leaks, or opaque errors.

This plan addresses six specific gaps, ordered by user impact.

### Gap Summary

| # | Gap | Severity | Effort |
|---|-----|----------|--------|
| G1 | No per-table `cdc_mode` override | High | Medium |
| G2 | `IMMEDIATE` + `wal` GUC: no validation | Medium | Small |
| G3 | `FULL` refresh does not advance WAL slot | Medium | Small |
| G4 | `AUTO→FULL` adaptive fallback does not flush change buffers | Medium | Medium |
| G5 | `TRANSITIONING` state lacks user-facing query ergonomics | Low | Small |
| G6 | `DIFFERENTIAL` without initialization baseline | Medium | Small |

---

## 2. Current State

### Refresh Modes

| Mode | Mechanism | CDC Path |
|------|-----------|----------|
| `AUTO` | Attempts `DIFFERENTIAL`; falls back to `FULL` when the query is non-differentiable or the change ratio exceeds a threshold | Standard CDC (trigger/WAL) |
| `FULL` | Truncate + reload | Standard CDC |
| `DIFFERENTIAL` | Delta query over change buffers | Standard CDC |
| `IMMEDIATE` | Statement-level IVM triggers (in-transaction) | IVM triggers, **bypasses CDC entirely** |

### CDC Modes

| Mode | Set By | Scope |
|------|--------|-------|
| `auto` (default GUC) | `pg_trickle.cdc_mode` | Cluster-wide |
| `trigger` | GUC | Cluster-wide |
| `wal` | GUC | Cluster-wide |
| `transitioning` | Internal state machine | Per-source in `pgt_dependencies.cdc_mode` |

### Key Code Locations

| Component | File | Entry Point |
|-----------|------|-------------|
| CDC mode GUC | `src/config.rs:88–106` | `PGS_CDC_MODE` |
| CDC setup | `src/api.rs:2319` | `setup_cdc_for_source()` |
| IVM vs CDC branch | `src/api.rs:452–470` | `setup_trigger_infrastructure()` |
| Full refresh | `src/refresh.rs:1011` | `execute_full_refresh()` |
| Differential refresh | `src/refresh.rs:1260` | `execute_differential_refresh()` |
| Adaptive fallback | `src/refresh.rs:1476–1554` | Change ratio check |
| Scheduler dispatch | `src/scheduler.rs:1240–1395` | `RefreshAction` match |
| Buffer cleanup | `src/refresh.rs:128` | `drain_pending_cleanups()` |
| dbt materialization | `dbt-pgtrickle/macros/materializations/stream_table.sql` | N/A |

---

## 3. Gap Details & Implementation Plans

---

### G1: Per-Table `cdc_mode` Override ✅

**Problem.** `cdc_mode` is a cluster-wide GUC (`pg_trickle.cdc_mode`). In
mixed environments, where some tables have a PK (WAL-capable) and others
don't, users must either choose the lowest common denominator (`trigger`)
globally or manually run `ALTER TABLE ... REPLICA IDENTITY FULL` on every
keyless table. There is no way to say "use WAL for this table and triggers
for that one."
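The following is a minimal Rust sketch of the precedence this gap calls for. It is not pg_trickle's code. A per-table override, when present, wins over the cluster-wide GUC. An explicit `wal` request is rejected for a source that lacks a primary key or `REPLICA IDENTITY FULL`. The names `CdcMode`, `SourceInfo`, `requested_mode`, and `target_mode` are hypothetical. Two behaviors are also assumptions: that `auto` quietly targets triggers for WAL-ineligible sources instead of failing, and that an explicit `wal` also requires `wal_level = logical`.

```rust
/// User-facing CDC modes (illustrative; not pg_trickle's actual enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcMode {
    Auto,
    Trigger,
    Wal,
}

impl CdcMode {
    /// Parse the spelling accepted by the GUC and the proposed SQL parameter.
    pub fn parse(s: &str) -> Result<CdcMode, String> {
        match s.trim().to_ascii_lowercase().as_str() {
            "auto" => Ok(CdcMode::Auto),
            "trigger" => Ok(CdcMode::Trigger),
            "wal" => Ok(CdcMode::Wal),
            other => Err(format!("invalid cdc_mode '{other}'")),
        }
    }
}

/// Facts about one source table that decide WAL eligibility (hypothetical).
pub struct SourceInfo {
    pub has_primary_key: bool,
    pub replica_identity_full: bool,
}

/// A per-table override (requested_cdc_mode) wins; None (SQL NULL) inherits the GUC.
pub fn requested_mode(override_mode: Option<CdcMode>, guc: CdcMode) -> CdcMode {
    override_mode.unwrap_or(guc)
}

/// Target CDC mode for one source. Assumption: `auto` degrades to triggers
/// when WAL is unavailable, whereas an explicit `wal` request is an error.
pub fn target_mode(
    requested: CdcMode,
    src: &SourceInfo,
    wal_level_logical: bool,
) -> Result<CdcMode, String> {
    let wal_ok = wal_level_logical && (src.has_primary_key || src.replica_identity_full);
    match requested {
        CdcMode::Trigger => Ok(CdcMode::Trigger),
        CdcMode::Wal if wal_ok => Ok(CdcMode::Wal),
        CdcMode::Wal => Err(
            "cdc_mode = 'wal' needs wal_level = logical and a primary key \
             or REPLICA IDENTITY FULL"
                .to_string(),
        ),
        CdcMode::Auto if wal_ok => Ok(CdcMode::Wal),
        CdcMode::Auto => Ok(CdcMode::Trigger),
    }
}
```

Under this model, `target_mode(requested_mode(Some(CdcMode::Trigger), CdcMode::Auto), &src, true)` yields `Trigger` whatever the source looks like. A keyless source with an explicit `wal` request fails up front instead of at slot creation.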
**Current code path.** `setup_cdc_for_source()` (`src/api.rs:2341`) reads
`config::pg_trickle_cdc_mode()` (the global GUC) for every source. The
catalog (`pgt_dependencies.cdc_mode`) already stores a per-source CDC mode,
but only as a *result* of the state machine, never as user input.

**dbt impact.** The `stream_table` materialization does not expose
`cdc_mode` as a model config key, so users of `dbt-pgtrickle` have no way to
set it per model.

#### Design

Add an optional `cdc_mode` parameter to `create_stream_table()` and
`alter_stream_table()`. When set, it overrides the global GUC for all source
tables of that stream table. When `NULL` (the default), the global GUC
applies, preserving backward compatibility.

#### Implementation Steps

1. **SQL API: `create_stream_table()`** (`src/api.rs:33–41`)
   - Add parameter: `cdc_mode: default!(Option<&str>, "NULL")`
   - Pass it through to `create_stream_table_impl()`.

2. **SQL API: `alter_stream_table()`** (`src/api.rs`)
   - Add parameter: `cdc_mode: default!(Option<&str>, "NULL")`
   - When non-NULL on ALTER, re-evaluate CDC setup for all sources:
     - `trigger→wal`: validate PK/REPLICA IDENTITY, create slot, begin transition.
     - `wal→trigger`: drop slot, recreate trigger.
     - Same mode: no-op.

3. **Catalog: `pgt_stream_tables`** (`sql/` upgrade migration)
   - Add column: `requested_cdc_mode TEXT DEFAULT NULL CHECK (requested_cdc_mode IN ('trigger', 'wal', 'auto'))`.
     `NULL` needs no entry in the list, because a `CHECK` passes whenever its
     expression evaluates to `NULL`. Listing `NULL` inside `IN (...)` would
     actually disable the check: any other value then compares to `NULL`,
     the expression yields `NULL`, and the constraint accepts it.
   - This column stores the **user's intent**; `pgt_dependencies.cdc_mode`
     continues to store the **effective state** per source.

4. **CDC setup** (`src/api.rs:2319–2400`)
   - Modify `setup_cdc_for_source()` to accept an optional
     `cdc_mode_override` parameter. When `Some(...)`, use it instead of
     reading the GUC.
   - Update the PK/REPLICA IDENTITY validation to use the effective mode.

5. **dbt adapter** (`dbt-pgtrickle/macros/materializations/stream_table.sql`)
   - Add config key `cdc_mode` (default `none`).
   - Thread it into the `pgtrickle_create_stream_table()` and
     `pgtrickle_alter_stream_table()` macro calls.

6. **Upgrade migration** (`sql/pg_trickle----.sql`)
   - `ALTER TABLE pgtrickle.pgt_stream_tables ADD COLUMN requested_cdc_mode TEXT DEFAULT NULL ...`
   - Update the `create_stream_table` and `alter_stream_table` function signatures.

7. **Documentation**
   - `docs/SQL_REFERENCE.md`: Document the new parameter.
   - `docs/CONFIGURATION.md`: Clarify GUC vs. per-table precedence.

8. **Tests**
   - Unit test: `cdc_mode_override` parameter parsing.
   - Integration test: create an ST with `cdc_mode => 'trigger'` while the
     global GUC is `auto`; verify that the trigger is used and no slot is created.
   - Integration test: create an ST with `cdc_mode => 'wal'` on a keyless
     table without REPLICA IDENTITY FULL; verify the error.
   - E2E test: alter an existing ST's `cdc_mode` from `trigger` to `wal`;
     verify that the transition completes.
   - dbt integration test: model config `cdc_mode: 'trigger'`.

---

### G2: Explicit Validation of `IMMEDIATE` + WAL CDC ✅

**Problem.** If a user sets `pg_trickle.cdc_mode = 'wal'` and creates a
stream table with `refresh_mode = 'IMMEDIATE'`, the system silently bypasses
WAL entirely (the `is_immediate()` branch in `setup_trigger_infrastructure()`
skips CDC setup). This is correct behavior, but confusing: the user asked for
WAL and got IVM triggers with no feedback.

With G1 (per-table `cdc_mode`), the risk increases: a user could explicitly
write `cdc_mode => 'wal', refresh_mode => 'IMMEDIATE'`, which is an
incoherent configuration.

**Current code path.** `setup_trigger_infrastructure()` (`src/api.rs:452`)
has an `if refresh_mode.is_immediate() { ... } else { ... }` branch. The
`else` branch calls `setup_cdc_for_source()`; the `if` branch calls
`ivm::setup_ivm_triggers()`. No validation rejects the combination.
**Progress (2026-03-08).** Phase 1 is implemented. When the cluster-wide
`pg_trickle.cdc_mode` GUC is `'wal'` and a stream table is created or
altered to `refresh_mode = 'IMMEDIATE'`, pg_trickle now emits an INFO
explaining that WAL CDC is ignored and that statement-level IVM triggers will
be used instead. The explicit rejection path is still pending because there
is not yet a per-table `cdc_mode` override surface; that arrives with G1.

#### Implementation Steps

1. **Phase 1: INFO for implicit global-GUC mismatch** (`src/api.rs`)
   - If `refresh_mode = 'IMMEDIATE'` and the effective `cdc_mode` comes from
     the global GUC with value `'wal'`, emit:

     ```text
     INFO: cdc_mode 'wal' has no effect for IMMEDIATE refresh mode — using IVM triggers
     ```

   - Implement this in both `create_stream_table_impl()` and
     `alter_stream_table_impl()`.

2. **Phase 2: Explicit rejection once G1 exists** (`src/api.rs`)
   - After parsing `refresh_mode` and determining an explicit per-table
     `cdc_mode` override:

     ```rust
     // Reached only for an explicit per-table override; a GUC-derived 'wal'
     // takes the Phase 1 INFO path instead of failing.
     if refresh_mode.is_immediate() && effective_cdc_mode == "wal" {
         return Err(PgTrickleError::InvalidArgument(
             "refresh_mode = 'IMMEDIATE' is incompatible with cdc_mode = 'wal'. \
              IMMEDIATE uses in-transaction IVM triggers; WAL-based CDC is async. \
              Use cdc_mode = 'trigger' or 'auto', or choose a different refresh_mode."
                 .to_string(),
         ));
     }
     ```

   - Apply the same check in `alter_stream_table_impl()` when altering the
     refresh mode or the CDC mode.

3. **Tests**
   - Phase 1 E2E test: GUC `wal` + `IMMEDIATE` create path succeeds and
     installs neither CDC triggers nor WAL slots.
   - Phase 1 E2E test: GUC `wal` + `ALTER ... refresh_mode='IMMEDIATE'`
     succeeds, leaves no WAL slots behind, and preserves synchronous
     IMMEDIATE propagation on subsequent DML.
   - Integration test: explicit `cdc_mode => 'wal'` + `IMMEDIATE` → error.
   - Integration test: GUC `wal` + `IMMEDIATE` (no per-table override) →
     success with INFO log.
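The two phases reduce to one decision over three inputs: the refresh mode, the effective CDC mode, and where that CDC mode came from. The sketch below is illustrative only. `RefreshMode`, `CdcOrigin`, `Verdict`, and `check_refresh_cdc` are hypothetical names, not pg_trickle's API. The INFO text mirrors the Phase 1 message above.

```rust
/// Refresh modes from the Current State table (illustrative enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RefreshMode {
    Auto,
    Full,
    Differential,
    Immediate,
}

/// Where the effective cdc_mode came from (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOrigin {
    GlobalGuc,
    PerTableOverride,
}

/// Outcome of validating a refresh_mode / cdc_mode pair.
#[derive(Debug, PartialEq, Eq)]
pub enum Verdict {
    Accept,
    /// Phase 1: proceed, but tell the user that WAL is ignored.
    AcceptWithInfo(String),
    /// Phase 2: an explicit, incoherent request is refused.
    Reject(String),
}

pub fn check_refresh_cdc(refresh: RefreshMode, cdc_mode: &str, origin: CdcOrigin) -> Verdict {
    // Only IMMEDIATE + wal is problematic, because IMMEDIATE never uses CDC.
    if refresh != RefreshMode::Immediate || !cdc_mode.eq_ignore_ascii_case("wal") {
        return Verdict::Accept;
    }
    match origin {
        CdcOrigin::GlobalGuc => Verdict::AcceptWithInfo(
            "cdc_mode 'wal' has no effect for IMMEDIATE refresh mode — using IVM triggers"
                .to_string(),
        ),
        CdcOrigin::PerTableOverride => Verdict::Reject(
            "refresh_mode = 'IMMEDIATE' is incompatible with cdc_mode = 'wal'".to_string(),
        ),
    }
}
```

Keeping the origin explicit is what allows Phase 1 and Phase 2 to coexist. The same `'wal'` value is only a warning when it was inherited, and it is an error when the user asked for it on this table.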
---

### G3: FULL Refresh Does Not Advance WAL Slot

**Problem.** When `execute_full_refresh()` runs, it truncates the stream
table and reloads it from the defining query. The result is correct and a new
frontier is stored (the scheduler handles this at
`src/scheduler.rs:1261–1273`). However, the WAL behind the change-buffer rows
consumed during prior differential cycles, and behind any new rows that
accumulated during the full refresh, remains unacknowledged: it lies past the
slot's `confirmed_flush_lsn`.

For trigger-based CDC this is benign, because buffer rows are pruned by
frontier-based cleanup. For WAL-based CDC, the logical replication slot's
`confirmed_flush_lsn` is only advanced by the WAL decoder polling loop,
**not** by the refresh executor. If the scheduler performs repeated FULL
refreshes (e.g., the table is in `AUTO` mode with a high change ratio), the
slot may retain WAL segments that are never needed. This causes:

- WAL segment bloat (`pg_wal/` grows).
- Growing apparent slot lag in `pg_replication_slots`
  (`confirmed_flush_lsn` falls further behind `pg_current_wal_lsn()`).
- Monitoring false alarms on replication lag.

#### Implementation Steps

1. **Add helper: `advance_slot_to_current()`** (`src/wal_decoder.rs`)
   - New function:

     ```rust
     /// Move the slot's confirmed position to the current WAL write location
     /// so PostgreSQL no longer has to retain older WAL for this slot.
     pub fn advance_slot_to_current(slot_name: &str) -> Result<(), PgTrickleError> {
         Spi::run_with_args(
             "SELECT pg_replication_slot_advance($1, pg_current_wal_lsn())",
             &[slot_name.into()],
         )
         .map_err(|e| PgTrickleError::SpiError(e.to_string()))
     }
     ```

2. **Call after FULL refresh in scheduler** (`src/scheduler.rs:1261–1275`)
   - After `execute_full_refresh()` succeeds and the new frontier is stored,
     advance all WAL-mode slots for this ST's sources:

     ```rust
     // A full refresh made all prior change-buffer data irrelevant, so advance
     // each WAL-mode source's slot to the current WAL LSN. Failures are logged
     // rather than propagated, because the refresh itself already succeeded.
     for dep in deps.iter().filter(|d| d.cdc_mode == CdcMode::Wal) {
         if let Some(ref slot) = dep.slot_name {
             if let Err(e) = wal_decoder::advance_slot_to_current(slot) {
                 log!("pg_trickle: failed to advance slot {}: {}", slot, e);
             }
         }
     }
     ```

3. **Also flush change buffer tables after FULL** (`src/refresh.rs:1011`)
   - At the end of `execute_full_refresh()`, truncate the change buffers for
     the ST's sources so the next differential cycle doesn't reprocess stale
     rows:

     ```rust
     // Naive form: only safe when no other stream table tracks these sources
     // (see the shared-source note below). Errors are deliberately ignored.
     for oid in &source_oids {
         let _ = Spi::run(&format!(
             "TRUNCATE {change_schema}.changes_{}",
             oid.to_u32()
         ));
     }
     ```

   - This is safe because the FULL refresh has already materialized the
     complete state. However, if a source is shared by multiple STs, the
     truncate must be conditional: it is only safe if every co-tracking ST
     also received a full refresh or is having its frontier reset. Otherwise,
     use a safer cleanup instead of TRUNCATE: delete only rows whose `lsn` is
     at or below the lowest frontier among all STs that track the source (for
     a source with a single consumer, that is simply `new_frontier_lsn`).

4. **Tests**
   - Integration test: create a WAL-mode ST, insert rows, trigger a FULL
     refresh, and check that `pg_replication_slots.confirmed_flush_lsn` has
     advanced.
   - Integration test: verify that the change buffer is empty after a FULL
     refresh.

**Progress (2026-03-08).** Implemented in `Unreleased`.
`advance_slot_to_current(slot_name)` was added to `src/wal_decoder.rs`. It
uses `pg_replication_slot_advance($1, pg_current_wal_lsn())` and silently
skips slots that do not exist. The shared `post_full_refresh_cleanup()`
helper in `src/refresh.rs` iterates over all WAL/TRANSITIONING
`StDependency` entries, calls `advance_slot_to_current()` for each, and then
calls `cleanup_change_buffers_by_frontier()`. It is invoked from
`scheduler.rs` after `store_frontier()` in the `Full`, `Reinitialize`, and
empty-prev `Differential` arms.

---

### G4: AUTO→FULL Adaptive Fallback — Change Buffer Cleanup

**Problem.** When `execute_differential_refresh()` detects that the change
ratio exceeds `PGS_DIFFERENTIAL_MAX_CHANGE_RATIO` (default 15%), it falls
back to `execute_manual_full_refresh()` (`src/refresh.rs:1476–1554`). The
buffer cleanup (`drain_pending_cleanups()` +
`cleanup_change_buffers_by_frontier()`) runs **before** the fallback decision
(at line 1331) and operates only on rows ≤ the safe frontier LSN.
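A toy model illustrates the consequence of this ordering. If buffered rows are not removed after a FULL, they still count toward the next tick's change ratio. The model below is a hypothetical simplification, not pg_trickle's scheduler. It has a single source and a fixed table size. It also assumes loudly that the ratio is estimated from every row still in the buffer, which may not match the real estimate exactly.

```rust
/// What a scheduler tick decided (toy model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    Differential,
    Full,
}

/// Toy state for one stream table with a single source (hypothetical).
pub struct ToyScheduler {
    /// Rows in the stream table; treated as fixed for simplicity.
    pub table_rows: u64,
    /// Change-buffer rows not yet cleaned up.
    pub buffered: u64,
    /// Fallback threshold, e.g. 0.15 for the 15% default.
    pub max_change_ratio: f64,
    /// Whether G4's post-FULL cleanup is in place.
    pub cleanup_after_full: bool,
}

impl ToyScheduler {
    /// One tick after `captured` new change rows reached the buffer.
    pub fn tick(&mut self, captured: u64) -> Action {
        self.buffered += captured;
        // Assumption: the ratio is estimated from every row still buffered.
        let ratio = self.buffered as f64 / self.table_rows.max(1) as f64;
        if ratio > self.max_change_ratio {
            // FULL recomputes everything, making the buffer irrelevant,
            // but only an explicit cleanup actually removes those rows.
            if self.cleanup_after_full {
                self.buffered = 0;
            }
            Action::Full
        } else {
            // A DIFFERENTIAL run consumes the rows it applied.
            self.buffered = 0;
            Action::Differential
        }
    }
}
```

In this model, a bulk load of 500 rows into a 1,000-row table triggers a FULL either way. Without cleanup, every later single-row tick still sees the stale 500 rows and repeats FULL. With cleanup enabled, the next single-row tick runs DIFFERENTIAL, which is the behavior the second G4 test expects.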
When FULL runs, it recomputes the entire table from scratch, making all
pending change-buffer rows irrelevant. But these rows are **not** flushed:
they persist in the buffer and will be picked up by the next scheduler tick,
which may:

1. See changes → attempt DIFFERENTIAL again.
2. Exceed the threshold again → fall back to FULL again.
3. Loop indefinitely on bulk-loaded data.

This can also cause "change ratio ping-pong", where the scheduler alternates
between DIFFERENTIAL (small delta) and FULL (accumulated stale delta pushes
the ratio over the threshold).

#### Implementation Steps

1. **Flush change buffers after adaptive fallback** (`src/refresh.rs`)
   - In the adaptive fallback path (after `execute_manual_full_refresh()`
     returns), delete all change-buffer rows up to the new frontier LSN:

     ```rust
     if should_fallback {
         let result = execute_manual_full_refresh(st, schema, table_name, source_oids);
         // After a successful FULL, clear stale deltas to prevent
         // the next DIFFERENTIAL from re-triggering the fallback.
         if result.is_ok() {
             cleanup_change_buffers_by_frontier(&change_schema, &catalog_source_oids);
         }
         return result;
     }
     ```

2. **Deduplicate with G3.** The FULL-refresh cleanup logic from G3 applies
   here too. Factor it out into a shared `post_full_refresh_cleanup()` helper
   called from both the scheduled FULL path and the adaptive fallback path.

3. **Tests**
   - Integration test: bulk INSERT that triggers adaptive fallback → verify
     that the change buffer is empty after refresh.
   - Integration test: after a fallback FULL, insert one row → the next cycle
     should succeed as DIFFERENTIAL without hitting the ratio threshold.

**Progress (2026-03-08).** Implemented in `Unreleased`. G3 and G4 share the
`post_full_refresh_cleanup()` helper in `src/refresh.rs`. In the adaptive
fallback path inside `execute_differential_refresh()`, the helper is called
after a successful adaptive FULL (both the TRUNCATE-needed path and the
ratio-threshold path).
This prevents stale change-buffer rows from pushing the ratio over the
threshold again on the next scheduler tick, eliminating the ping-pong cycle.

---

### G5: TRANSITIONING State User Visibility

**Problem.** The `TRANSITIONING` CDC state is tracked in
`pgt_dependencies.cdc_mode` and is visible in
`pgtrickle.pgt_stream_table_sources`. However:

- There is no simple way to ask "which stream tables are currently
  transitioning?" without joining across catalog tables.
- The `pgtrickle.pg_stat_stream_tables` monitoring view does not surface
  per-source CDC mode.
- No NOTIFY/event is emitted when a transition starts or completes.

This makes it difficult to debug slow transitions or stuck states.

#### Implementation Steps

1. **Add CDC mode to `pg_stat_stream_tables`** (`src/monitor.rs`)
   - Add a `cdc_modes` column (text array) showing the distinct CDC modes
     across all sources, e.g. `{wal}`, `{trigger,wal}`, `{transitioning,wal}`.
   - Alternatively, add a scalar `cdc_status` column with a summary:
     `'wal'`, `'trigger'`, `'mixed'`, `'transitioning'`.

2. **Add convenience view** (`sql/` upgrade migration)

   ```sql
   CREATE VIEW pgtrickle.pgt_cdc_status AS
   SELECT st.pgt_schema, st.pgt_name,
          d.source_relid, c.relname AS source_name,
          d.cdc_mode, d.slot_name
   FROM pgtrickle.pgt_dependencies d
   JOIN pgtrickle.pgt_stream_tables st ON st.pgt_id = d.pgt_id
   JOIN pg_class c ON c.oid = d.source_relid
   WHERE d.source_type = 'TABLE';
   ```

3. **NOTIFY on transition events** (`src/wal_decoder.rs` or `src/scheduler.rs`)
   - When the CDC mode changes from `TRIGGER` → `TRANSITIONING`:

     ```sql
     NOTIFY pgtrickle_cdc_transition, '{"source_oid": 12345, "from": "trigger", "to": "transitioning"}';
     ```

   - When `TRANSITIONING` → `WAL`:

     ```sql
     NOTIFY pgtrickle_cdc_transition, '{"source_oid": 12345, "from": "transitioning", "to": "wal"}';
     ```

4. **Documentation**: Update `docs/SQL_REFERENCE.md` with the new view and
   NOTIFY channel.

5. **Tests**
   - Integration test: create an ST with `auto` mode on a
     `wal_level = logical` cluster → verify that `pgt_cdc_status` shows
     `TRANSITIONING` and then `WAL`.
   - Integration test: verify that the NOTIFY payload is emitted.

**Progress (2026-03-08).** Implemented in `Unreleased`. The
`pgtrickle.pgt_cdc_status` view was added to `src/lib.rs` (in the
`pg_trickle_monitoring_views` `extension_sql!` block), alongside a
`cdc_modes` text-array column in `pg_stat_stream_tables`. NOTIFY on
transitions (TRIGGER → TRANSITIONING via `finish_wal_transition()` and
TRANSITIONING → WAL via `complete_wal_transition()`) was already implemented
by `emit_cdc_transition_notify()` in `src/wal_decoder.rs` in prior work.

---

### G6: DIFFERENTIAL Without Initialization Baseline

**Progress (2026-03-08).** Completed in `Unreleased`. The low-level
`execute_differential_refresh()` path now rejects unpopulated stream tables
and empty previous frontiers before any SPI work begins. E2E coverage
continues to verify that a manual refresh with `initialize => false` falls
back to FULL rather than surfacing that internal guard.

**Problem.** If `execute_differential_refresh()` is called on a stream table
that has `is_populated = false` (never initialized), the frontier defaults
to `'0/0'::pg_lsn`. The delta query then scans the *entire* change buffer
from the beginning of WAL, which:

- Is semantically wrong: a delta applied to an empty table is not the same
  as a full materialization (aggregates, JOINs, etc. produce different
  results).
- Is prohibitively slow on large buffers.

**Current mitigation.** The manual refresh path
(`execute_manual_differential_refresh()` at `src/api.rs:1898`) checks
`st.is_populated` and falls back to FULL. The scheduler path
(`src/scheduler.rs:1294–1340`) checks `prev_frontier.is_empty()` and falls
back to FULL. So in *practice*, this gap is mitigated.

**Residual risk.** `execute_differential_refresh()` itself has no guard. It
trusts its callers, so a future caller could skip the check.
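A small model shows why a `'0/0'` frontier is dangerous and what the guard buys. The `HI/LO` hexadecimal parsing follows PostgreSQL's textual `pg_lsn` format, where the two halves are the upper and lower 32 bits. Everything else is a hypothetical simplification: the change buffer is reduced to a list of row LSNs, and the frontier to a single optional LSN string. `delta_window` and `guard_differential` are illustrative names, and the real frontier may track several sources.

```rust
/// Parse PostgreSQL's textual pg_lsn form "HI/LO" (two hexadecimal halves
/// of a 64-bit WAL position) into a u64.
pub fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// LSNs of buffered change rows inside the delta window (prev, upto].
/// The change buffer is simplified to a list of row LSNs (hypothetical).
pub fn delta_window(buffer: &[u64], prev: u64, upto: u64) -> Vec<u64> {
    buffer
        .iter()
        .copied()
        .filter(|&lsn| lsn > prev && lsn <= upto)
        .collect()
}

/// Sketch of the G6 guard: refuse to compute a window without a baseline.
/// `prev_frontier` is None when no frontier was ever stored.
pub fn guard_differential(is_populated: bool, prev_frontier: Option<&str>) -> Result<u64, String> {
    if !is_populated {
        return Err("unpopulated stream table: a FULL refresh is required first".to_string());
    }
    prev_frontier
        .and_then(parse_lsn)
        .ok_or_else(|| "no previous frontier exists".to_string())
}
```

With a `'0/0'` frontier, every buffered row falls inside the window, which is the "scan the entire change buffer" behavior described above. The guard turns that case into an error the caller can answer with a FULL refresh.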
#### Implementation Steps

1. **Defensive check in `execute_differential_refresh()`** (`src/refresh.rs`)
   - Add an early return at the top of the function:

     ```rust
     if !st.is_populated {
         return Err(PgTrickleError::InvalidArgument(format!(
             "Cannot run DIFFERENTIAL refresh on unpopulated stream table {}.{} — \
              a FULL refresh is required first.",
             st.pgt_schema, st.pgt_name
         )));
     }
     ```

   - Callers already handle errors by falling back to FULL or marking the ST
     for reinitialization, so this is safe.

2. **Also guard on an empty frontier** (belt-and-suspenders)
   - After the `is_populated` check:

     ```rust
     if prev_frontier.is_empty() {
         return Err(PgTrickleError::InvalidArgument(format!(
             "Cannot run DIFFERENTIAL refresh on {}.{} — no previous frontier exists.",
             st.pgt_schema, st.pgt_name
         )));
     }
     ```

3. **Tests**
   - Unit tests: call `execute_differential_refresh()` with
     `is_populated = false` and with `prev_frontier.is_empty()` → both return
     `InvalidArgument` before any SPI work.
   - Integration/E2E test: create an ST with `initialize => false`, attempt a
     manual DIFFERENTIAL refresh → verify that it still falls back to FULL and
     populates the stream table.

---

## 4. Implementation Order

The gaps have interdependencies.
Recommended order:

```
Phase 1 (quick wins: small, independent, high value):
├── G6: Defensive check in execute_differential_refresh()
├── G2: Validate IMMEDIATE + WAL combination
└── G3: Advance WAL slot after FULL refresh

Phase 2 (buffer hygiene):
└── G4: Flush change buffers on adaptive fallback (shares cleanup helper with G3)

Phase 3 (observability):
└── G5: TRANSITIONING visibility (view + NOTIFY)

Phase 4 (feature):
└── G1: Per-table cdc_mode override (largest change: SQL API, catalog, dbt, migration, docs)
```

### Effort Estimates

| Phase | Gaps | Complexity |
|-------|------|------------|
| 1 | G6, G2, G3 | ~3–5 files changed, no migration |
| 2 | G4 | ~2 files changed, refactor shared cleanup |
| 3 | G5 | ~3 files + migration, new view |
| 4 | G1 | ~8–10 files + migration + dbt macro changes |

---

## 5. Migration Strategy

Phases 1–2 require no SQL migration (pure Rust logic changes). Phase 3
requires an upgrade migration for the convenience view. Phase 4 requires an
upgrade migration for the new `pgt_stream_tables.requested_cdc_mode` column
and the updated function signatures.

All changes are backward compatible: existing stream tables continue to work
without any user action. The per-table `cdc_mode` defaults to `NULL`
(inherit from the GUC), and the new validations only reject configurations
that were already broken or undefined.

---

## 6. Open Questions

1. **G1 granularity**: Should the per-table `cdc_mode` live on the *stream
   table* (applying to all its sources) or on individual *source
   dependencies*? The former is simpler; the latter handles the case where
   one ST joins a PK table with a keyless table.
2. **G3 shared sources**: When a source table is tracked by multiple STs and
   only one does a FULL refresh, we cannot TRUNCATE the shared change buffer.
   Should we use per-ST frontier-based DELETE instead, or maintain per-ST
   buffer tables (a major architectural change)?
3. **G4 ping-pong prevention**: Beyond buffer cleanup, should we add a
   backoff mechanism?
   For example, after an adaptive fallback, force the next N cycles to use
   FULL so the buffer can stabilize.
4. **G5 NOTIFY volume**: On a cluster with hundreds of stream tables, CDC
   transition NOTIFYs could be noisy. Should this be gated behind a GUC
   (e.g., `pg_trickle.notify_cdc_transitions = on`)?

---

## 7. Related Plans

- [PLAN_HYBRID_CDC.md](PLAN_HYBRID_CDC.md): The original trigger→WAL
  transition design. G1 and G2 extend this.
- [PLAN_TRANSACTIONAL_IVM.md](PLAN_TRANSACTIONAL_IVM.md): IMMEDIATE mode
  design. G2 adds validation at the boundary between IMMEDIATE and CDC.
- [PLAN_BOOTSTRAP_GATING.md](PLAN_BOOTSTRAP_GATING.md): Bootstrap readiness.
  G6's defensive check complements this by preventing incorrect differential
  refreshes on uninitialized tables.
- [PLAN_REFRESH_MODE_DEFAULT.md](PLAN_REFRESH_MODE_DEFAULT.md): AUTO mode
  default behavior. G4 addresses the buffer cleanup gap in AUTO's fallback.