> **Plain-language companion:** [v0.33.0.md](v0.33.0.md)

## v0.33.0 — Citus: World-Class Distributed Source CDC

**Status: Released (2026-04-26).** Derived from [plans/ecosystem/PLAN_CITUS.md](../plans/ecosystem/PLAN_CITUS.md) §6 Phases 3, 4, 5, and 6.

> **Release Theme**
> v0.33.0 delivers world-class incremental view maintenance over Citus
> distributed tables. Building on v0.32.0's stable naming and frontier
> foundations, this release adds: (P3) per-worker WAL slot polling for
> distributed-table CDC; (P4) distributed ST storage with a Citus-safe
> DELETE+UPSERT apply path; (P5) catalog-based cross-node coordination and
> a `pgtrickle.citus_status` observability view; and (P6) a full Citus
> E2E test suite (1 coordinator + 2 workers, 10-scenario matrix) with
> dedicated benchmarks and documentation. The non-Citus code path has a
> hard regression budget of 0% — every Citus code path short-circuits on
> the `is_citus_loaded()` check introduced in v0.32.0.

---

### Phase 3 — Distributed CDC via Per-Worker Slots

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| CDC-1 | Remote slot consumption via `dblink`-wrapped `pg_logical_slot_get_changes()` | L | P0 |
| CDC-2 | `setup_cdc_for_source` for distributed sources: per-worker publication + slot creation | M | P0 |
| CDC-3 | REPLICA IDENTITY FULL enforcement on each worker | S | P0 |
| CDC-4 | Slot polling: scheduler iterates `pgt_remote_slots`, writes into coordinator change buffer | L | P0 |
| CDC-5 | TRUNCATE + DDL propagation for distributed sources | M | P1 |
| CDC-6 | New catalog: `pgtrickle.pgt_remote_slots (pgt_id, worker_id, worker_host, worker_port, slot_name, last_lsn)` | S | P0 |
| CDC-7 | Libpq streaming fallback path (P3.1 option B) — implement if dblink bench fails threshold | L | P2 |

**CDC-1** — Today `src/wal_decoder.rs` calls `pg_logical_slot_get_changes()` via SPI on the local node.
Add a sibling `poll_remote_slot(conn: &PgConnection, slot_name: &str, …)` that executes the same SQL via a `dblink` foreign connection to a worker. Connection parameters come from `pgt_remote_slots.worker_host/port`. Connection pooling reuses existing `libpq` handles per scheduler tick.

**CDC-2** — When `is_citus_loaded()` and `placement(oid) == Distributed`:

1. Issue `run_command_on_all_nodes("CREATE PUBLICATION pgtrickle_{stable_name} FOR TABLE {table} WITH (publish_via_partition_root = true)")`
2. Issue `run_command_on_all_nodes("SELECT pg_create_logical_replication_slot('pgtrickle_{stable_name}', 'pgoutput')")`
3. Insert one row per worker into `pgtrickle.pgt_remote_slots`.

**CDC-4** — Scheduler tick: for each source with remote slots, call `poll_remote_slot()` for each worker row in `pgt_remote_slots`. Decoded rows are written to the coordinator's `changes_{stable_name}` buffer with two new columns: `origin_node SMALLINT`, `origin_lsn PG_LSN`. Compaction uses `(origin_node, origin_lsn)` as the watermark.

---

### Phase 4 — Distributed ST Storage & Apply Path

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| APPLY-1 | `create_stream_table(…, placement => 'local' \| 'reference' \| 'distributed')` | M | P0 |
| APPLY-2 | Auto-select placement from declared source placements + row-count heuristic | S | P1 |
| APPLY-3 | DELETE + `INSERT … ON CONFLICT DO UPDATE` codegen template for distributed STs | M | P0 |
| APPLY-4 | Delta materialisation to TEMP table for reference STs with non-pushable plans | S | P1 |
| APPLY-5 | `__pgt_row_id` as distribution column when creating distributed ST | S | P0 |
| APPLY-6 | `reltuples` fix: sum `pg_dist_shard` row counts for distributed tables in DAG planner | S | P1 |
| APPLY-7 | GUC: `pg_trickle.citus_reference_st_max_rows` (default 1_000_000) | S | P1 |

**APPLY-3** — Codegen in `src/refresh/codegen.rs` gets a second template selected by `st_placement = 'distributed'`:

```sql
WITH delta AS (...)
DELETE FROM {st} st
USING delta d
WHERE d.__pgt_action = 'D'
  AND st.__pgt_row_id = d.__pgt_row_id;

WITH delta AS (...)
INSERT INTO {st} (__pgt_row_id, ...)
SELECT __pgt_row_id, ...
FROM delta
WHERE __pgt_action != 'D'
ON CONFLICT (__pgt_row_id) DO UPDATE SET ...;
```

**APPLY-5** — At `create_distributed_table({st}, '__pgt_row_id')` time, validate that `__pgt_row_id` is not repurposed as a user column (rare; emit a hard error if so).

---

### Phase 5 — Coordination & Operability

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| COORD-1 | Catalog lock table: `pgtrickle.pgt_st_locks` | S | P0 |
| COORD-2 | Replace advisory lock acquisition in scheduler with catalog-based locks for distributed STs | M | P0 |
| COORD-3 | `pgtrickle.citus_status` view: worker reachability, slot lag, placement summary | M | P0 |
| COORD-4 | Failure mode: pause refresh on unreachable worker; surface via monitor + Prometheus | M | P1 |
| COORD-5 | Failure mode: WAL recycled past slot — fall back to FULL refresh, emit `WARNING` | S | P1 |
| COORD-6 | Failure mode: slot missing on worker after node add — re-create on next tick | S | P1 |
| COORD-7 | Pre-flight check: refuse to start if `pg_trickle` version mismatches across workers | S | P0 |
| COORD-8 | Pre-flight check: enforce `wal_level = logical` on each worker at slot-create time | S | P0 |

**COORD-1** — `pgtrickle.pgt_st_locks`:

```sql
CREATE TABLE pgtrickle.pgt_st_locks (
    pgt_id      BIGINT PRIMARY KEY,
    locked_by   INT NOT NULL,          -- PID
    locked_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    lease_until TIMESTAMPTZ NOT NULL
);
```

`INSERT … ON CONFLICT DO NOTHING` acquires; `DELETE` releases; a background task reaps expired leases. Advisory locks remain the fast in-process path; catalog locks are used only when `is_citus_loaded()`.

**COORD-3** — `pgtrickle.citus_status` columns: `pgt_id, source_table, worker_host, worker_port, slot_name, slot_lag_bytes, last_polled_at, status ('ok' | 'lagging' | 'unreachable' | 'slot_missing')`.
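
The acquire/release/reap cycle described under COORD-1 can be sketched in plain SQL. This is a minimal sketch, not the shipped statements: the 30-second lease length and the `$1` parameter slot are illustrative assumptions.

```sql
-- Acquire (COORD-1): succeeds only if no row exists for this ST.
-- The 30-second lease length is illustrative, not a shipped default.
INSERT INTO pgtrickle.pgt_st_locks (pgt_id, locked_by, lease_until)
VALUES ($1, pg_backend_pid(), now() + interval '30 seconds')
ON CONFLICT (pgt_id) DO NOTHING
RETURNING pgt_id;  -- zero rows returned => another process holds the lock

-- Release after the refresh commits.
DELETE FROM pgtrickle.pgt_st_locks
WHERE pgt_id = $1 AND locked_by = pg_backend_pid();

-- Reaper: clear expired leases so a crashed holder cannot wedge refreshes.
DELETE FROM pgtrickle.pgt_st_locks
WHERE lease_until < now();
```

In this sketch a tick that loses the race simply skips the ST rather than waiting; expired leases are removed by the background reaper, not stolen in the acquire path.
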

---

### Phase 6 — Validation, Benchmarks, Migration, Docs

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| TEST-1 | `tests/Dockerfile.e2e-citus`: Citus 13.x image with pg_trickle extension | M | P0 |
| TEST-2 | docker-compose: 1 coordinator + 2 workers for Citus E2E tests | S | P0 |
| TEST-3 | `tests/e2e_citus_tests.rs`: 10-scenario test matrix (see below) | L | P0 |
| BENCH-1 | `benches/bench_remote_slot_poll`: dblink vs local SPI throughput | M | P1 |
| BENCH-2 | `benches/bench_distributed_apply`: DELETE+UPSERT vs MERGE on 100M-row distributed ST | M | P1 |
| MIG-1 | SQL migration: `pgt_remote_slots` catalog creation + new `origin_node`/`origin_lsn` buffer columns | S | P0 |
| DOCS-1 | `docs/integrations/citus.md`: prerequisites, install guide, placement options, failure modes | M | P0 |
| DOCS-2 | Update `docs/ARCHITECTURE.md` with multi-node Citus diagram | S | P1 |
| DOCS-3 | Update `INSTALL.md` with "installing on a Citus cluster" section | S | P1 |

**TEST-3 — 10-scenario matrix:**

| # | Source(s) | ST placement | Exercises |
|---|-----------|--------------|-----------|
| 1 | Local table | local | Regression — trigger CDC unchanged |
| 2 | Reference table | local | Coordinator trigger CDC |
| 3 | Distributed table | reference | Per-worker slots, MERGE, replication of ST |
| 4 | Distributed table | distributed | Per-worker slots, DELETE + UPSERT |
| 5 | Mixed (ref + dist) | local | Mixed CDC backends, per-source frontier |
| 6 | Distributed → outbox | distributed | Downstream publication + relay from distributed ST |
| 7 | ALTER TABLE on dist source | distributed | DDL propagation, slot rebuild |
| 8 | Worker restart | distributed | Slot survives, refresh resumes |
| 9 | TRUNCATE on dist source | any | Full-refresh fallback |
| 10 | Concurrent DML + refresh | distributed | Apply correctness under load |

---

### Performance Budget

| Metric | Target |
|--------|--------|
| Non-Citus code path regression | 0% (hard gate) |
| `create_stream_table()` overhead when Citus absent | ≤ 1 µs |
| Refresh latency regression on single-node suite | ≤ 2% |
| `bench_remote_slot_poll` (dblink) throughput | ≥ 50 k rows/s on loopback; if < 10 k rows/s trigger CDC-7 (libpq streaming) |

---

### Conflicts & Risks

- **dblink latency** (CDC-1): bench result gates whether CDC-7 (libpq streaming) becomes P0. Add the bench early in the release to avoid a late scope change.
- **Citus MERGE rejection** (APPLY-3): DELETE+UPSERT is the primary path; MERGE is only used for reference STs and is already known to work.
- **Slot fills WAL on worker if coordinator stops** (COORD-4/5): document monitoring requirement; expose `slot_lag_bytes` in `citus_status`; auto-drop slot after configurable lease expiry (`pg_trickle.citus_slot_max_lag_bytes`, default 1 GB).
- **Schema/role differences on workers** (COORD-8): `run_command_on_all_nodes("CREATE SCHEMA IF NOT EXISTS pgtrickle_changes")` + role grants at setup time.
- **Shard rebalance invalidates slots**: out of scope for this release; emit a hard error in the slot-poll path if the `pg_dist_node` topology hash changes between polls.

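
The topology-hash guard in the last bullet can be sketched as a fingerprint over Citus's `pg_dist_node` metadata. A minimal sketch, assuming the hash covers node id, host, and port of active primaries; the exact column set is an assumption, not the shipped query:

```sql
-- Illustrative fingerprint of the active worker topology. If this value
-- differs from the one captured at the previous poll, the slot-poll path
-- raises a hard error instead of decoding against a rebalanced cluster.
SELECT md5(string_agg(nodeid::text || ':' || nodename || ':' || nodeport::text,
                      ',' ORDER BY nodeid)) AS topology_hash
FROM pg_dist_node
WHERE isactive AND noderole = 'primary';
```

Comparing a single hash per tick keeps the check O(1) in catalog round-trips regardless of cluster size.
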

---

### Exit Criteria

- [x] CDC-1: `poll_remote_slot()` decodes INSERT/UPDATE/DELETE/TRUNCATE from a worker via dblink
- [x] CDC-2: `setup_cdc_for_source` on a distributed table creates publication + slot on every worker and populates `pgt_remote_slots`
- [x] CDC-4: Scheduler polls all workers for a distributed source and writes correct decoded rows into coordinator change buffer
- [x] APPLY-1/APPLY-3: `create_stream_table(…, placement => 'distributed')` works end-to-end; DELETE+UPSERT apply path produces identical results to single-node MERGE
- [x] COORD-3: `SELECT * FROM pgtrickle.citus_status` returns one row per (source, worker) with accurate lag
- [x] TEST-3: All 10 E2E scenarios pass against a live 1+2 Citus cluster
- [x] BENCH-1: `bench_remote_slot_poll` throughput ≥ 50 k rows/s (dblink loopback); report logged in `docs/BENCHMARK.md`
- [x] Non-Citus benchmark suite shows 0% regression
- [x] DOCS-1: `docs/integrations/citus.md` covers prerequisites, setup, placement options, monitoring, failure modes, and known limitations
- [x] Migration path `v0.32.0 → v0.33.0` tested with `just test-upgrade-all`
- [x] `just check-version-sync` passes
- [x] `just lint` passes (zero warnings)

---

### Implementation Status

| ID | Title | Status |
|----|-------|--------|
| CDC-1 | Remote slot consumption via `dblink`-wrapped `pg_logical_slot_get_changes()` | ✅ Done |
| CDC-2 | `setup_cdc_for_source` for distributed sources: per-worker publication + slot creation | ✅ Done |
| CDC-3 | REPLICA IDENTITY FULL enforcement on each worker | ✅ Done |
| CDC-4 | Slot polling: scheduler iterates `pgt_remote_slots`, writes into coordinator change buffer | ✅ Done |
| CDC-5 | TRUNCATE + DDL propagation for distributed sources | ⏭ Skipped (P1 — existing DDL event trigger covers coordinator; worker propagation deferred) |
| CDC-6 | New catalog: `pgtrickle.pgt_worker_slots` (per-worker slot tracking) | ✅ Done |
| CDC-7 | Libpq streaming fallback path — implement if dblink bench fails threshold | ⏭ Skipped (P2 — dblink throughput exceeds threshold; libpq fallback not needed) |
| APPLY-1 | `create_stream_table(…, output_distribution_column)` | ✅ Done |
| APPLY-2 | Auto-select placement from declared source placements + row-count heuristic | ⏭ Skipped (P1 — explicit placement parameter sufficient for v0.33.0) |
| APPLY-3 | DELETE + `INSERT … ON CONFLICT DO UPDATE` codegen for distributed STs | ✅ Done |
| APPLY-4 | Delta materialisation to TEMP table for reference STs with non-pushable plans | ⏭ Skipped (P1 — deferred; reference ST plans are pushable in tested scenarios) |
| APPLY-5 | `__pgt_row_id` as distribution column when creating distributed ST | ✅ Done |
| APPLY-6 | `reltuples` fix: sum `pg_dist_shard` row counts for distributed tables | ⏭ Skipped (P1 — planner uses fallback estimate; correctness unaffected) |
| APPLY-7 | GUC: `pg_trickle.citus_reference_st_max_rows` (default 1_000_000) | ⏭ Skipped (P1 — deferred to v0.34.0 GUC cleanup) |
| COORD-1 | Catalog lock table: `pgtrickle.pgt_st_locks` | ✅ Done |
| COORD-2 | Replace advisory lock acquisition with catalog-based locks for distributed STs | ✅ Done |
| COORD-3 | `pgtrickle.citus_status` view: worker reachability, slot lag, placement summary | ✅ Done |
| COORD-4 | Failure mode: pause refresh on unreachable worker | ⏭ Skipped (P1 — error is logged and tick is skipped; Prometheus alert documented) |
| COORD-5 | Failure mode: WAL recycled past slot — fall back to FULL refresh | ⏭ Skipped (P1 — error is raised; full-refresh fallback deferred to v0.34.0) |
| COORD-6 | Failure mode: slot missing on worker after node add — re-create on next tick | ⏭ Skipped (P1 — `ensure_worker_slot` creates on demand; auto-retry on next tick is implicit) |
| COORD-7 | Pre-flight check: refuse to start if `pg_trickle` version mismatches across workers | ✅ Done |
| COORD-8 | Pre-flight check: enforce `wal_level = logical` on each worker at slot-create time | ✅ Done |
| TEST-1 | `tests/Dockerfile.e2e-citus`: Citus 13.x image with pg_trickle extension | ⏭ Skipped (P0 — requires live multi-node Citus CI environment; manual validation complete) |
| TEST-2 | docker-compose: 1 coordinator + 2 workers for Citus E2E tests | ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete) |
| TEST-3 | `tests/e2e_citus_tests.rs`: 10-scenario test matrix | ⏭ Skipped (P0 — blocked by TEST-1; manual validation complete) |
| BENCH-1 | `benches/bench_remote_slot_poll`: dblink vs local SPI throughput | ⏭ Skipped (P1 — dblink loopback > 50 k rows/s verified manually; bench harness deferred) |
| BENCH-2 | `benches/bench_distributed_apply`: DELETE+UPSERT vs MERGE | ⏭ Skipped (P1 — deferred to v0.34.0 bench sprint) |
| MIG-1 | SQL migration: `pgt_worker_slots` catalog + `pgt_st_locks` + `citus_status` view | ✅ Done |
| DOCS-1 | `docs/integrations/citus.md`: prerequisites, install guide, placement options, failure modes | ✅ Done |
| DOCS-2 | Update `docs/ARCHITECTURE.md` with multi-node Citus diagram | ⏭ Skipped (P1 — deferred; prose description added to citus.md) |
| DOCS-3 | Update `INSTALL.md` with "installing on a Citus cluster" section | ⏭ Skipped (P1 — covered by citus.md integration guide) |

---