> **Plain-language companion:** [v0.29.0.md](v0.29.0.md)

## v0.29.0 — Relay CLI (`pgtrickle-relay`)

**Status: ✅ Released.** See [plans/relay/PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) for the full design.

> **Release Theme**
> This release ships `pgtrickle-relay` — a standalone bidirectional Rust CLI
> binary that bridges pg-trickle outboxes and inboxes with popular messaging
> systems. In **forward mode** it polls outbox tables and publishes deltas to
> external sinks; in **reverse mode** it consumes messages from external
> sources and writes them into pg-trickle inbox tables. Both directions share
> symmetric Source/Sink trait abstractions, config system, observability, and
> error handling. Implemented as a workspace member alongside `pgtrickle-tui`,
> with 8 backends behind Cargo feature flags. The relay makes the v0.28.0
> outbox and inbox immediately usable — zero custom relay code required.
>
> See [plans/relay/PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md)
> for the full architecture, backend specifications, and phased implementation plan.

### Phase 1 — Core Framework + Forward Tier 1 Sinks

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| RELAY-CAT | **Catalog schema + SQL API.** `sql/pg_trickle--0.23.0--0.24.0.sql`: create `pgtrickle.relay_outbox_config` + `pgtrickle.relay_inbox_config` tables, shared `relay_config_notify()` trigger (uses `TG_TABLE_NAME` to identify direction), and 7 `SECURITY DEFINER` SQL wrapper functions: `set_relay_outbox`, `set_relay_inbox`, `enable_relay`, `disable_relay`, `delete_relay`, `get_relay_config`, `list_relay_configs`. Functions validate required JSONB keys and raise clear exceptions. Direct table access is revoked from `pgtrickle_relay`; only `EXECUTE` on the API functions is granted — tables are an internal implementation detail. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.14 |
| RELAY-1 | **Crate scaffold.** Workspace member `pgtrickle-relay/` with `Cargo.toml`, feature flags per backend, CLI parsing via `clap` (`--postgres-url`, `--metrics-addr`, `--log-format`, `--log-level`; no config subcommands — pipeline management is SQL-only), DB bootstrap (connect to PG, load `relay_outbox_config` + `relay_inbox_config`, `LISTEN pgtrickle_relay_config`), `RelayError` enum, `RelayMessage` envelope type. | 1.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.1–A.3, §A.7 |
| RELAY-2 | **Source + Sink traits + relay loop.** `async trait Source` with `poll`/`acknowledge`, `async trait Sink` with `publish`/`is_healthy`. Generic relay loop composing any source with any sink via `CancellationToken`. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.4–A.6 |
| RELAY-3 | **Outbox poller source.** Simple mode (offset tracked in memory) and consumer group mode (`poll_outbox()` + `commit_offset()`). Heartbeat background task. Lease renewal via `extend_lease()`. | 2d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.8 |
| RELAY-4 | **Payload decoder.** All four modes: inline differential, inline full-refresh, claim-check differential, claim-check full-refresh. Server-side cursor for claim-check rows. `outbox_rows_consumed()` called after cursor consumption. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.9 |
| RELAY-5 | **Sink: stdout/file.** `jsonl`, `json_pretty`, `csv` formats. File rotation. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.4 |
| RELAY-6 | **Sink: NATS JetStream.** `async-nats`. Subject template. `Nats-Msg-Id` dedup header. `Pgtrickle-Full-Refresh` header. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.1 |
| RELAY-7 | **Sink: HTTP webhook.** `reqwest`. Batch and per-event mode. `Idempotency-Key` header. Configurable timeout, custom headers, retry-on-status. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.2 |
| RELAY-8 | **Sink: Apache Kafka.** `rdkafka`. Idempotent producer. Dedup key as record key. Topic template. Compression, acks, SASL/SSL. | 1.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.3 |
| RELAY-9 | **Observability + shutdown.** `axum` at `:9090/metrics` + `GET /health`. Prometheus counters for both modes. SIGTERM/SIGINT graceful shutdown. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.11–A.12 |

> **Phase 1 subtotal: ~10.5 days**

### Phase 2 — Forward Tier 2 Sinks

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| RELAY-10 | **Sink: Redis Streams.** `redis` crate. `XADD` with `MAXLEN ~`. Stream key template. Dedup key field. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.5 |
| RELAY-11 | **Sink: Amazon SQS.** `aws-sdk-sqs`. `SendMessageBatch`. `MessageDeduplicationId` for FIFO queues. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.6 |
| RELAY-12 | **Sink: PostgreSQL inbox (remote).** `tokio-postgres`. Inserts into compatible inbox table on different PG. `ON CONFLICT (event_id) DO NOTHING`. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.7 |
| RELAY-13 | **Sink: RabbitMQ AMQP.** `lapin`. Exchange + routing key template. `message-id` AMQP property. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §B.8 |
| RELAY-14 | **Subject/topic routing templates.** Variables: `{stream_table}`, `{op}`, `{outbox_id}`, `{refresh_id}`. Per-event-type override map. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §A.3 |

> **Phase 2 subtotal: ~5 days**

### Phase 3 — Reverse Mode (Sources + Inbox Sink)

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| RELAY-22 | **Inbox sink.** pg-trickle inbox writer with batch insert, `ON CONFLICT (event_id) DO NOTHING`, dedup tracking metric, configurable column mapping. | 1.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §D |
| RELAY-23 | **Source: NATS JetStream consumer.** Durable pull consumer, ack after inbox write. Dedup key from `Nats-Msg-Id` header or stream sequence. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.1 |
| RELAY-24 | **Source: Apache Kafka consumer.** `rdkafka` `StreamConsumer`, manual offset commit after inbox write. Dedup key from record key or partition:offset. | 1.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.3 |
| RELAY-25 | **Source: HTTP webhook receiver.** `axum` server, synchronous ack (200 after inbox write). Dedup key from `Idempotency-Key` header. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.2 |
| RELAY-26 | **Source: Redis Streams consumer.** `XREADGROUP` + `XACK`. Dedup key from `pgt_dedup_key` field or entry ID. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.5 |
| RELAY-27 | **Source: Amazon SQS consumer.** `ReceiveMessage` + `DeleteMessage`. Dedup key from `MessageDeduplicationId` (FIFO) or `MessageId`. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.6 |
| RELAY-28 | **Source: RabbitMQ consumer.** `basic_consume` + manual ack/nack. Dedup key from `message-id` AMQP property. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.7 |
| RELAY-29 | **Source: stdin/file reader.** JSONL format. Dedup key from `dedup_key` field or generated UUID. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §C.4 |
| RELAY-30 | **Reverse-mode config.** Dedup key mapping, event type extraction, inbox column mapping. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §D |

> **Phase 3 subtotal: ~10 days**

### Phase 4 — Testing & Polish

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| RELAY-15 | **Unit tests.** Payload decoder (all 4 modes), config merging, subject templates, dedup key generation, retry backoff, envelope round-trip, mock source→sink. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.1 |
| RELAY-16 | **Forward integration tests (Testcontainers).** NATS, Kafka (Redpanda), webhook (WireMock), Redis, PG inbox — end-to-end per sink with dedup verification. | 2d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.2 |
| RELAY-17 | **Forward consumer group E2E.** 2 relay instances share one consumer group; zero duplicates; crash recovery; claim-check large delta. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.2 |
| RELAY-31 | **Reverse integration tests (Testcontainers).** NATS→inbox, Kafka→inbox, webhook→inbox, Redis→inbox, SQS→inbox, RabbitMQ→inbox, stdin→inbox — dedup verification per source. | 2d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.3 |
| RELAY-32 | **Reverse dedup + crash recovery E2E.** Duplicate messages produce 1 inbox row; kill relay mid-batch → restart → zero lost messages. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.3 |
| RELAY-18 | **Benchmarks.** Forward + reverse throughput (100K events), latency p50/p95/p99, memory bounded during claim-check. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §E.4 |

> **Phase 4 subtotal: ~7 days**

### Phase 5 — Documentation & Distribution

| Item | Description | Effort | Ref |
|------|-------------|--------|-----|
| RELAY-19 | **Documentation.** `pgtrickle-relay/README.md` quick start (forward + reverse). `docs/RELAY.md` comprehensive guide. `docs/PATTERNS.md` relay section with worked examples per backend. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §F.1 |
| RELAY-20 | **Dockerfile + GitHub Actions.** Distroless container image `grove/pgtrickle-relay`. CI matrix: Linux amd64/arm64, macOS amd64/arm64. Pre-built binaries on GitHub Releases. | 1d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §F.2 |
| RELAY-21 | **Release automation.** Docker Hub publish, Homebrew formula (`brew install grove/tap/pgtrickle-relay`), `cargo publish pgtrickle-relay`. | 0.5d | [PLAN_RELAY_CLI.md](plans/relay/PLAN_RELAY_CLI.md) §F.2 |

> **Phase 5 subtotal: ~2.5 days**

### Implementation Phases

| Phase | Description | Duration |
|-------|-------------|----------|
| Phase 1 | Core framework: Source/Sink traits, outbox poller, payload decoder, NATS/webhook/Kafka sinks, metrics, shutdown | Days 1–10 |
| Phase 2 | Tier 2 sinks: Redis, SQS, PG inbox, RabbitMQ + routing templates | Days 10–15 |
| Phase 3 | Reverse mode: inbox sink, NATS/Kafka/webhook/Redis/SQS/RabbitMQ/stdin sources + reverse config | Days 15–25 |
| Phase 4 | Tests: unit, Testcontainers integration (forward + reverse), consumer group E2E, benchmarks | Days 25–32 |
| Phase 5 | Distribution: Docker, CI binaries, Homebrew, docs, cargo publish | Days 32–34.5 |

> **v0.29.0 total: ~36.5 days solo / ~23 days with two developers**
> (Phases 1–2 forward sinks and Phase 3 reverse sources can be parallelised.
> Requires v0.28.0 outbox + consumer groups for full forward E2E; reverse
> mode only needs inbox table schema.)

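As a rough illustration of how the RELAY-2 traits and relay loop could fit together, the sketch below uses synchronous, std-only stand-ins. It is not the crate's actual API: the method signatures, the `RelayMessage` fields, the `String` error type, the mock types, and the `AtomicBool` used in place of `CancellationToken` are all assumptions made for this example.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical envelope; the real `RelayMessage` fields are not specified here.
#[derive(Debug, Clone, PartialEq)]
pub struct RelayMessage {
    pub dedup_key: String,
    pub payload: String,
}

/// Simplified synchronous stand-in for the async `Source` trait.
pub trait Source {
    /// Return up to `max` pending messages without removing them.
    fn poll(&mut self, max: usize) -> Result<Vec<RelayMessage>, String>;
    /// Confirm that the first `count` pending messages were delivered.
    fn acknowledge(&mut self, count: usize) -> Result<(), String>;
}

/// Simplified synchronous stand-in for the async `Sink` trait.
pub trait Sink {
    fn publish(&mut self, batch: &[RelayMessage]) -> Result<(), String>;
    /// Health probe; not consulted by the loop in this sketch.
    fn is_healthy(&self) -> bool;
}

/// Relay batches until the source is drained or `cancel` is set.
/// Returns the number of messages relayed.
pub fn relay_loop<S: Source, K: Sink>(
    source: &mut S,
    sink: &mut K,
    cancel: &AtomicBool, // assumption: stands in for `CancellationToken`
    batch_size: usize,
) -> Result<usize, String> {
    let mut relayed = 0;
    while !cancel.load(Ordering::Relaxed) {
        let batch = source.poll(batch_size)?;
        if batch.is_empty() {
            break;
        }
        // Publish first; on error we return before acknowledging,
        // so the batch stays in the source for a later retry.
        sink.publish(&batch)?;
        source.acknowledge(batch.len())?;
        relayed += batch.len();
    }
    Ok(relayed)
}

/// In-memory mock source backed by a queue.
pub struct MockSource {
    pub pending: VecDeque<RelayMessage>,
}

impl MockSource {
    pub fn new(keys: &[&str]) -> Self {
        let pending = keys
            .iter()
            .map(|k| RelayMessage {
                dedup_key: k.to_string(),
                payload: format!("payload-{k}"),
            })
            .collect();
        MockSource { pending }
    }
}

impl Source for MockSource {
    fn poll(&mut self, max: usize) -> Result<Vec<RelayMessage>, String> {
        Ok(self.pending.iter().take(max).cloned().collect())
    }
    fn acknowledge(&mut self, count: usize) -> Result<(), String> {
        let n = count.min(self.pending.len());
        self.pending.drain(..n);
        Ok(())
    }
}

/// In-memory mock sink that fails every publish while unhealthy.
pub struct MockSink {
    pub received: Vec<RelayMessage>,
    pub healthy: bool,
}

impl MockSink {
    pub fn new(healthy: bool) -> Self {
        MockSink { received: Vec::new(), healthy }
    }
}

impl Sink for MockSink {
    fn publish(&mut self, batch: &[RelayMessage]) -> Result<(), String> {
        if !self.healthy {
            return Err("sink unavailable".to_string());
        }
        self.received.extend_from_slice(batch);
        Ok(())
    }
    fn is_healthy(&self) -> bool {
        self.healthy
    }
}
```

Because `acknowledge` runs only after `publish` succeeds, a failed publish leaves the batch in the source. This is the at-least-once ordering that the "ack only after write" items in the plan rely on, with dedup keys absorbing any redelivery.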
**Exit criteria:**

- [ ] RELAY-CAT: Migration `sql/pg_trickle--0.23.0--0.24.0.sql` creates `relay_outbox_config` + `relay_inbox_config` tables and `relay_config_notify()` trigger
- [ ] RELAY-CAT: `set_relay_outbox()` validates `source_type = 'outbox'`; `set_relay_inbox()` validates `sink_type = 'pg-inbox'`; missing keys raise clear exception
- [ ] RELAY-CAT: `enable_relay()`/`disable_relay()`/`delete_relay()` search both tables; raise exception on missing name
- [ ] RELAY-CAT: `list_relay_configs()` returns all pipelines with `direction` column; `get_relay_config()` raises on missing name
- [ ] RELAY-CAT: functions are `SECURITY DEFINER`; `pgtrickle_relay` role has no direct table access; `SELECT * FROM pgtrickle.relay_outbox_config` fails with permission denied for relay role
- [ ] RELAY-1: `pgtrickle-relay` crate builds with `--features default` and `--features nats,webhook,kafka`
- [ ] RELAY-2: Source + Sink traits compose correctly; relay loop runs with mock source/sink
- [ ] RELAY-3: Simple mode polls and forwards events; consumer group mode uses `poll_outbox()` + `commit_offset()` correctly
- [ ] RELAY-4: Inline payload decoded and published; claim-check cursor fetch returns all rows; `outbox_rows_consumed()` called; full-refresh flag triggers upsert semantics
- [ ] RELAY-5: stdout/file backend writes valid JSONL; all 3 formats tested
- [ ] RELAY-6: NATS E2E: relay publishes; consumer verifies dedup via `Nats-Msg-Id`
- [ ] RELAY-7: Webhook E2E: relay POSTs batch; WireMock verifies `Idempotency-Key` header
- [ ] RELAY-8: Kafka E2E: relay produces records; consumer group verifies zero duplicates
- [ ] RELAY-9: `/metrics` returns valid Prometheus exposition; `/health` returns 200 healthy, 503 degraded
- [ ] RELAY-10: Redis E2E: `XRANGE` returns all relayed events in order
- [ ] RELAY-11: SQS E2E: `SendMessageBatch` used; FIFO dedup verified
- [ ] RELAY-12: PG inbox E2E: events appear in target inbox; duplicate publish does not duplicate row
- [ ] RELAY-13: RabbitMQ E2E: events delivered to bound queue; `message-id` property set
- [ ] RELAY-14: Subject template `pgtrickle.{stream_table}.{op}` resolves correctly
- [ ] RELAY-15: All unit tests pass
- [ ] RELAY-16: All forward Testcontainers integration tests pass per sink
- [ ] RELAY-17: Forward consumer group E2E: 2 relays, 0 duplicates; crash recovery verified
- [ ] RELAY-18: Forward throughput > 10K events/sec inline → NATS; reverse throughput > 10K events/sec Kafka → inbox; memory bounded during claim-check
- [ ] RELAY-19: `docs/RELAY.md` published; quick start covers forward + reverse with NATS, webhook, Kafka
- [ ] RELAY-20: Docker image `grove/pgtrickle-relay:0.24.0` published; distroless < 50 MB
- [ ] RELAY-21: `cargo install pgtrickle-relay` works; Homebrew formula passes `brew audit`
- [ ] RELAY-22: Inbox sink writes events with `ON CONFLICT` dedup; batch insert verified
- [ ] RELAY-23: NATS→inbox E2E: durable consumer delivers to inbox; ack only after write
- [ ] RELAY-24: Kafka→inbox E2E: offset committed only after inbox write; crash recovery verified
- [ ] RELAY-25: Webhook→inbox E2E: POST returns 200 only after inbox write
- [ ] RELAY-26: Redis→inbox E2E: XACK sent only after inbox write
- [ ] RELAY-27: SQS→inbox E2E: DeleteMessage after inbox write; visibility timeout re-poll verified
- [ ] RELAY-28: RabbitMQ→inbox E2E: manual ack after inbox write; nack+requeue on failure
- [ ] RELAY-29: stdin→inbox: piped JSONL arrives in inbox; dedup key extracted
- [ ] RELAY-30: Reverse config: event type extraction + column mapping works
- [ ] RELAY-31: All reverse Testcontainers integration tests pass per source
- [ ] RELAY-32: Reverse dedup: duplicate source message produces 1 inbox row; crash recovery zero loss
- [ ] Extension upgrade path tested (`0.28.0 → 0.29.0`)
- [ ] `just check-version-sync` passes

---
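To make the RELAY-14 criterion concrete, here is a minimal std-only sketch of how a subject template such as `pgtrickle.{stream_table}.{op}` might be resolved. The function names, the error handling, and the value types chosen for `outbox_id` and `refresh_id` are assumptions for illustration, not the relay's actual implementation. The per-event-type override map is left out.

```rust
use std::collections::HashMap;

/// Build the variable map for one event.
/// Assumption: `outbox_id` is an integer and `refresh_id` a string;
/// the plan names the variables but not their types.
pub fn event_vars(
    stream_table: &str,
    op: &str,
    outbox_id: i64,
    refresh_id: &str,
) -> HashMap<&'static str, String> {
    let mut vars = HashMap::new();
    vars.insert("stream_table", stream_table.to_string());
    vars.insert("op", op.to_string());
    vars.insert("outbox_id", outbox_id.to_string());
    vars.insert("refresh_id", refresh_id.to_string());
    vars
}

/// Substitute every `{name}` placeholder in `template` from `vars`.
/// Unknown variables and unclosed braces are reported as errors
/// rather than passed through silently.
pub fn render_template(
    template: &str,
    vars: &HashMap<&str, String>,
) -> Result<String, String> {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(open) = rest.find('{') {
        // Copy the literal text before the placeholder.
        out.push_str(&rest[..open]);
        let after = &rest[open + 1..];
        let close = after
            .find('}')
            .ok_or_else(|| format!("unclosed '{{' in template: {template}"))?;
        let name = &after[..close];
        let value = vars
            .get(name)
            .ok_or_else(|| format!("unknown template variable: {name}"))?;
        out.push_str(value);
        rest = &after[close + 1..];
    }
    // Copy whatever literal text follows the last placeholder.
    out.push_str(rest);
    Ok(out)
}
```

With variables built from `event_vars("orders", "insert", 42, "r-7")`, the template `pgtrickle.{stream_table}.{op}` renders to `pgtrickle.orders.insert`. The `op` value `insert` is only illustrative. Failing on an unknown variable is a design choice in this sketch, so that a typo in a template surfaces as a configuration error instead of a misrouted subject.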