> **Plain-language companion:** [v0.35.0.md](v0.35.0.md) ## v0.35.0 — Citus: World-Class Distributed Source CDC **Status: Planned.** Derived from [plans/ecosystem/PLAN_CITUS.md](../plans/ecosystem/PLAN_CITUS.md) §6 Phases 3, 4, 5, and 6. > **Release Theme** > v0.35.0 delivers world-class incremental view maintenance over Citus > distributed tables. Building on v0.32.0's stable naming and frontier > foundations, this release adds: (P3) per-worker WAL slot polling for > distributed-table CDC; (P4) distributed ST storage with a Citus-safe > DELETE+UPSERT apply path; (P5) catalog-based cross-node coordination and > a `pgtrickle.citus_status` observability view; and (P6) a full Citus > E2E test suite (1 coordinator + 2 workers, 10-scenario matrix) with > dedicated benchmarks and documentation. The non-Citus code path has a > hard regression budget of 0% — every Citus code path short-circuits on > the `is_citus_loaded()` check introduced in v0.32.0. --- ### Phase 3 — Distributed CDC via Per-Worker Slots | ID | Title | Effort | Priority | |----|-------|--------|----------| | CDC-1 | Remote slot consumption via `dblink`-wrapped `pg_logical_slot_get_changes()` | L | P0 | | CDC-2 | `setup_cdc_for_source` for distributed sources: per-worker publication + slot creation | M | P0 | | CDC-3 | REPLICA IDENTITY FULL enforcement on each worker | S | P0 | | CDC-4 | Slot polling: scheduler iterates `pgt_remote_slots`, writes into coordinator change buffer | L | P0 | | CDC-5 | TRUNCATE + DDL propagation for distributed sources | M | P1 | | CDC-6 | New catalog: `pgtrickle.pgt_remote_slots (pgt_id, worker_id, worker_host, worker_port, slot_name, last_lsn)` | S | P0 | | CDC-7 | Libpq streaming fallback path (P3.1 option B) — implement if dblink bench fails threshold | L | P2 | **CDC-1** — Today `src/wal_decoder.rs` calls `pg_logical_slot_get_changes()` via SPI on the local node. 
Add a sibling `poll_remote_slot(conn: &PgConnection, slot_name: &str, …)` that executes the same SQL via a `dblink` foreign connection to a worker. Connection parameters come from `pgt_remote_slots.worker_host/port`. Connections are pooled: existing `libpq` handles are reused across scheduler ticks rather than re-opened on every poll.

**CDC-2** — When `is_citus_loaded()` and `placement(oid) == Distributed`:

1. Issue `run_command_on_all_nodes("CREATE PUBLICATION pgtrickle_{stable_name} FOR TABLE {table} WITH (publish_via_partition_root = true)")`
2. Issue `run_command_on_all_nodes("SELECT pg_create_logical_replication_slot('pgtrickle_{stable_name}', 'pgoutput')")`
3. Insert one row per worker into `pgtrickle.pgt_remote_slots`.

**CDC-4** — Scheduler tick: for each source with remote slots, call `poll_remote_slot()` for each worker row in `pgt_remote_slots`. Decoded rows are written to the coordinator's `changes_{stable_name}` buffer with two new columns: `origin_node SMALLINT`, `origin_lsn PG_LSN`. Compaction uses `(origin_node, origin_lsn)` as the watermark.

---

### Phase 4 — Distributed ST Storage & Apply Path

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| APPLY-1 | `create_stream_table(…, placement => 'local' \| 'reference' \| 'distributed')` | M | P0 |
| APPLY-2 | Auto-select placement from declared source placements + row-count heuristic | S | P1 |
| APPLY-3 | DELETE + `INSERT … ON CONFLICT DO UPDATE` codegen template for distributed STs | M | P0 |
| APPLY-4 | Delta materialisation to TEMP table for reference STs with non-pushable plans | S | P1 |
| APPLY-5 | `__pgt_row_id` as distribution column when creating distributed ST | S | P0 |
| APPLY-6 | `reltuples` fix: sum `pg_dist_shard` row counts for distributed tables in DAG planner | S | P1 |
| APPLY-7 | GUC: `pg_trickle.citus_reference_st_max_rows` (default 1_000_000) | S | P1 |

**APPLY-3** — Codegen in `src/refresh/codegen.rs` gets a second template selected by `st_placement = 'distributed'`:

```sql
WITH delta AS (...)
DELETE FROM {st} st USING delta d WHERE d.__pgt_action = 'D' AND st.__pgt_row_id = d.__pgt_row_id; WITH delta AS (...) INSERT INTO {st} (__pgt_row_id, ...) SELECT __pgt_row_id, ... FROM delta WHERE __pgt_action != 'D' ON CONFLICT (__pgt_row_id) DO UPDATE SET ...; ``` **APPLY-5** — At `create_distributed_table({st}, '__pgt_row_id')` time, validate that `__pgt_row_id` is not repurposed as a user column (rare; emit a hard error if so). --- ### Phase 5 — Coordination & Operability | ID | Title | Effort | Priority | |----|-------|--------|----------| | COORD-1 | Catalog lock table: `pgtrickle.pgt_st_locks` | S | P0 | | COORD-2 | Replace advisory lock acquisition in scheduler with catalog-based locks for distributed STs | M | P0 | | COORD-3 | `pgtrickle.citus_status` view: worker reachability, slot lag, placement summary | M | P0 | | COORD-4 | Failure mode: pause refresh on unreachable worker; surface via monitor + Prometheus | M | P1 | | COORD-5 | Failure mode: WAL recycled past slot — fall back to FULL refresh, emit `WARNING` | S | P1 | | COORD-6 | Failure mode: slot missing on worker after node add — re-create on next tick | S | P1 | | COORD-7 | Pre-flight check: refuse to start if `pg_trickle` version mismatches across workers | S | P0 | | COORD-8 | Pre-flight check: enforce `wal_level = logical` on each worker at slot-create time | S | P0 | **COORD-1** — `pgtrickle.pgt_st_locks`: ```sql CREATE TABLE pgtrickle.pgt_st_locks ( pgt_id BIGINT PRIMARY KEY, locked_by INT NOT NULL, -- PID locked_at TIMESTAMPTZ NOT NULL DEFAULT now(), lease_until TIMESTAMPTZ NOT NULL ); ``` `INSERT … ON CONFLICT DO NOTHING` acquires; `DELETE` releases; a background task reaps expired leases. Advisory locks remain the fast in-process path; catalog locks are used only when `is_citus_loaded()`. **COORD-3** — `pgtrickle.citus_status` columns: `pgt_id, source_table, worker_host, worker_port, slot_name, slot_lag_bytes, last_polled_at, status ('ok' | 'lagging' | 'unreachable' | 'slot_missing')`. 
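The `status` column above can be derived with a simple precedence rule: unreachable dominates, then a missing slot, then lag. A minimal sketch in Rust; the classification order and the lag threshold used here are illustrative assumptions, not fixed by this plan:

```rust
/// Classify one (source, worker) row for the planned pgtrickle.citus_status
/// view. Precedence: unreachable > slot_missing > lagging > ok, so a down
/// worker is never masked by a stale lag reading.
fn classify_status(
    reachable: bool,
    slot_exists: bool,
    slot_lag_bytes: i64,
    lag_threshold_bytes: i64,
) -> &'static str {
    if !reachable {
        "unreachable"
    } else if !slot_exists {
        "slot_missing"
    } else if slot_lag_bytes > lag_threshold_bytes {
        "lagging"
    } else {
        "ok"
    }
}

fn main() {
    // Illustrative 64 MiB lag threshold; the real view would read a GUC.
    let threshold: i64 = 64 * 1024 * 1024;
    assert_eq!(classify_status(true, true, 1024, threshold), "ok");
    assert_eq!(classify_status(true, true, threshold + 1, threshold), "lagging");
    assert_eq!(classify_status(true, false, 0, threshold), "slot_missing");
    assert_eq!(classify_status(false, true, 0, threshold), "unreachable");
    println!("status classification ok");
}
```

Keeping the rule a pure function makes it trivial to exercise in the TEST-3 matrix without a live worker outage.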
--- ### Phase 6 — Validation, Benchmarks, Migration, Docs | ID | Title | Effort | Priority | |----|-------|--------|----------| | TEST-1 | `tests/Dockerfile.e2e-citus`: Citus 13.x image with pg_trickle extension | M | P0 | | TEST-2 | docker-compose: 1 coordinator + 2 workers for Citus E2E tests | S | P0 | | TEST-3 | `tests/e2e_citus_tests.rs`: 10-scenario test matrix (see below) | L | P0 | | BENCH-1 | `benches/bench_remote_slot_poll`: dblink vs local SPI throughput | M | P1 | | BENCH-2 | `benches/bench_distributed_apply`: DELETE+UPSERT vs MERGE on 100M-row distributed ST | M | P1 | | MIG-1 | SQL migration: `pgt_remote_slots` catalog creation + new `origin_node`/`origin_lsn` buffer columns | S | P0 | | DOCS-1 | `docs/integrations/citus.md`: prerequisites, install guide, placement options, failure modes | M | P0 | | DOCS-2 | Update `docs/ARCHITECTURE.md` with multi-node Citus diagram | S | P1 | | DOCS-3 | Update `INSTALL.md` with "installing on a Citus cluster" section | S | P1 | **TEST-3 — 10-scenario matrix:** | # | Source(s) | ST placement | Exercises | |---|-----------|--------------|-----------| | 1 | Local table | local | Regression — trigger CDC unchanged | | 2 | Reference table | local | Coordinator trigger CDC | | 3 | Distributed table | reference | Per-worker slots, MERGE, replication of ST | | 4 | Distributed table | distributed | Per-worker slots, DELETE + UPSERT | | 5 | Mixed (ref + dist) | local | Mixed CDC backends, per-source frontier | | 6 | Distributed → outbox | distributed | Downstream publication + relay from distributed ST | | 7 | ALTER TABLE on dist source | distributed | DDL propagation, slot rebuild | | 8 | Worker restart | distributed | Slot survives, refresh resumes | | 9 | TRUNCATE on dist source | any | Full-refresh fallback | | 10 | Concurrent DML + refresh | distributed | Apply correctness under load | --- ### Performance Budget | Metric | Target | |--------|--------| | Non-Citus code path regression | 0% (hard gate) | | 
`create_stream_table()` overhead when Citus absent | ≤ 1 µs |
| Refresh latency regression on single-node suite | ≤ 2% |
| `bench_remote_slot_poll` (dblink) throughput | ≥ 50 k rows/s on loopback; if < 10 k rows/s, trigger CDC-7 (libpq streaming) |

---

### Conflicts & Risks

- **dblink latency** (CDC-1): the bench result gates whether CDC-7 (libpq streaming) becomes P0. Run the bench early in the release to avoid a late scope change.
- **Citus MERGE rejection** (APPLY-3): DELETE+UPSERT is the primary path; MERGE is only used for reference STs and is already known to work.
- **Slot fills WAL on worker if coordinator stops** (COORD-4/5): document the monitoring requirement; expose `slot_lag_bytes` in `citus_status`; auto-drop the slot once its lag exceeds a configurable threshold (`pg_trickle.citus_slot_max_lag_bytes`, default 1 GB).
- **Schema/role differences on workers** (COORD-8): `run_command_on_all_nodes("CREATE SCHEMA IF NOT EXISTS pgtrickle_changes")` + role grants at setup time.
- **Shard rebalance invalidates slots**: out of scope for this release; emit a hard error in the slot-poll path if the `pg_dist_node` topology hash changes between polls.
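The topology-hash guard in the last bullet can be as simple as hashing the sorted set of node rows. A sketch in Rust; the field choice `(nodeid, host, port)` and the use of the standard library hasher are assumptions for illustration:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Fingerprint the worker topology. Sorting first makes the result
/// independent of the order rows are returned from pg_dist_node.
fn topology_hash(mut nodes: Vec<(i32, String, u16)>) -> u64 {
    nodes.sort();
    let mut h = DefaultHasher::new();
    nodes.hash(&mut h);
    h.finish()
}

fn main() {
    let a = topology_hash(vec![
        (1, "worker-1".into(), 5432),
        (2, "worker-2".into(), 5432),
    ]);
    // Same topology, different row order: identical fingerprint.
    let b = topology_hash(vec![
        (2, "worker-2".into(), 5432),
        (1, "worker-1".into(), 5432),
    ]);
    assert_eq!(a, b);
    // A new node (e.g. after a rebalance or node add) changes the
    // fingerprint; the slot-poll path would raise a hard error here.
    let c = topology_hash(vec![
        (1, "worker-1".into(), 5432),
        (2, "worker-2".into(), 5432),
        (3, "worker-3".into(), 5432),
    ]);
    assert_ne!(a, c);
    println!("topology hash guard ok");
}
```

The scheduler would store the fingerprint alongside `pgt_remote_slots` after each poll and compare it on the next tick; any mismatch aborts the poll rather than silently reading a stale slot set.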
--- ### Exit Criteria - [ ] CDC-1: `poll_remote_slot()` decodes INSERT/UPDATE/DELETE/TRUNCATE from a worker via dblink - [ ] CDC-2: `setup_cdc_for_source` on a distributed table creates publication + slot on every worker and populates `pgt_remote_slots` - [ ] CDC-4: Scheduler polls all workers for a distributed source and writes correct decoded rows into coordinator change buffer - [ ] APPLY-1/APPLY-3: `create_stream_table(…, placement => 'distributed')` works end-to-end; DELETE+UPSERT apply path produces identical results to single-node MERGE - [ ] COORD-3: `SELECT * FROM pgtrickle.citus_status` returns one row per (source, worker) with accurate lag - [ ] TEST-3: All 10 E2E scenarios pass against a live 1+2 Citus cluster - [ ] BENCH-1: `bench_remote_slot_poll` throughput ≥ 50 k rows/s (dblink loopback); report logged in `docs/BENCHMARK.md` - [ ] Non-Citus benchmark suite shows 0% regression - [ ] DOCS-1: `docs/integrations/citus.md` covers prerequisites, setup, placement options, monitoring, failure modes, and known limitations - [ ] Migration path `v0.32.0 → v0.35.0` tested with `just test-upgrade-all` - [ ] `just check-version-sync` passes - [ ] `just lint` passes (zero warnings) ---