# v0.86.0 — Distributed Delta Computation

> **Status:** Planned
> **Scope:** Very Large
> **Driven by:** [Assessment 16](../plans/PLAN_OVERALL_ASSESSMENT_16.md) — LT-3, LT-4, LT-10

## Theme

Enable delta computation to be distributed across multiple worker processes, potentially on different hosts. Partitioned delta computation divides work by source key ranges; read-replica execution offloads compute entirely from the primary; and zero-copy shared-memory protocols minimize transfer overhead for co-located workers.

## Items

### LT-3: Partitioned Delta Computation

For large stream tables with high delta volume, partition the delta computation across multiple workers by source key ranges:

**Architecture:**

```
Source Table (partitioned by hash(id) into 8 ranges)
    │
    ├─ Range [0, 1B)    → Worker A (computes partial delta)
    ├─ Range [1B, 2B)   → Worker B (computes partial delta)
    ├─ Range [2B, 3B)   → Worker C (computes partial delta)
    └─ ...
         │
         ▼
    Coordinator merges partial deltas → final MERGE
```

**Implementation:**

1. **Partition assignment:** Coordinator divides change buffer rows by `hash(primary_key) % num_workers`
2. **Partial delta queries:** Each worker executes the delta SQL with a `WHERE __pgt_pk_hash BETWEEN $1 AND $2` predicate pushed into the scan
3. **Result assembly:** Workers write partial results to temp tables; coordinator executes the final MERGE from a UNION ALL of the partials
4. **Consistency:** All workers see the same frontier (read from the coordinator)

**Requirements:**

- Source table must have a primary key (for hash partitioning)
- Stream table query must be partition-safe: no cross-partition dependencies in aggregates, i.e., the GROUP BY key must be a superset of the partition key
- Worker count is configurable per ST: `ALTER STREAM TABLE ... SET (partitions = 4)`

**Non-partition-safe fallback:** STs with cross-partition aggregates (e.g., global SUM) use a two-phase approach: local partial aggregates → global combine step.

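
The two-phase fallback can be illustrated with a minimal in-process sketch. It is not the planned implementation: real workers would run the delta SQL against PostgreSQL, whereas this example simulates them sequentially in one process. `ChangeRow`, `PartialSum`, `worker_for`, `local_partials`, and `global_combine` are hypothetical names, and the standard library's `DefaultHasher` stands in for whatever hash function produces `__pgt_pk_hash`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// One change-buffer row reduced to what a global SUM needs.
/// Hypothetical shape, for illustration only.
#[derive(Debug, Clone, Copy)]
pub struct ChangeRow {
    pub pk: i64,
    pub amount_delta: i64,
}

/// Partial aggregate produced by a single worker.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PartialSum {
    pub sum: i64,
    pub rows: u64,
}

/// Partition assignment: hash(primary_key) % num_workers.
/// DefaultHasher is an assumption; it is not the extension's hash.
pub fn worker_for(pk: i64, num_workers: usize) -> usize {
    assert!(num_workers > 0, "need at least one worker");
    let mut hasher = DefaultHasher::new();
    pk.hash(&mut hasher);
    (hasher.finish() % num_workers as u64) as usize
}

/// Phase 1: every worker folds only the rows routed to it.
/// The workers are simulated by one slot each in the returned vector.
pub fn local_partials(rows: &[ChangeRow], num_workers: usize) -> Vec<PartialSum> {
    let mut partials = vec![PartialSum::default(); num_workers];
    for row in rows {
        let slot = &mut partials[worker_for(row.pk, num_workers)];
        slot.sum += row.amount_delta;
        slot.rows += 1;
    }
    partials
}

/// Phase 2: the coordinator combines the partial aggregates.
/// SUM and COUNT combine by plain addition, so the result does not
/// depend on how rows were distributed across workers.
pub fn global_combine(partials: &[PartialSum]) -> PartialSum {
    partials.iter().fold(PartialSum::default(), |acc, p| PartialSum {
        sum: acc.sum + p.sum,
        rows: acc.rows + p.rows,
    })
}
```

Calling `global_combine(&local_partials(&rows, 4))` yields the same total as a single-worker pass over `rows`. This only works for aggregates whose partial states can be merged (SUM, COUNT, MIN, MAX); an aggregate without a mergeable partial state would need a different strategy.
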
### LT-4: Read-Replica DVM Execution

Run delta computation on read replicas (streaming replication), applying only the final MERGE results to the primary:

**Architecture:**

```
┌─────────────────────────┐
│  Primary (writes only)  │
│  • Source tables        │
│  • Stream tables        │
│  • Change buffers       │
│  • Receives MERGE from  │
│    replica workers      │
└────────────┬────────────┘
             │ streaming replication
             ▼
┌─────────────────────────┐
│  Read Replica           │
│  • Source tables (RO)   │
│  • Change buffers (RO)  │
│  • DVM Workers read here│
│  • Delta computation    │
│    (no writes to PG)    │
└────────────┬────────────┘
             │ delta results (via tokio-postgres to primary)
             ▼
┌─────────────────────────┐
│  Primary                │
│  • MERGE delta into ST  │
│  • Advance frontier     │
└─────────────────────────┘
```

**Benefits:**

- Zero compute load on the primary for delta SQL execution
- Primary only handles the final MERGE (typically much cheaper than the delta computation)
- Leverages existing streaming replication infrastructure
- Change buffers are available on the replica (WAL-replicated tables)

**Challenges and solutions:**

- **Replication lag:** Workers must wait for change buffer rows to be visible on the replica. Use `pg_last_wal_replay_lsn()` to detect readiness.
- **Frontier coordination:** Frontiers are read from the primary (via a dedicated connection), not from the replica (which may be behind).
- **Write conflict:** MERGE on the primary must handle the case where source data changed between the replica read and the primary write. Use SAVEPOINT + retry.

### LT-10: Zero-Copy Change Buffer Protocol

For workers co-located with PostgreSQL (same host, sidecar deployment), use shared memory and UNIX domain sockets for zero-copy transfer of change events:

**Protocol:**

1. CDC trigger/consumer writes change events to a memory-mapped file (`/dev/shm/pgtrickle/changes_`)
2. Worker maps the same file and reads events directly (no serialization)
3. Coordination via futex-based signaling (producer signals consumer on write)
4. Cleanup via reference counting (events are freed once all consumers have advanced past them)

**Memory layout:**

```
┌─────────────────────────────────────────────┐
│ Header: magic, version, write_pos, read_pos │
├─────────────────────────────────────────────┤
│ Event[0]: lsn, action, data_offset, data_len│
│ Event[1]: ...                               │
│ ...                                         │
├─────────────────────────────────────────────┤
│ Data region (variable-length row payloads)  │
└─────────────────────────────────────────────┘
```

**Fallback:** When shared memory is unavailable (remote workers, different hosts), the protocol falls back to COPY BINARY over TCP.

### Worker Affinity & Data Locality

Optimize worker assignment based on data locality:

- **Partition-aligned scheduling:** Assign workers to ST partitions that match the source table's physical partition layout
- **NUMA-aware:** On multi-socket systems, prefer workers on the same NUMA node as the PostgreSQL shared buffers containing the relevant pages
- **Sticky assignment:** Workers that previously processed an ST retain affinity (warm caches, prepared statements) unless load balancing requires reassignment
- **Cost-based rebalancing:** Periodically (every 60s) evaluate worker utilization and rebalance assignments to equalize load
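
How sticky assignment and cost-based rebalancing might interact can be sketched as a single rebalancing pass. This is a sketch under stated assumptions, not the planned scheduler: `Assignment`, `worker_loads`, `rebalance_once`, the abstract `cost` unit, and the `threshold` parameter are all hypothetical, and the 60-second timer that would invoke the pass is left out.

```rust
/// Current placement of one stream table (ST) and its estimated
/// refresh cost in abstract units. Hypothetical shape.
#[derive(Debug, Clone, PartialEq)]
pub struct Assignment {
    pub st_name: String,
    pub worker: usize,
    pub cost: u64,
}

/// Total assigned cost per worker.
/// Assumes every `worker` index is below `num_workers`.
pub fn worker_loads(assignments: &[Assignment], num_workers: usize) -> Vec<u64> {
    let mut loads = vec![0u64; num_workers];
    for a in assignments {
        loads[a.worker] += a.cost;
    }
    loads
}

/// One rebalancing pass. Moves at most one ST from the busiest worker
/// to the idlest one, and only when the load gap exceeds `threshold`.
/// Below the threshold nothing moves, which keeps assignments sticky.
/// Returns true if an ST was reassigned.
pub fn rebalance_once(
    assignments: &mut [Assignment],
    num_workers: usize,
    threshold: u64,
) -> bool {
    if num_workers == 0 {
        return false;
    }
    let loads = worker_loads(assignments, num_workers);
    let (busiest, max_load) = loads
        .iter()
        .copied()
        .enumerate()
        .max_by_key(|&(_, load)| load)
        .expect("num_workers > 0");
    let (idlest, min_load) = loads
        .iter()
        .copied()
        .enumerate()
        .min_by_key(|&(_, load)| load)
        .expect("num_workers > 0");
    let gap = max_load - min_load;
    if gap <= threshold {
        return false; // balanced enough: keep warm caches where they are
    }
    // Moving cost c changes the gap between the two workers to |gap - 2c|,
    // so only c < gap helps, and c closest to gap / 2 helps most.
    let candidate = assignments
        .iter_mut()
        .filter(|a| a.worker == busiest && a.cost < gap)
        .min_by_key(|a| (gap / 2).abs_diff(a.cost));
    match candidate {
        Some(a) => {
            a.worker = idlest;
            true
        }
        None => false,
    }
}
```

Moving a single ST per pass is a deliberate simplification: it bounds the number of cold-cache refreshes caused by any one rebalancing cycle, at the price of converging over several cycles when the load is badly skewed.
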