> **Plain-language companion:** [v0.28.0.md](v0.28.0.md) ## v0.28.0 — Transactional Inbox & Outbox Patterns **Status: Released.** Driven by [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) and [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md). Outbox helper moved here from v0.22.0 to ship alongside the inbox helper and production-grade advanced features as a complete transactional messaging solution. > **Release Theme** > This release delivers a **complete, production-grade solution** for the two > most common event-driven integration patterns in microservice architectures. > **Part A (Essential)** ships the Transactional Outbox (reliable atomic event > publication) and Transactional Inbox (reliable idempotent event consumption) > as zero-boilerplate SQL helpers. **Part B (Advanced)** adds Consumer Groups > for coordinated multi-relay outbox polling with Kafka-style offset tracking, > visibility timeouts, and lag monitoring — and Ordered Processing for the > inbox, including per-aggregate sequence ordering, gap detection, priority > queues, and partition-affinity helpers for competing workers. Together, > Parts A and B let pg_trickle users build reliable, exactly-once event > pipelines that scale from a single relay to multi-instance deployments, > using nothing but PostgreSQL. > > See [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) > and [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) > for the full architecture and API design. --- ### Known Limitations in v0.28.0 | Limitation | Rationale | Future Path | |------------|-----------|-------------| | **Outbox requires DIFFERENTIAL mode.** `enable_outbox()` on `IMMEDIATE`-mode stream tables returns `OutboxRequiresNotImmediateMode`. | Outbox writes one row per refresh cycle inside the refresh transaction. 
IMMEDIATE refreshes fire inside every source transaction; adding an outbox INSERT there imposes that cost on every application write. | Post-1.0 opt-in GUC if demand justifies. | | **Ordering and priority are mutually exclusive per inbox.** Calling both `enable_inbox_ordering()` and `enable_inbox_priority()` on the same inbox returns `InboxOrderingPriorityConflict`. | Per-aggregate sequence ordering must surface the next message in sequence regardless of priority level; priority tiers violate that guarantee. | Use separate inboxes per priority class, each with `enable_inbox_ordering()` applied independently. | | **Gap detection degrades above ~100K aggregates.** The `gaps_` stream table uses `LEAD()` over pending messages, which is O(N log N) in pending message count — not O(sequence range). This is a significant improvement over the `generate_series` approach; however, refresh time still scales with pending message volume. | Acceptable up to ~1M pending messages at 30 s schedule. Above 10M pending messages, auto-refresh may be slow; use `inbox_ordering_gaps()` for on-demand checks. | Post-v0.28.0: delta-based detection scanning only aggregates with recent activity. | | **Consumer groups provide at-least-once delivery per consumer instance, not exactly-once globally.** | Exactly-once is achieved by composition: relay uses broker idempotency keys; inbox uses `ON CONFLICT (event_id) DO NOTHING`. Three-layer deduplication is more resilient than a monolithic exactly-once guarantee. | Design decision. Documented in PATTERNS.md and SQL_REFERENCE.md. | | **AUTO mode may fall back to FULL refresh while outbox is enabled.** When AUTO refresh falls back to FULL, the outbox header row carries `"full_refresh": true`. If the number of current rows exceeds `outbox_inline_threshold_rows`, the claim-check path applies: rows land in `outbox_delta_rows_` and the relay fetches via cursor. A `pg_trickle_alert outbox_full_refresh` event is emitted regardless of which path is taken. 
Relays must detect the `full_refresh` flag, apply snapshot semantics (upsert rather than publish-as-new), and handle either inline or claim-check payloads. | AUTO refresh adapts to IVM cost at runtime; blocking the FULL fallback permanently would compromise the adaptation that makes AUTO useful. The sentinel flag preserves correctness; the claim-check path prevents memory exhaustion on large tables. | Reference relay updated in OUTBOX-8 to demonstrate all combinations. Post-v0.28.0: consider a GUC to disable FULL fallback per ST when outbox is enabled. | | **`next_` ordered ST scans all processed rows.** The `last_processed` CTE in the aggregate-ordered ST runs `MAX(sequence_num) GROUP BY aggregate_id` over every processed row on each refresh. For inboxes with large volumes of processed history this grows without bound. | A partial index `(aggregate_id, sequence_num) WHERE processed_at IS NOT NULL` is created by `enable_inbox_ordering()` to mitigate this at v0.28.0, making it an index-only scan. Scaling thresholds: < 100K rows → < 5 ms at 1 s schedule; 100K–1M → increase schedule to `5s`; > 1M → increase to `10s–30s`; > 10M → use `inbox_ordering_gaps()` on-demand only. | Post-v0.28.0: introduce `pgt_inbox_sequence_state` catalog table updated atomically via `advance_inbox_sequence()`, making the CTE O(changed aggregates). | | **Global consumer monitoring STs created once, not reference-counted.** `pgt_consumer_status`, `pgt_consumer_group_lag`, `pgt_consumer_active_leases` are auto-created on the first `create_consumer_group()` call. They must be created idempotently and torn down only when the last consumer group for an outbox is dropped. | A single set of monitoring STs per outbox is correct and cheaper than per-group STs. | Implementation: `create_stream_table()` called with `if_not_exists := true`; `drop_consumer_group()` decrements a reference count and drops STs at zero. 
| | **Outbox relay latency bounded by poll interval.** Relays discover new outbox rows by polling. The pg_trickle extension emits `pg_notify('pgtrickle_outbox_new', outbox_table_name)` after each outbox INSERT (v0.28.0), but the `pgtrickle-relay` binary does not yet use LISTEN — it starts polling on the standard interval. Minimum relay latency today equals the poll interval (`visibility_seconds`). | The NOTIFY is cheap (≈2 µs, inside the existing refresh transaction) and is emitted from v0.28.0 onwards so relay authors can begin using it immediately. The `pgtrickle-relay` CLI will use LISTEN/NOTIFY in v0.29.0. | v0.29.0 relay: subscribe to `pgtrickle_outbox_new` for sub-100 ms wake-up (see E2E latency benchmark in PLAN_RELAY_CLI.md §E.5). | | **`replay_inbox_messages()` accepts only explicit event ID lists.** A free-form `where_clause` parameter was removed to eliminate SQL injection risk. | `EXPLAIN`-based validation of dynamic SQL is insufficient; parameterised `WHERE event_id = ANY($1)` is the safe API. | Operators who need filter-based replay should run a parameterised `SELECT ARRAY_AGG(event_id) ... WHERE ` first, then pass the result to `replay_inbox_messages()`. | --- ### Part A — Essential Patterns #### Transactional Outbox Helper (P2 — §9.12) > **In plain terms:** After each DIFFERENTIAL refresh cycle, pg_trickle > writes a row to `pgtrickle.outbox_` within the same transaction as > the MERGE — either both succeed or neither does. For small deltas the row > carries a versioned inline JSON payload `{"v":1, "inserted":[…], > "deleted":[…]}`. For large deltas (above `outbox_inline_threshold_rows`, > default 10 000 rows) the row carries a lightweight claim-check header > `{"v":1, "claim_check": true, …}` and the actual rows land in the > companion table `pgtrickle.outbox_delta_rows_`, which the relay > reads via a server-side cursor in bounded batches — constant memory > regardless of delta size. 
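The inline-versus-claim-check routing can be sketched from the relay's side. This is a minimal Python illustration assuming only the versioned field names given in this plan; the function name `classify_outbox_payload` is hypothetical, not part of the extension's API:

```python
import json

def classify_outbox_payload(payload: dict) -> str:
    """Classify a v1 outbox payload row as inline or claim-check.

    Relay-side illustration only (hypothetical helper, not part of
    the extension); field names follow the versioned payload format.
    """
    if payload.get("v") != 1:
        raise ValueError(f"unknown outbox payload version: {payload.get('v')!r}")
    if payload.get("claim_check"):
        # Header only: row data must be fetched from the companion
        # delta-rows table via a server-side cursor, in bounded batches.
        return "claim_check"
    # Small delta: inserted/deleted rows are embedded in the header row.
    return "inline"

small = json.loads('{"v": 1, "inserted": [{"id": 1}], "deleted": []}')
large = {"v": 1, "claim_check": True, "inserted_count": 50000,
         "deleted_count": 120, "refresh_id": "example-refresh-id"}
print(classify_outbox_payload(small))  # inline
print(classify_outbox_payload(large))  # claim_check
```

Either form may additionally carry `"full_refresh": true` after an AUTO-mode fallback, in which case the relay applies snapshot semantics instead of publishing rows as new events.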
Eliminates the dual-write problem for > downstream event buses without a CDC connector or external replication > slot. | Item | Description | Effort | Ref | |------|-------------|--------|-----| | OUTBOX-1 | **Catalog + SQL functions.** `pgt_outbox_config` catalog table. `enable_outbox(name, retention_hours)` / `disable_outbox(name, if_exists)` SQL functions. `OutboxAlreadyEnabled`, `OutboxNotEnabled`, `OutboxRequiresNotImmediateMode` error variants. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.1–A.2 | | OUTBOX-2 | **Outbox table creation.** `pgtrickle.outbox_` with `id BIGSERIAL`, `pgt_id UUID`, `refresh_id UUID`, `created_at`, `inserted_count INT`, `deleted_count INT`, `is_claim_check BOOLEAN DEFAULT false`, `payload JSONB`. Index on `created_at`. Naming: 7-byte `outbox_` prefix + up to 56-byte stream table name; collision resolution appends 7-char hex suffix derived from `left(md5(name), 7)`. Final name stored in `pgt_outbox_config.outbox_table_name`. Also creates: (a) **latest-row view** `pgtrickle.pgt_outbox_latest_` (`ORDER BY id DESC LIMIT 1`) for quick lag inspection and operational checks; (b) **delta rows table** `pgtrickle.outbox_delta_rows_` with `outbox_id BIGINT REFERENCES outbox_(id)`, `row_num INT`, `op CHAR(1) CHECK (op IN ('I','D'))`, `payload JSONB`, `PRIMARY KEY (outbox_id, row_num)` — populated only for claim-check entries. *(Note: `pgt_consumer_claim_check_acks` is created in Part B / OUTBOX-B1, not here — it has no purpose without consumer groups.)* All objects dropped alongside the outbox table. 
| 1d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.3 | | OUTBOX-3 | **Refresh-path integration.** After successful MERGE, if outbox is enabled, INSERT outbox row within the same transaction — **unless** `outbox_skip_empty_delta = true` (default) and `inserted_count = 0 AND deleted_count = 0`, in which case no INSERT or NOTIFY is issued, saving write amplification on quiet refresh cycles. In-memory `outbox_enabled_set` cache with DDL-triggered invalidation. Hot-path cost < 50 ns when disabled. **Routing:** if `delta_row_count <= outbox_inline_threshold_rows`, serialise `Vec` to inline JSONB as before. If `delta_row_count > outbox_inline_threshold_rows`, write `is_claim_check = true` header row first (no payload), then INSERT delta rows into `outbox_delta_rows_` in batches controlled by `outbox_claim_check_batch_size` GUC (default 1 000 rows/call) — keeping Rust heap bounded regardless of delta size. Both writes are in the same transaction. **FULL-refresh fallback:** when AUTO mode falls back to FULL refresh, the outbox header row additionally carries `"full_refresh": true`; if row count exceeds the threshold the claim-check path applies; a `pg_trickle_alert outbox_full_refresh` event is emitted so relays apply snapshot semantics. **NOTIFY:** emit `pg_notify('pgtrickle_outbox_new', outbox_table_name)` inside the same transaction after the outbox INSERT, enabling relay authors to use LISTEN for sub-second wake-up (cost: ~2 µs per refresh; skipped when empty-delta skip applies). | 1.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.4 | | OUTBOX-4 | **Versioned payload format — two paths.** **Inline path** (small delta): `{"v":1, "inserted":[…], "deleted":[…]}` with `to_jsonb()` type mapping. 
**Claim-check path** (large delta): `{"v":1, "claim_check": true, "inserted_count": N, "deleted_count": N, "refresh_id": "…"}` — no row data in the outbox row itself; relay reads `outbox_delta_rows_` via server-side cursor and calls `outbox_rows_consumed(stream_table, outbox_id)` when done. FULL-fallback payloads additionally set `"full_refresh": true` in the header; claim-check applies when the full-refresh row count exceeds the threshold. GUC `outbox_inline_threshold_rows` (default 10 000 rows) controls the routing threshold. **No truncation path** — data is never silently dropped. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.5 | | OUTBOX-5 | **Retention drain.** Scheduler cleanup step: batched DELETE on `outbox_` with `outbox_drain_batch_size` GUC (default 10 000). Cascades to `outbox_delta_rows_` via FK `ON DELETE CASCADE` — no separate drain step needed for delta rows. Per-ST or global `outbox_retention_hours` (default 24). `last_drained_at` / `last_drained_count` tracked in catalog. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.6 | | OUTBOX-6 | **Lifecycle & cascade.** `drop_stream_table()` cascades to outbox table + delta rows table + metadata. `alter_stream_table()` errors if column set changed while outbox enabled. `outbox_status()` monitoring function (includes `claim_check_pending_count` and `storage_status` fields). `outbox_rows_consumed(stream_table TEXT, outbox_id BIGINT)` SQL function: called by relay after cursor consumption to record per-group completion in `pgt_consumer_claim_check_acks`; idempotent. **Note:** `stream_table` takes the stream table name (as registered in `pgt_stream_tables`), not the outbox table name — the function resolves the outbox table via `pgt_outbox_config`. 
**8 Part-A GUCs** (`outbox_enabled`, `outbox_retention_hours`, `outbox_drain_batch_size`, `outbox_inline_threshold_rows`, `outbox_claim_check_batch_size`, `outbox_drain_interval_seconds`, `outbox_storage_critical_mb`, `outbox_skip_empty_delta`) + 4 Part-B GUCs (`consumer_dead_threshold_hours`, `consumer_stale_offset_threshold_days`, `consumer_cleanup_enabled`, `outbox_force_retention`). | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §A.7–A.8 | | OUTBOX-7 | **Tests & benchmark.** Unit: enable/disable/validation/naming/cascade. Integration: end-to-end inline outbox write; claim-check triggered at threshold boundary (N = threshold, N = threshold+1); delta rows populated atomically in same transaction; relay cursor-fetch returns all rows in order; `outbox_rows_consumed()` idempotency; retention drain cascades delta rows via FK; rollback on outbox INSERT failure leaves no orphan delta rows; `pg_notify('pgtrickle_outbox_new', ...)` emitted. Benchmark gates: (a) `refresh_no_outbox` vs `refresh_outbox_inline` vs `refresh_outbox_claim_check` — < 10 % overhead at inline threshold, < 25 % at large payloads; (b) `poll_outbox()` < 5 ms at 10K outbox rows; (c) `commit_offset()` < 10 ms with 10 concurrent relays; (d) `consumer_lag()` < 50 ms at 100K outbox rows; (e) E2E latency benchmark `benches/e2e_outbox_latency.rs`: p50 < 1.5 s (polling), p95 < 2.5 s (see PLAN_RELAY_CLI.md §E.5). | 1.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §D | | OUTBOX-8 | **Documentation & examples.** SQL_REFERENCE.md: outbox API + both payload formats (inline and claim-check) + `outbox_rows_consumed()` + `pgtrickle_outbox_new` NOTIFY channel. CONFIGURATION.md: all 8 Part-A GUCs (replacing `outbox_max_payload_bytes` with `outbox_inline_threshold_rows`; adding `outbox_claim_check_batch_size` and `outbox_storage_critical_mb` with tuning table).
PATTERNS.md: Transactional Outbox section including claim-check relay pattern; WAL overhead analysis; backpressure guidance for dead consumers (`outbox_storage_critical_mb` alert workflow). Reference Python relay (`examples/relay/outbox_relay.py`) demonstrates both inline and claim-check paths. | 1d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §C, §E | > **Outbox essential subtotal: ~7 days** #### Transactional Inbox Helper > **In plain terms:** `create_inbox('payment_inbox')` creates a > production-grade inbox table with auto-managed stream tables for the > pending-message queue, dead-letter queue, and processing statistics. > Applications write to the inbox (`ON CONFLICT DO NOTHING` for dedup), > process messages from the pending stream table, and pg_trickle handles > DLQ routing, alerts, retention, and monitoring automatically. > `enable_inbox_tracking()` adopts an existing inbox table into pg_trickle's > monitoring without schema changes. | Item | Description | Effort | Ref | |------|-------------|--------|-----| | INBOX-1 | **Catalog + `create_inbox()`.** `pgt_inbox_config` catalog table with column mapping (`id_column`, `processed_at_column`, `retry_count_column`, `error_column`, `received_at_column`, `event_type_column`). `create_inbox(name, schema, max_retries, schedule, with_dead_letter, with_stats, retention_hours)` creates inbox table in the specified schema (default `pgtrickle`) + metadata. `InboxAlreadyExists`, `InboxNotFound`, `InboxTableNotFound`, `InboxColumnMissing` error variants. | 1d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.1–A.3 | | INBOX-2 | **Inbox table DDL.** Standard schema: `event_id TEXT PK`, `event_type`, `source`, `aggregate_id`, `payload JSONB`, `received_at`, `processed_at`, `error`, `retry_count`, `trace_id`. Partial indexes for pending, DLQ, and processed rows. Autovacuum tuning. 
| 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.3 | | INBOX-3 | **Auto-created stream tables.** Pending ST (`WHERE processed_at IS NULL AND retry_count < max_retries`, DIFFERENTIAL, user-defined schedule). DLQ ST (`WHERE processed_at IS NULL AND retry_count >= max_retries`, DIFFERENTIAL, 30 s). Stats ST (GROUP BY `event_type` with pending/processed/dead_letter/avg processing time — `max_pending_age_sec` removed from the materialised ST query to enable **DIFFERENTIAL** mode and eliminate the O(N) full scan every 10 s; use `inbox_health()` for `oldest_pending_age_sec` on demand). All STs use column-mapped SQL from `pgt_inbox_config`. | 1d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.4 | | INBOX-4 | **`enable_inbox_tracking()`.** Adopt existing table: validate columns exist with compatible types, validate PK/UNIQUE on id column, create stream tables using mapped column names, insert metadata with `is_managed = false`. Gracefully omit optional columns (`source`, `aggregate_id`, `trace_id`) if not present. | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.6 | | INBOX-5 | **DLQ alert mechanism.** Post-refresh hook on DLQ stream table: when `rows_inserted > 0`, emit `pg_trickle_alert` event `inbox_dlq_message` per new entry (capped at `inbox_dlq_alert_max_per_refresh`, default 10; excess batched into summary alert). | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.5 | | INBOX-6 | **`inbox_health()` + `inbox_status()`.** `inbox_health(name)` returns JSONB with `pending_count`, `dead_letter_count`, `avg_processing_time_sec`, `oldest_pending_age_sec`, `throughput_per_sec`, `health_status` (`healthy`/`degraded`/`critical`). `inbox_status(name)` returns tabular overview of all inboxes. 
| 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.1 | | INBOX-7 | **Retention drain + `replay_inbox_messages()`.** Processed message drain via scheduler (batched DELETE, `inbox_processed_retention_hours` default 72 h). DLQ messages kept forever by default (`inbox_dlq_retention_hours` default 0). `replay_inbox_messages(name TEXT, event_ids TEXT[])` resets `processed_at` + `retry_count` for the specified message IDs using a parameterised `WHERE event_id = ANY($1)` — no free-form SQL accepted; eliminates injection surface entirely. | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.7–A.8 | | INBOX-8 | **`drop_inbox()` + lifecycle.** `drop_inbox(name, if_exists, cascade)`: always drops stream tables + metadata; drops inbox table only if `cascade := true` AND `is_managed = true`. `DROP EXTENSION` cascades managed tables; adopted tables survive. 6 GUCs (`inbox_enabled`, `inbox_processed_retention_hours`, `inbox_dlq_retention_hours`, `inbox_drain_batch_size`, `inbox_drain_interval_seconds`, `inbox_dlq_alert_max_per_refresh`). | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §A.9–A.10 | | INBOX-9 | **Tests & benchmark.** Unit: create/drop/enable_tracking/replay/health. Integration: end-to-end inbox lifecycle, DLQ routing, DLQ alert, retention drain, concurrent processors with `FOR UPDATE SKIP LOCKED`, `enable_inbox_tracking()` with non-standard columns. Benchmark gates: (a) pending ST refresh < 5 ms at 100 pending, < 50 ms at 10K pending; (b) `next_` ordered ST refresh at each threshold (100K/1M/10M processed rows) matches documented scaling table; (c) stats ST FULL refresh < 5 ms at 100K rows, < 50 ms at 1M rows; (d) backpressure indicator: `inbox_health()` returns `degraded` within 2 refresh cycles when `oldest_pending_age_sec` exceeds threshold. 
| 1d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §D | | INBOX-10 | **Documentation & examples.** SQL_REFERENCE.md: inbox API. CONFIGURATION.md: 6 GUCs. PATTERNS.md: Transactional Inbox section + "Bidirectional Event Pipeline" (inbox → business logic → outbox) worked example. Reference examples: `inbox_writer_nats.py`, `inbox_processor.py`, `webhook_receiver.py`. | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §C, §E | > **Inbox essential subtotal: ~6.5 days** #### Shared Infrastructure (Part A) | Item | Description | Effort | Ref | |------|-------------|--------|-----| | SHARED-1 | **Upgrade SQL.** `sql/pg_trickle--0.23.0--0.24.0.sql`: create `pgt_outbox_config` and `pgt_inbox_config` catalog tables, register all new SQL functions. | 0.5d | — | | SHARED-2 | **PATTERNS.md integration guide.** New "Event-Driven Integration Patterns" chapter in `docs/PATTERNS.md` covering: when to use outbox vs inbox vs both, transport comparison (NATS/Kafka/pgmq), bidirectional pipeline (inbox → business logic → outbox), and competing consumer patterns (`FOR UPDATE SKIP LOCKED`). | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md), [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) | | SHARED-3 | **E2E integration test.** Full pipeline: inbox receives event → processor creates business entity → outbox captures delta → verify end-to-end exactly-once delivery. | 0.5d | — | > **Part A subtotal: ~15 days** --- ### Part B — Production Patterns #### Consumer Groups for Outbox > **In plain terms:** Multiple relay processes can share a single outbox > table safely using consumer groups — the same concept as Kafka consumer > groups, but implemented entirely in PostgreSQL. > Each group has its own offset pointer.
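Consumer-group semantics in brief: each group advances a monotonic offset, and a polled batch is protected by a visibility-timeout lease until it is committed or the lease expires. The following toy in-memory model makes the redelivery behaviour concrete; all names are hypothetical, it collapses per-consumer leases into one lease per group, and the real `poll_outbox()`/`commit_offset()` are SQL functions built on `FOR UPDATE SKIP LOCKED`:

```python
class LeaseModel:
    """Toy in-memory model of group offsets plus visibility-timeout
    leases. Illustrative only: the extension implements this in SQL."""

    def __init__(self, rows):
        self.rows = sorted(rows)   # outbox ids, ascending
        self.committed = {}        # group -> last committed id
        self.lease = {}            # group -> (last id in batch, lease expiry)

    def poll(self, group, now, batch_size=2, visibility_seconds=30):
        held = self.lease.get(group)
        if held and held[1] > now:
            return []              # batch still leased: invisible for now
        start = self.committed.get(group, 0)
        batch = [r for r in self.rows if r > start][:batch_size]
        if batch:
            self.lease[group] = (batch[-1], now + visibility_seconds)
        return batch

    def commit(self, group, last_id):
        # Offsets only move forward, mirroring commit_offset() semantics.
        if last_id <= self.committed.get(group, 0):
            return False
        self.committed[group] = last_id
        self.lease.pop(group, None)
        return True

m = LeaseModel([1, 2, 3])
first = m.poll("g1", now=0)        # claims [1, 2] under a 30 s lease
assert m.poll("g1", now=10) == []  # crash before commit: lease still blocks
assert m.poll("g1", now=45) == first == [1, 2]  # lease expired: redelivered
m.commit("g1", 2)
assert m.poll("g1", now=46) == [3]  # offset advanced past the batch
```

The model shows only why an uncommitted batch is redelivered after `visibility_seconds`; the real API additionally auto-registers consumers, supports `extend_lease()`, and skips rows leased by other consumers in the group.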
Relays call `poll_outbox()` to > claim a batch under a visibility timeout (like SQS), then call > `commit_offset()` when done. If a relay crashes, its lease expires and > another relay picks up the batch. `consumer_lag()` shows how far behind > each consumer is. Dead relays are reaped automatically after 24 h. | Item | Description | Effort | Ref | |------|-------------|--------|-----| | OUTBOX-B1 | **Consumer group catalog + lifecycle.** `pgt_consumer_groups` + `pgt_consumer_offsets` + `pgt_consumer_leases` catalog tables. Also creates `pgt_consumer_claim_check_acks` (tracks per-group cursor-consumption completion for claim-check retention drain safety; not created in Part A since it has no purpose without consumer groups). `create_consumer_group(name, outbox, auto_offset_reset)` / `drop_consumer_group(name)` SQL functions; `drop_consumer_group()` decrements a per-outbox reference count and drops per-outbox monitoring STs when count reaches zero. `auto_offset_reset` values: `latest` (default) or `earliest`. `ConsumerGroupAlreadyExists`, `ConsumerGroupNotFound` error variants. | 1d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.2–B.3 | | OUTBOX-B2 | **`poll_outbox()` with visibility timeout and lease management.** Returns next batch for `(group, consumer_id)` using `FOR UPDATE SKIP LOCKED`. Acquires lease in `pgt_consumer_leases` with configurable `visibility_seconds` (default 30). Auto-registers new consumer_id on first call based on `auto_offset_reset`. Skips rows already leased by other consumers. | 1.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.4 | | OUTBOX-B2a | **`extend_lease()` — lease renewal for long-running relays.** `extend_lease(group, consumer, extension_seconds INT DEFAULT 30)` extends the `visibility_until` of all active leases held by the named consumer, returning the new `visibility_until` timestamp. 
Prevents spurious re-delivery when broker publish or business logic takes longer than the original `visibility_seconds`. Calling `consumer_heartbeat()` does **not** extend leases — heartbeat and lease lifetime are separate concerns. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.4 | | OUTBOX-B3 | **`commit_offset()` + `seek_offset()`.** `commit_offset(group, consumer, last_offset)` monotonically advances offset, releases lease, rejects regression with warning. `seek_offset(group, consumer, new_offset)` resets to any position and clears leases; emits `pg_trickle_alert` event `consumer_seeked`. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.4, §B.6 | | OUTBOX-B4 | **Heartbeat + liveness.** `consumer_heartbeat(group, consumer)` updates `last_heartbeat_at` (liveness only — does **not** extend active leases; use `extend_lease()` for that). Consumer is healthy when `last_heartbeat_at > now() - 60 s`. `pg_trickle_alert` event `consumer_unhealthy` when consumer transitions healthy → unhealthy. `consumer_lag()` **live SQL function** (always-fresh, suitable for ad-hoc inspection) exposes per-consumer `healthy` boolean, current lag, and offset. 
| 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.5 | | OUTBOX-B5 | **Monitoring stream tables.** Three auto-created STs on first `create_consumer_group()`: `pgt_consumer_status` (per-consumer offset + heartbeat timestamp, **FULL mode**, 5 s — FULL because `pgt_consumer_offsets` is updated on every heartbeat and offset commit; at typical relay poll rates most rows change between refreshes, making FULL simpler than DIFFERENTIAL for this small table), `pgt_consumer_group_lag` (per-group aggregate lag, DIFFERENTIAL, 10 s), `pgt_consumer_active_leases` (current leases filtered by `visibility_until > now()`, **FULL mode**, 5 s — FULL because the filter changes every cycle as leases expire). Use `consumer_lag()` for ad-hoc inspection of live health data including `heartbeat_age_sec`; use `pgt_consumer_group_lag` ST for Grafana dashboards and alerting rules (materialized every 10 s). | 1d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.7 | | OUTBOX-B6 | **Dead consumer auto-cleanup.** Scheduler step (GUC `consumer_cleanup_enabled`, default `true`): reap consumers with `last_heartbeat_at < now() - consumer_dead_threshold_hours` (GUC, default 24 h), release their leases. Remove from offsets if also `last_commit_at < now() - consumer_stale_offset_threshold_days` (GUC, default 7 d). Emit `pg_trickle_alert` event `consumer_reaped`. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.9 | | OUTBOX-B7 | **Retention safety guard.** When consumer groups are enabled, retention drain refuses to delete `outbox_` rows with `id > MIN(last_offset across all consumers)` to prevent silent data loss for slow relays. 
For claim-check rows, additionally waits until all consumer groups that have polled past that `outbox_id` have called `outbox_rows_consumed()` for it — preventing delta rows from being cascade-deleted via FK before the relay finishes cursor consumption. GUC `outbox_force_retention` (default `false`) allows operator override for permanently abandoned consumers. | 0.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B.6 | | OUTBOX-B8 | **Tests.** Integration: multi-relay group creation, visibility timeout expiry + re-poll, `commit_offset` idempotency, `seek_offset` replay, heartbeat → unhealthy transition, dead consumer reaping, retention guard prevents early drain. Benchmark: `poll_outbox` latency < 5 ms at 10K outbox rows. | 1.5d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §D | | OUTBOX-B9 | **Documentation & reference relay.** SQL_REFERENCE.md: consumer group API + delivery guarantee section (at-least-once per consumer; exactly-once by composition). CONFIGURATION.md: `consumer_cleanup_enabled`, `outbox_force_retention`, `consumer_dead_threshold_hours` (default 24), `consumer_stale_offset_threshold_days` (default 7) GUCs. Reference Python relay with group coordination (`examples/relay/outbox_relay.py`). Rust equivalent (`examples/relay/outbox_relay.rs`). PATTERNS.md: multi-relay competing consumers section + claim-check large delta handling guide (server-side cursor consumption, `outbox_rows_consumed()`, bounded-memory relay loop) + latest-state consumer section (dedup view). | 1d | [PLAN_TRANSACTIONAL_OUTBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_OUTBOX_HELPER.md) §B, §C | > **Consumer groups subtotal: ~8 days** #### Ordered Processing for Inbox > **In plain terms:** For financial, order management, and audit-trail > use-cases, messages about the same entity (customer, order, account) > must be processed in the order they were produced. 
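The per-aggregate ordering requirement reduces to a single rule: a pending message is eligible only when its sequence number immediately follows the aggregate's last processed sequence (0 when none has been processed yet). A small Python sketch of that rule, with hypothetical names; in the extension the selection happens in an auto-created stream table:

```python
def next_expected(pending, last_processed):
    """Surface only the next-in-sequence message per aggregate.

    Illustrative rule only: a pending (aggregate_id, seq, payload)
    tuple is eligible when seq is exactly one past the aggregate's
    last processed sequence number (0 if nothing processed yet).
    """
    eligible = []
    for aggregate_id, seq, payload in pending:
        if seq == last_processed.get(aggregate_id, 0) + 1:
            eligible.append((aggregate_id, seq, payload))
    return eligible

pending = [("order-7", 2, "ship"),   # withheld: seq 1 not processed yet
           ("order-7", 1, "pay"),
           ("acct-3", 6, "close")]   # eligible: seqs 1-5 already processed
print(next_expected(pending, {"acct-3": 5}))
# [('order-7', 1, 'pay'), ('acct-3', 6, 'close')]
```

A message withheld by this rule whose missing predecessor never arrives is exactly what gap detection is there to alert on.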
`enable_inbox_ordering()`
> creates a `next_` stream table that surfaces only the *next expected*
> message per aggregate — preventing out-of-order processing automatically.
> Gap detection alerts when a message has been missing for too long. Priority
> queues let critical messages use a 1-second refresh schedule while
> background messages use 30 seconds. Worker partition affinity reduces
> contention when multiple processors share an inbox.

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| INBOX-B1 | **`enable_inbox_ordering()` + aggregate-ordered stream table.** `pgt_inbox_ordering_config` catalog table. `enable_inbox_ordering(inbox, aggregate_id_col, sequence_num_col)` creates the `next_` ST: `DISTINCT ON (aggregate_id)`, selecting only the row where `sequence_num = last_processed_seq + 1`, so only the next expected message per aggregate is surfaced. `disable_inbox_ordering(inbox)` drops the ST + config row. **Mutually exclusive with `enable_inbox_priority()`** — returns `InboxOrderingPriorityConflict` if priority is already enabled on this inbox (and vice versa). | 1.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §B.2–B.3 |
| INBOX-B2 | **Gap detection stream table + alert.** The `gaps_` ST uses a `LEAD()` window function (O(N log N)) to detect missing sequence numbers by comparing adjacent sequences among pending messages only. Uses `FULL` refresh mode (contains `now()` in `gap_age_sec`). Emits the `pg_trickle_alert` event `inbox_ordering_gap` when new gaps appear. `inbox_ordering_gaps(inbox_name)` SQL function for ad-hoc inspection. 30 s refresh schedule. Scales to 1M+ pending messages without the O(sequence_range) blowup of `generate_series`. | 1d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §B.4 |
| INBOX-B3 | **`enable_inbox_priority()` + tier-based stream tables.** `pgt_inbox_priority_config` catalog table. `enable_inbox_priority(inbox, priority_col, tiers JSONB)` creates one `pending__` ST per priority tier with a per-tier `schedule` and `WHERE priority BETWEEN min AND max`. Default 3 tiers: critical (1–2, 1 s), normal (3–6, 5 s), background (7–9, 30 s). The original `pending_` is preserved as a unified view. `disable_inbox_priority(inbox, if_exists)` drops all tier STs + config row; the original unified `pending_` is restored. | 1d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §B.5 |
| INBOX-B4 | **`inbox_is_my_partition()` helper.** Boolean-returning SQL function with signature `inbox_is_my_partition(aggregate_id TEXT, worker_id INT, total_workers INT) RETURNS BOOLEAN`. Evaluates `abs(hashtext(aggregate_id)) % total_workers = worker_id` inline in the WHERE clause. Advisory only — workers can still process any message; the condition makes each worker prefer its own subset for cache locality. Composable with prepared statements and ORMs without SQL string interpolation. Documented in PATTERNS.md with a Python + SQL usage example. | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §B.6 |
| INBOX-B5 | **Tests.** Integration: ordered ST surfaces only next-sequence messages; out-of-order arrivals are withheld until the preceding sequence is processed; gap detection fires an alert after a configurable delay; priority tier routing; partition affinity correctness (no messages lost). Benchmark gate: `gaps_` ST refresh at 1M messages across 10K aggregates must complete in < 1 s on a 30 s schedule (uses the `LEAD()` window function; O(N log N), not O(sequence_range)). Chaos: processor crash mid-processing + replay recovery; concurrent processors with `FOR UPDATE SKIP LOCKED` (no duplicate processing at 10 concurrent workers). | 1.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §D |
| INBOX-B6 | **Documentation & examples.** SQL_REFERENCE.md: ordering + priority API. CONFIGURATION.md: ordering GUCs. PATTERNS.md: sections on per-aggregate ordering, gap recovery, priority queues, and competing workers with partition affinity. Reference `examples/inbox/inbox_processor_ordered.py`. | 0.5d | [PLAN_TRANSACTIONAL_INBOX_HELPER.md](plans/patterns/PLAN_TRANSACTIONAL_INBOX_HELPER.md) §B, §C |

> **Ordered processing subtotal: ~6 days**

#### Shared Infrastructure (Part B)

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| SHARED-B1 | **Upgrade SQL additions.** Extend `sql/pg_trickle--0.27.0--0.28.0.sql`: create the `pgt_consumer_groups`, `pgt_consumer_offsets`, `pgt_consumer_leases`, `pgt_inbox_ordering_config`, and `pgt_inbox_priority_config` tables; register all Part B SQL functions. *(Note: `pgt_consumer_claim_check_acks` is created dynamically by `create_consumer_group()` at runtime, not in the upgrade script — it has no purpose without consumer groups.)* | 0.5d | — |
| SHARED-B2 | **Advanced PATTERNS.md sections.** Add to the "Event-Driven Integration Patterns" chapter: competing relays with consumer groups, ordered inbox processing end-to-end, priority queues (when to use them), partition affinity for high-throughput inboxes, a claim-check large-delta handling guide (when it triggers, the cursor consumption loop, `outbox_rows_consumed()`, interaction with the `full_refresh` flag), the latest-state consumer pattern (dedup view), and FULL-refresh fallback handling for relay authors. Also add Grafana dashboard panel recommendations for consumer lag (`pgt_consumer_group_lag` ST), DLQ growth rate (`dlq_` ST), inbox pending backlog (`pending_` ST), and inbox throughput (`stats_` ST). | 0.5d | — |
| SHARED-B3 | **Advanced E2E tests.** (1) Multi-relay group test: 3 relays share one outbox group; verify each row is published exactly once; simulate a relay crash + visibility-timeout redelivery. (2) Ordered inbox test: publish 10 messages out of order per aggregate; verify the processor receives them in sequence order. (3) Concurrent stress: 10 relay workers + 100K outbox rows; verify a < 0.1% duplicate rate at the broker. | 1d | — |
| SHARED-B4 | **dbt adapter updates.** Add `outbox_enabled`, `consumer_group`, and `inbox_config` properties to the dbt model config; add `pgtrickle_outbox_config` and `pgtrickle_create_inbox` macros; update dbt-pgtrickle docs and integration tests. | 0.5d | [dbt-pgtrickle/AGENTS.md](dbt-pgtrickle/AGENTS.md) |

> **Part B subtotal: ~17.5 days**

---

### Implementation Phases

| Phase | Description | Duration |
|-------|-------------|----------|
| A-SHARED | Upgrade SQL, shared Part A catalog infrastructure | Day 1 |
| A-OUTBOX | Outbox helper: catalog, table DDL, refresh-path hook, payload format, retention, lifecycle, GUCs | Days 1–5 |
| A-INBOX | Inbox helper: catalog, table DDL, stream tables, `enable_inbox_tracking`, DLQ alerts, health, replay, retention, lifecycle, GUCs | Days 5–11 |
| A-TEST | Part A integration tests, E2E pipeline test, benchmarks | Days 11–13 |
| A-DOC | Part A documentation, PATTERNS.md guide, reference examples | Days 13–14 |
| B-OUTBOX | Consumer groups: catalog, `poll_outbox`, `commit_offset`, `seek_offset`, heartbeat, monitoring STs, dead consumer cleanup, retention guard | Days 14–22 |
| B-INBOX | Ordered processing: `enable_inbox_ordering`, gap detection, priority queues, worker partition helper | Days 22–28 |
| B-TEST | Part B integration tests, multi-relay E2E, ordered inbox E2E | Days 28–31 |
| B-DOC | Part B documentation, advanced PATTERNS.md sections, reference relay implementations | Days 31–33 |

> **v0.28.0 total: ~6–7 weeks solo / ~4–5 weeks with two developers working the Part A and Part B tracks in parallel** (Part A: essential patterns; Part B: production patterns)

**Exit criteria:**

- [x] OUTBOX-1/2: `enable_outbox()` creates outbox table + `pgt_outbox_latest_` view with correct schema; catalog row present
- [x] OUTBOX-1: `enable_outbox()` on an IMMEDIATE-mode stream table returns `OutboxRequiresNotImmediateMode` with a clear message
- [x] OUTBOX-2: Naming-collision resolution: truncation + hex suffix tested end-to-end; final name stored in catalog
- [x] OUTBOX-3/CC: Initial load (first refresh, all rows as `"inserted"`) above `outbox_inline_threshold_rows` uses the claim-check path; `outbox_delta_rows_` populated atomically; no data loss
- [x] OUTBOX-3/CC: Bulk source update (many rows changed in one cycle) above threshold uses the claim-check path; relay cursor returns all inserted + deleted rows correctly
- [x] OUTBOX-3: Refresh populates the outbox payload within the same transaction; rollback on outbox INSERT failure leaves no orphan delta rows
- [x] OUTBOX-4: Small deltas (≤ `outbox_inline_threshold_rows`) produce inline `{"v":1, "inserted":[…], "deleted":[…]}`; large deltas produce a claim-check header `{"v":1, "claim_check": true, …}` with rows in `outbox_delta_rows_`; relay cursor consumption + `outbox_rows_consumed()` documented + tested; no truncation path exists
- [x] OUTBOX-5: Retention drain removes rows older than `outbox_retention_hours`; respects batch size
- [x] OUTBOX-6: `drop_stream_table()` cascades to outbox + latest-row view; `outbox_status()` returns correct data
- [x] OUTBOX-7: Benchmark shows < 10% overhead vs baseline at small payloads
- [x] INBOX-1/2: `create_inbox()` creates inbox table + 3 stream tables + metadata
- [x] INBOX-3: Pending ST reflects unprocessed messages; DLQ ST reflects poisoned messages
- [x] INBOX-4: `enable_inbox_tracking()` works with non-standard column names on existing tables
- [x] INBOX-5: `pg_trickle_alert` fires when new DLQ entries appear
- [x] INBOX-6: `inbox_health()` returns correct health status; `inbox_status()` lists all inboxes
- [x] INBOX-7: `replay_inbox_messages()` resets messages by explicit `event_ids` array (no `where_clause`); uses a parameterised `WHERE event_id = ANY($1)` — no dynamic SQL; retention drain respects the DLQ; processor crash + replay recovery path documented
- [x] INBOX-8: `drop_inbox(cascade := true)` drops the managed table; preserves adopted tables
- [x] SHARED-3: End-to-end inbox → business logic → outbox pipeline test passes
- [x] SHARED-4: dbt adapter updated with `outbox_enabled` and `inbox_config` properties; integration tests pass
- [x] OUTBOX-B1: `create_consumer_group()` creates group + offset + lease tables; idempotent re-create
- [x] OUTBOX-3/4: FULL-refresh fallback writes `"full_refresh": true` in the header; claim-check applies when the row count exceeds `outbox_inline_threshold_rows`; reference relay handles all four combinations (inline/claim-check × differential/full-refresh) correctly
- [x] OUTBOX-B2: `poll_outbox()` returns a correct batch; no overlap between concurrent relays; visibility timeout expires and the row is re-delivered
- [x] OUTBOX-B2a: `extend_lease()` extends `visibility_until` for all active consumer leases; re-delivery does not occur when the relay calls `extend_lease()` before the timeout
- [x] OUTBOX-B3: `commit_offset()` advances monotonically; `seek_offset()` enables replay from any position
- [x] OUTBOX-B4: Heartbeat tracks liveness; `consumer_unhealthy` alert fires on timeout
- [x] OUTBOX-B5: Three monitoring STs (status/FULL, group lag/DIFFERENTIAL, active leases/FULL) created idempotently (a second `create_consumer_group()` does not fail); refreshed correctly; dropped when the last group is dropped (reference count reaches zero)
- [x] OUTBOX-B6: Dead relay reaped after `consumer_dead_threshold_hours` (default 24 h, configurable); leases released; `consumer_reaped` alert emitted
- [x] OUTBOX-B7: Retention drain respects `MIN(last_offset)`; `outbox_force_retention` override works
- [x] OUTBOX-B8: Multi-relay group E2E: each outbox row published exactly once across 3 concurrent relays
- [x] OUTBOX-B8b: Concurrent relay stress test: 10 relays, 100K outbox rows, < 0.1% duplicate rate before broker dedup; 0% after
- [x] INBOX-B1: `next_` ST surfaces only the next expected sequence per aggregate; withholds future sequences; partial index `(aggregate_id, sequence_num) WHERE processed_at IS NOT NULL` created by `enable_inbox_ordering()`
- [x] INBOX-B1: `enable_inbox_ordering()` + `enable_inbox_priority()` together return `InboxOrderingPriorityConflict` with a clear message
- [x] INBOX-B2: `gaps_` ST detects missing sequences using the `LEAD()` window function; `inbox_ordering_gap` alert fires; gap-detection benchmark passes (< 1 s at 10K aggregates, 1M messages; O(N log N), not O(sequence_range))
- [x] INBOX-B3: Priority tier STs refresh at their configured schedules; messages route to the correct tier
- [x] INBOX-B3: `disable_inbox_priority()` drops all tier STs + config row; the unified `pending_` is restored
- [x] INBOX-B1: `disable_inbox_ordering()` drops the `next_` ST + config row; inbox resumes normal pending behaviour
- [x] INBOX-B4: `inbox_is_my_partition(aggregate_id, worker_id, total_workers)` returns BOOLEAN; no messages lost across N workers; usable in prepared statements without SQL interpolation
- [x] SHARED-B3: Ordered inbox E2E: 10 out-of-order arrivals per aggregate delivered to the processor in order
- [x] SHARED-B4: dbt adapter updated with consumer-group and inbox-ordering properties
- [x] Extension upgrade path tested (`0.27.0 → 0.28.0`) — `sql/pg_trickle--0.27.0--0.28.0.sql` validated by `scripts/check_upgrade_completeness.sh`
- [x] `just check-version-sync` passes

---
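To make the INBOX-B1 selection rule concrete: a minimal pure-Python sketch of what the `next_` stream table's `DISTINCT ON (aggregate_id)` selection surfaces. This is an illustration only — the real implementation is SQL inside the stream table — and it assumes sequences start at 1 and that an aggregate with no processed messages has `last_processed_seq = 0`.

```python
def surface_next_expected(pending, last_processed_seq):
    """Per aggregate, surface only the pending message whose sequence_num
    equals last_processed_seq + 1 (the next_ ST rule from INBOX-B1).
    Out-of-order arrivals stay withheld until their predecessor completes."""
    surfaced = {}
    for aggregate_id, sequence_num, payload in pending:
        # Unseen aggregates default to last_processed_seq = 0 (assumption).
        if sequence_num == last_processed_seq.get(aggregate_id, 0) + 1:
            surfaced[aggregate_id] = payload
    return surfaced

# Aggregate "acct-9" has processed up to seq 3: only seq 4 surfaces,
# while seq 6 is withheld until 4 and 5 are done.
pending = [("acct-9", 6, "close"), ("acct-9", 4, "debit")]
assert surface_next_expected(pending, {"acct-9": 3}) == {"acct-9": "debit"}
```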
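The `LEAD()`-based gap detection of INBOX-B2 can likewise be sketched in plain Python: sort each aggregate's pending sequences and report a gap wherever the adjacent (next) sequence jumps by more than one. This simplified sketch scans pending messages only — O(N log N) in pending count, not O(sequence range) — and, unlike the real `gaps_` ST, it ignores `last_processed_seq`, so it cannot see a gap before the first pending message.

```python
from collections import defaultdict

def find_sequence_gaps(pending):
    """Report (aggregate_id, first_missing, last_missing) for every hole
    between adjacent pending sequences of the same aggregate."""
    by_aggregate = defaultdict(list)
    for aggregate_id, seq in pending:
        by_aggregate[aggregate_id].append(seq)
    gaps = []
    for aggregate_id, seqs in by_aggregate.items():
        seqs.sort()
        for current, nxt in zip(seqs, seqs[1:]):  # nxt plays the role of LEAD(seq)
            if nxt > current + 1:
                gaps.append((aggregate_id, current + 1, nxt - 1))
    return gaps

# "order-1" is missing sequences 3 and 4; "order-2" has no gap.
pending = [("order-1", 2), ("order-1", 5), ("order-2", 7), ("order-2", 8)]
assert find_sequence_gaps(pending) == [("order-1", 3, 4)]
```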
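The INBOX-B4 affinity scheme — `abs(hashtext(aggregate_id)) % total_workers = worker_id` — can be demonstrated outside the database. PostgreSQL's `hashtext()` is server-side only, so this sketch substitutes `zlib.crc32` purely to illustrate the property that matters: every aggregate maps to exactly one preferred worker, so no aggregate is orphaned.

```python
import zlib

def is_my_partition(aggregate_id: str, worker_id: int, total_workers: int) -> bool:
    """Python stand-in for the advisory inbox_is_my_partition() predicate.
    crc32 replaces hashtext() here only for illustration; the modulo scheme
    is what the SQL helper evaluates inline in the WHERE clause."""
    return zlib.crc32(aggregate_id.encode()) % total_workers == worker_id

# Each aggregate is preferred by exactly one of 4 workers.
aggregates = [f"order-{i}" for i in range(1000)]
owners = [sum(is_my_partition(a, w, 4) for w in range(4)) for a in aggregates]
assert all(count == 1 for count in owners)
```

Because the predicate is advisory, a worker whose preferred partition is empty can still drain other partitions; the affinity only improves cache locality and reduces `FOR UPDATE SKIP LOCKED` contention.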
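Finally, the OUTBOX-3/4 exit criterion requires a relay to handle all four payload combinations (inline/claim-check × differential/full-refresh). A rough sketch of the dispatch — not the reference relay; only the header keys (`v`, `claim_check`, `full_refresh`, `inserted`, `deleted`) come from the payload format above, the function name is hypothetical:

```python
import json

def classify_outbox_payload(header_json: str) -> str:
    """Classify an outbox header row into one of the four relay-handling
    combinations. Inline payloads carry their rows directly; claim-check
    payloads require a cursor fetch from outbox_delta_rows_ (omitted here,
    as it needs a live database connection)."""
    header = json.loads(header_json)
    if header.get("v") != 1:
        raise ValueError(f"unsupported outbox payload version: {header.get('v')}")
    mode = "full-refresh" if header.get("full_refresh") else "differential"
    path = "claim-check" if header.get("claim_check") else "inline"
    return f"{path}/{mode}"

assert classify_outbox_payload('{"v":1, "inserted":[], "deleted":[]}') == "inline/differential"
assert classify_outbox_payload('{"v":1, "claim_check": true, "full_refresh": true}') == "claim-check/full-refresh"
```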