> **Plain-language companion:** [v0.35.0.md](v0.35.0.md)

## v0.35.0 — Correctness Sprint, Reactive Subscriptions & Zero-Downtime Operations

**Status: Planned.** Derived from [plans/PLAN_OVERALL_ASSESSMENT_7.md](plans/PLAN_OVERALL_ASSESSMENT_7.md) §3, §7, §9, §11, §13; A01–A06, A07–A08, A10–A11, A13, A19, A21–A24, A27–A28, A30, A32–A34, A37–A42; F1, F3, F17.

> **Release Theme**
> v0.35.0 is the mandatory quality gate before new feature development can
> resume. It closes EC-01 (phantom-row residue in multi-table joins — flagged
> in three consecutive overall assessments), hardens Citus distributed
> coordination with a real chaos test rig, and wires up the SQLSTATE error
> classifier that shipped in v0.30.0 but was never connected to its call sites.
> On top of that foundation it delivers reactive subscriptions (PG NOTIFY on
> every stream-table change), zero-downtime query evolution via shadow STs,
> and closes ~30 operational, observability, and documentation gaps identified
> in the v7 overall assessment.

---

### Correctness (P0 — Blocks v1.0)

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A01 | EC-01: unconditional PH-D1 cleanup on every join cycle | L | P0 |
| A02 | EC-01: 50k-iteration property test for join-delta convergence | M | P0 |
| A04 | Wire SQLSTATE error classifier end-to-end | S | P0 |
| A05 | Fix `expect()` panic at `rewrites.rs:5471` | XS | P0 |
| CORR-SUB | Reactive subscription: coalesce NOTIFY storms | S | P0 |

**A01 — EC-01 unconditional PH-D1 cleanup.** `src/dvm/operators/join.rs:657-668` still hard-codes `is_deduplicated: false`, so the Z-set pipeline's MERGE path can accumulate a non-zero per-row-id residual when an insert-then-delete on the left side collides with a delete on the right within the same refresh cycle. `src/refresh/phd1.rs:34-99` implements a cross-cycle anti-join cleanup, but it is invoked only when the prior cycle reported a non-zero residual — and that flag is not set on every code path. Step 1: route every refresh cycle through PH-D1 unconditionally, with a batch size of 1,024 rows. The cost is negligible (an anti-join against the freshly applied delta), and the change removes the residual-detection coupling and stops the bug from producing inconsistent aggregates or `UNIQUE_VIOLATION` errors. Step 2: re-engineer Part 2 row-id derivation so that Parts 1a and 1b emit convergent ids, flip `is_deduplicated: true` for INNER joins on stable PKs, and gate the change behind a proptest corpus (see A02). **Schema change:** no.

**A02 — EC-01 property-test corpus.** Add a 50,000-iteration proptest generator to `tests/e2e_property_tests.rs` that produces arbitrary insert/update/delete sequences on two- and three-table join sources, runs DIFFERENTIAL refresh, and asserts DIFFERENTIAL == FULL at each cycle. Shrink failures to minimal counterexamples, and add regression fixtures for every TPC-H Q7/Q15 flake documented in repo memory.

**A04 — Wire SQLSTATE classifier.** `src/error.rs:312-380` (`classify_spi_sqlstate_retryable`) exists, but `SpiErrorCode` is constructed at zero call sites. Surface `pg_sys::ErrorData.sqlerrcode` from pgrx SPI errors via `SpiError::error_code()`, and construct `SpiErrorCode { code, msg }` everywhere a `SpiError(String)` is constructed today. The legacy text matcher at `src/error.rs:282-340` stays as a final fallback for non-SPI sources. **Impact:** eliminates infinite retry loops on PostgreSQL builds with non-English `lc_messages`.
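To make the wiring concrete, here is a minimal sketch of SQLSTATE-based classification, assuming the `src/error.rs` names described above; the retryable set shown is illustrative, not the extension's actual list:

```rust
/// Sketch only — `SpiErrorCode` mirrors the shape described above for
/// src/error.rs; the retryable set below is illustrative.
#[derive(Debug)]
pub struct SpiErrorCode {
    /// Five-character SQLSTATE taken from pg_sys::ErrorData.sqlerrcode,
    /// e.g. "40P01". Locale-independent, unlike the message text.
    pub code: String,
    pub msg: String,
}

/// Retryability is decided on the SQLSTATE alone, so the verdict is the
/// same under any lc_messages setting — the point of the A04 fix.
pub fn classify_spi_sqlstate_retryable(err: &SpiErrorCode) -> bool {
    matches!(
        err.code.as_str(),
        "40001"   // serialization_failure
        | "40P01" // deadlock_detected
        | "55P03" // lock_not_available
    )
}
```

Because the verdict depends only on the five-character code, it is identical on every locale, which is exactly what the legacy text matcher could not guarantee.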
**A05 — Fix `expect()` panic at `rewrites.rs:5471`.** `join_and_predicates()` calls `iter.next().expect("at least one predicate required")`. This is production code (the `#[cfg(test)]` block starts at line 5934). Change the signature to return `Result`, yield `Err(PgTrickleError::InvalidArgument("empty predicate set".into()))` on an empty set, and propagate with `?` at the two call sites. A half-day task.

**CORR-SUB — Coalesce NOTIFY storms.** The subscribe API must coalesce rapid successive changes into a single NOTIFY payload when the refresh interval is shorter than the LISTEN client's poll loop. Implement a `pg_trickle.notify_coalesce_ms` GUC (default 250 ms) that debounces NOTIFY calls per stream table.

---

### Correctness (P1 — Citus Chaos)

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A03 | Citus chaos test rig: Docker Compose coordinator + 3 workers | L | P1 |
| A27 | Citus upgrade regression with frozen v0.31.0 fixture | M | P1 |

**A03 — Citus chaos test rig.** `src/citus.rs` implements `pgt_st_locks` (a distributed advisory mutex via `INSERT … ON CONFLICT DO NOTHING` plus a TTL lease) and `poll_worker_slot_changes`, but none of the failure modes has a test: lease expiry during refresh, coordinator failover mid-lease, clock skew, partial `pg_dist_node` rebalance, worker death during poll, concurrent `handle_vp_promoted` calls. Create `tests/e2e_citus_chaos_tests.rs` backed by a Docker Compose Citus rig (coordinator + 3 workers). Drive: (a) kill-and-restart of a worker mid-poll, (b) coordinator restart mid-lease, (c) `pg_dist_node` removal and re-add, (d) a 1k-stream-table refresh under sustained node churn. Gate on a new `citus-tests.yml` workflow that runs on push to main.

**A27 — Frozen fixture for stable-name backfill.** No test asserts that a `pgt_change_tracking` row written at v0.31.0 is correctly re-keyed by the v0.32.0 stable-name backfill migration. Add a binary dump fixture from v0.31.0 to the upgrade test suite and assert the expected stable names after migration.

---

### Ease of Use / Operational

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A06 | Unit tests for inbox/outbox | M | P0 |
| A07 | `pg_trickle.enabled = false` gates CDC trigger writes | S | P1 |
| A08 | `pg_trickle.force_full_refresh` override GUC | XS | P1 |
| A10 | History prune in dedicated bgworker with batch cap | S | P1 |
| A11 | `start_time` index on `pgt_refresh_history` | XS | P1 |
| A13 | Citus lease acquisition: randomised backoff on conflict | XS | P1 |
| CITUS-XSHARD | Citus: detect non-co-located ST↔source join plans and emit advisory | S | P1 |
| A22 | Emit `pgrx::notice!()` on every FULL fallback | S | P1 |
| A23 | `pgtrickle.explain_stream_table()` operator-tree visualiser | S | P1 |
| A24 | Generated GUC catalogue in docs (CI sync check) | S | P2 |
| A37 | Grafana p50/p99 refresh latency panels + alert rules | S | P1 |
| A42 | Multi-architecture Docker images (arm64 + amd64) | S | P1 |
| F17 | `pgtrickle.sla_summary()` view: per-ST p50/p99, freshness, error budget | S | P1 |
| UX-SUB | `pgtrickle.subscribe(name, channel)` / `unsubscribe()` / `list_subscriptions()` | M | P0 |
| UX-GUC | `pg_trickle.notify_coalesce_ms` GUC (default 250 ms) | XS | P0 |
| UX-SHADOW | `ALTER QUERY shadow_build := true` shadow-ST zero-downtime mode | L | P1 |
| UX-STATUS | `pgtrickle.view_evolution_status(name)` monitoring function | S | P1 |

**A06 — Inbox/outbox unit tests.** `src/api/inbox.rs` (1,034 LOC) and `src/api/outbox.rs` (942 LOC) have no `#[cfg(test)] mod tests` block. For an at-least-once + dedup guarantee, E2E coverage alone is too thin. Add unit tests for dedup-key roundtrip, envelope encode/decode, NULL value handling, version-skew rejection, and proptest-driven envelope fuzzing.
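As a sketch of the envelope-fuzzing item, assuming a hypothetical length-prefixed envelope (the real type and wire format live in `src/api/outbox.rs` and will differ):

```rust
// Hypothetical stand-in for the outbox envelope; illustrative only.
use proptest::prelude::*;

#[derive(Debug, Clone, PartialEq)]
struct Envelope {
    dedup_key: String,
    payload: Vec<u8>,
}

fn encode(e: &Envelope) -> Vec<u8> {
    let mut buf = Vec::new();
    let key = e.dedup_key.as_bytes();
    // Length-prefix the dedup key; the payload is the remainder.
    buf.extend_from_slice(&(key.len() as u32).to_be_bytes());
    buf.extend_from_slice(key);
    buf.extend_from_slice(&e.payload);
    buf
}

fn decode(buf: &[u8]) -> Option<Envelope> {
    let key_len = u32::from_be_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    let key = buf.get(4..4 + key_len)?;
    Some(Envelope {
        dedup_key: String::from_utf8(key.to_vec()).ok()?,
        payload: buf[4 + key_len..].to_vec(),
    })
}

proptest! {
    // Any envelope must survive an encode/decode roundtrip byte-for-byte.
    #[test]
    fn envelope_roundtrip(
        dedup_key in ".*",
        payload in prop::collection::vec(any::<u8>(), 0..1024),
    ) {
        let e = Envelope { dedup_key, payload };
        prop_assert_eq!(decode(&encode(&e)), Some(e.clone()));
    }
}
```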
**A07 — CDC kill switch fix.** `pg_trickle.enabled = false` stops scheduler dispatching, but CDC triggers continue writing to `pgtrickle_changes.changes_`. Add a fast-path check at the top of every CDC trigger body that returns `NULL` (a no-op) when `pg_trickle.enabled = false`. Also add a `pg_trickle.cdc_paused` GUC for a durable hold that survives session reconnects.

**A08 — Force-full-refresh override.** The per-ST `refresh_mode` in `pgt_stream_tables` takes precedence over the cluster-wide GUC `pg_trickle.refresh_strategy`, so an SRE who sets the cluster to FULL for diagnosis still sees DIFFERENTIAL on STs with an explicit row value. Add `pg_trickle.force_full_refresh = bool` (default `false`) that overrides per-ST mode for as long as the GUC is set.

**A10 — History prune bgworker.** At 1,000 STs × 12 refreshes/min, `pgt_refresh_history` sustains ~720k inserts/hour. The prune `DELETE` is unbounded per pass: no batch cap and no cancel guard. Move the prune to a dedicated `bgworker_history_pruner` background worker on a configurable interval (default 60 s) that deletes in batches of 10,000 rows. PostgreSQL's `DELETE` has no `LIMIT` clause, so the cap goes through a `ctid IN (SELECT … LIMIT 10000)` subquery (a sketch follows at the end of this section).

**A11 — History table index.** Several monitor queries scan `pgt_refresh_history` ordered by `start_time DESC` alone; at 10M rows this takes multiple seconds. Add a secondary index on `start_time`; with the retention prune (A10) in place, the index stays small.

**A13 — Citus lease backoff.** `pgt_st_locks` acquisition retries on conflict with no delay; with multiple coordinators racing, this produces O(N²) attempts per second. Add 50–500 ms of randomised jitter after each failed `INSERT … ON CONFLICT DO NOTHING`.

**CITUS-XSHARD — Cross-shard join plan advisory.** When a `distributed` stream table's `__pgt_row_id` distribution column does not match the source table's Citus distribution column, any query that JOINs the ST back to its source produces a re-partition or broadcast join (PLAN_CITUS.md §P4.4). This is documented, but there is no runtime signal; users discover it only by examining `EXPLAIN` output after the fact. At `create_stream_table()` time, when `st_placement = 'distributed'`:

1. Compare the ST's inferred distribution column (from `output_distribution_column` or the auto-selected `__pgt_row_id`) against the distribution column of each distributed source in the query, using `pg_dist_partition.partkey`.
2. If no source column matches, emit: `NOTICE: stream table "<name>" is distributed on __pgt_row_id which does not co-locate with source "<source>". Queries that JOIN this stream table to its source will use cross-shard joins (re-partition or broadcast). Consider passing output_distribution_column => '<column>' to avoid this.`
3. Record the co-location status in a new nullable column `pgtrickle.pgt_stream_tables.citus_colocated_with TEXT` (NULL = local/reference, `'<source>'` = co-located with that source, `'none'` = non-co-located). Expose it in `pgtrickle.citus_status`.

No schema migration is needed beyond the nullable column add (existing rows default to NULL). The advisory is re-emitted by `alter_stream_table()` if the distribution changes.

**A22 — FULL fallback NOTICE.** When the DVM parser falls back to FULL refresh (SubLink-in-OR walker miss, non-monotone aggregate, etc.), it does so silently. Emit `pgrx::notice!("stream table \"{}\" using FULL refresh: {}", name, reason)` at every fallback path.
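The A10 batching detail, sketched. The function and parameter names here are illustrative, and the real worker would bind values through SPI rather than interpolate them:

```rust
/// One bounded prune pass for pgt_refresh_history (A10). `batch` would
/// come from the pruner's GUC; `retention` is the retention window.
/// Illustrative only — shown interpolated for readability.
fn prune_pass_sql(batch: u32, retention: &str) -> String {
    format!(
        "DELETE FROM pgtrickle.pgt_refresh_history \
         WHERE ctid IN ( \
           SELECT ctid FROM pgtrickle.pgt_refresh_history \
           WHERE start_time < now() - interval '{retention}' \
           ORDER BY start_time \
           LIMIT {batch})"
    )
}
```

Each pass removes the oldest rows first, so the inner `SELECT` also benefits from the `start_time` index added in A11.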
**A23 — `EXPLAIN STREAM TABLE`.** `pgtrickle.explain_stream_table(name TEXT, format TEXT DEFAULT 'text')` walks the cached OpTree and returns a text/JSON/DOT rendering showing which DVM operators will be invoked and which fall back to FULL recompute. The format mirrors PostgreSQL's `EXPLAIN (FORMAT TEXT|JSON)`.

**A24 — GUC catalogue sync.** `src/config.rs` defines 111 GUCs; `docs/CONFIGURATION.md` does not enumerate them all. Add a CI step that generates a Markdown table from the `GucRegistry` definitions and fails if the table in the docs is out of sync.

**A37 — Grafana SLI panels.** Add p50/p99 refresh-latency panels, change-buffer-depth and CDC-lag panels, and Prometheus alert rules to `monitoring/grafana/`. Create `monitoring/grafana/alerts/pg_trickle.yml` with at least: latency p99 > 30 s, change-buffer depth > 100k rows, slot lag above the critical threshold.

**A42 — Multi-arch images.** Build and push `linux/arm64` + `linux/amd64` manifests for the Docker Hub and GHCR images in the release workflow.

**F17 — SLA summary function.** `pgtrickle.sla_summary()` returns one row per ST with columns `p50_ms`, `p99_ms`, `freshness_lag_s`, `error_rate`, and `error_budget_remaining`, computed over the last `sla_window_hours` (a GUC, default 24 h). It drives the Grafana dashboard and on-call runbooks.

**UX-SUB — Reactive subscription API.** `pgtrickle.subscribe(stream_table TEXT, channel TEXT)` emits `pg_notify(channel, payload)` on every non-empty refresh, within the same transaction. Payload: `{"name": …, "refresh_id": …, "inserted_count": N, "deleted_count": N}`. `pgtrickle.unsubscribe(name, channel)` and `pgtrickle.list_subscriptions()` complete the API. **Schema change:** new `pgtrickle.pgt_subscriptions` catalog table.

**UX-SHADOW — Shadow-ST zero-downtime ALTER QUERY.** `alter_query(name, new_query, shadow_build := true)` creates `__pgt_shadow_<name>`, refreshes it to convergence without locking the live table, then atomically swaps the storage tables. The live table stays readable and writable throughout. Ships behind a feature flag. **Schema change:** add `in_shadow_build BOOLEAN` and `shadow_table_name TEXT` to `pgtrickle.pgt_stream_tables`.

---

### Performance

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A21 | IVM lock-mode fallback Prometheus counter | XS | P2 |
| CITUS-BENCH | Citus: benchmark `dblink` vs streaming libpq for per-worker slot polling | M | P1 |

**A21 — IVM lock-mode fallback metric.** `src/ivm.rs:48-91`: on parse failure, IVM defaults to `Exclusive` mode (correct, fail-closed) but emits no metric. Add `pgtrickle_ivm_lock_mode_fallback_total{reason="parse_error"}` so operators can see how often this occurs.

**CITUS-BENCH — `dblink` vs streaming-libpq poll benchmark.** `src/scheduler.rs` (via `src/citus.rs:poll_worker_slot_changes`) currently polls per-worker WAL slots through a `dblink`-wrapped `pg_logical_slot_get_changes()` call (PLAN_CITUS.md §3.1, option A). This has never been benchmarked. Streaming libpq (`START_REPLICATION`) would give push-based wakeups with sub-tick delivery latency, at the cost of a persistent connection per worker. Add `benches/bench_remote_slot_poll.rs` that measures:

- `dblink` + `pg_logical_slot_get_changes()` throughput (rows/s, p50/p99 latency) at 1, 4, and 9 workers with 100k CDC events per source;
- native libpq streaming-poll throughput under the same conditions.

If the streaming path delivers ≥ 30% lower p99 latency **or** ≥ 20% higher throughput at 9 workers, migrate `poll_worker_slot_changes` to the libpq path in the same PR. Record the decision (and the numbers) in `docs/BENCHMARK.md` under a new "Citus Distributed CDC" section. If `dblink` is within tolerance, close OQ#1 in PLAN_CITUS.md and document the rationale so it is not re-opened.
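The migrate-or-keep rule above, written out as code (struct and field names are illustrative):

```rust
/// Measured result for one poll path at the 9-worker point (CITUS-BENCH).
struct PollBench {
    p99_latency_ms: f64,
    rows_per_sec: f64,
}

/// Migrate to streaming libpq iff it beats dblink by >= 30% on p99
/// latency or >= 20% on throughput, per the criterion stated above.
fn should_migrate(dblink: &PollBench, streaming: &PollBench) -> bool {
    streaming.p99_latency_ms <= 0.70 * dblink.p99_latency_ms
        || streaming.rows_per_sec >= 1.20 * dblink.rows_per_sec
}
```

If neither threshold is met, `dblink` stays and OQ#1 closes with the recorded numbers.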
---

### Safety

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A19 | Add `// SAFETY:` comments to all unsafe blocks in `rewrites.rs` | S | P2 |

**A19 — SAFETY comments in `rewrites.rs`.** 67 unsafe blocks in `src/dvm/parser/rewrites.rs` lack inline `// SAFETY:` comments (all standard pgrx FFI patterns). Add the comments to satisfy the AGENTS.md convention and make future refactors safer.

---

### Ecosystem

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A28 | Lightweight SQLancer run on every PR (100-case budget) | S | P1 |
| A30 | Relay: `${ENV:VAR}` secret interpolation in connection strings | S | P1 |
| A38 | Relay: exponential reconnection backoff | S | P1 |
| A39 | Relay: backpressure — pause ingestion when sink is slow | M | P0 |
| A41 | dbt Hub package submission | XS | P2 |

**A28 — SQLancer on the PR gate.** `.github/workflows/sqlancer.yml` runs only on Sundays and on manual dispatch. Add a 100-case fast-path run (5–10 min budget) to the PR workflow to catch DVM-vs-FULL equivalence regressions earlier.

**A30 — Relay secret interpolation.** `pgtrickle-relay/src/config.rs` accepts connection strings as plaintext TOML. Add `${ENV:VAR_NAME}` expansion at load time and document the pattern in `docs/RELAY_GUIDE.md`.

**A38 — Relay reconnection backoff.** The relay coordinator reconnects immediately on sink failure. Add exponential backoff (initial 100 ms, max 30 s, jitter ±20%) to prevent a thundering herd on sink restart.

**A39 — Relay backpressure.** The relay has no upper bound on in-flight messages per sink; if a sink is slow, memory grows without bound. Add a configurable `sink_max_inflight` setting (default 1,000 messages) that pauses upstream polling while the in-flight window is full (a sketch follows after the test-coverage table below).

---

### Documentation

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| A32 | Citus end-to-end tutorial | M | P1 |
| A33 | Outbox → relay → Kafka tutorial | M | P1 |
| A34 | `pg_trickle_dump` runbook | S | P2 |

**A32 — Citus tutorial.** Create `docs/tutorials/CITUS_DISTRIBUTED.md`: install Citus → install pg_trickle → create a distributed source table → create a co-located distributed stream table → observe replication lag via `citus_status`.

**A33 — Outbox → relay → Kafka tutorial.** Create `docs/tutorials/OUTBOX_RELAY_KAFKA.md`: end-to-end code samples for configuring the transactional outbox, starting the relay connector, and consuming messages from Kafka with exactly-once semantics.

**A34 — `pg_trickle_dump` runbook.** `src/bin/pg_trickle_dump.rs` (458 LOC) has no documentation. Create `docs/PG_TRICKLE_DUMP.md` describing what the tool does, when to run it, and how to interpret its output.

---

### Test Coverage

| ID | Title | Effort | Priority |
|----|-------|--------|----------|
| TEST-SUB1 | E2E: `subscribe()` receives NOTIFY on every non-empty refresh | M | P0 |
| TEST-SUB2 | E2E: NOTIFY coalescing under high-frequency refresh | S | P1 |
| TEST-SHA1 | E2E: shadow-ST ALTER QUERY while reads/writes are in flight | L | P1 |
| TEST-SHA2 | E2E: shadow-ST rollback if new query fails to converge | M | P1 |
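Returning to A39: one way to bound the in-flight window is a semaphore sized to `sink_max_inflight`. The poller blocks on permit acquisition when the sink falls behind, and each acknowledgement releases a slot. A minimal sketch assuming a Tokio-based relay; `fetch_next` and `send_to_sink` stand in for the real source/sink plumbing:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Illustrative poll loop with a bounded in-flight window (A39).
async fn poll_with_backpressure(
    sink_max_inflight: usize,
    mut fetch_next: impl FnMut() -> Vec<u8>,
) {
    let window = Arc::new(Semaphore::new(sink_max_inflight));
    loop {
        // Blocks here once sink_max_inflight messages are unacknowledged,
        // pausing upstream polling instead of buffering without bound.
        let permit = window
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore closed");
        let msg = fetch_next();
        tokio::spawn(async move {
            send_to_sink(msg).await;
            drop(permit); // ack: frees one slot in the in-flight window
        });
    }
}

async fn send_to_sink(_msg: Vec<u8>) { /* sink I/O elided */ }
```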
---

### Conflicts & Risks

- **A01/A02** (the EC-01 fix) modify `src/dvm/operators/join.rs` and `src/refresh/phd1.rs`; changes here must pass the full TPC-H suite before merging. Do not merge the EC-01 fix until the 50k-iteration proptest is green.
- **UX-SHADOW** (shadow ST) touches the refresh orchestrator — the highest-change-risk module. It ships behind a feature flag; do not remove the flag until TPC-H validation is complete.
- Shadow STs double CDC write overhead during the build window. Add a `shadow_refresh_throttle_ms` GUC to rate-limit background refreshes.
- **A03** (Citus chaos rig) requires Docker Compose with a Citus image in CI; add it to the `citus-tests.yml` workflow only, not the standard PR matrix.

### Exit Criteria

- [ ] A01: every join refresh cycle routes unconditionally through PH-D1; no `UNIQUE_VIOLATION` in TPC-H Q7/Q15 IMMEDIATE over 1,000 cycles
- [ ] A02: the 50k-iteration proptest passes with zero counterexamples; regression fixtures added for all prior flakes
- [ ] A03: the Citus chaos rig passes all four scenarios; `citus-tests.yml` green on push to main
- [ ] CITUS-BENCH: bench results committed to `docs/BENCHMARK.md`; the libpq migration decision recorded against OQ#1 in PLAN_CITUS.md
- [ ] CITUS-XSHARD: a non-co-located ST emits a `NOTICE` at create time; `citus_colocated_with` column added to `pgt_stream_tables` and visible in `citus_status`
- [ ] A04: `SpiErrorCode` constructed at every SPI call site; `classify_spi_sqlstate_retryable` is the primary classifier
- [ ] A05: `join_and_predicates()` returns `Result`; no `expect()` left in production code
- [ ] A06: `inbox.rs` and `outbox.rs` each have a `#[cfg(test)] mod tests` block with dedup, envelope, and proptest coverage
- [ ] A07: CDC triggers return `NULL` when `pg_trickle.enabled = false`; `cdc_paused` GUC documented
- [ ] A39: relay backpressure tested; the in-flight window caps at `sink_max_inflight`
- [ ] UX-SUB: `subscribe()` / `unsubscribe()` / `list_subscriptions()` registered; NOTIFY emitted on every non-empty refresh
- [ ] UX-SHADOW: shadow ST builds in the background; atomic swap; live table readable throughout
- [ ] F17: `pgtrickle.sla_summary()` returns per-ST p50/p99, freshness, and error budget
- [ ] Extension upgrade path tested (`0.34.0 → 0.35.0`)
- [ ] `just check-version-sync` passes

---