# AGENTS.md — Development Guidelines for dbt-pg-trickle ## Project Overview dbt package providing a `stream_table` custom materialization for the [pg-trickle](https://github.com/grove/pg-trickle) PostgreSQL extension. Pure Jinja SQL macros — no Python adapter code. Works with stock `dbt-postgres`. - **Language:** Jinja2 SQL (dbt macros) - **Framework:** dbt Core ≥ 1.9, ≤ 1.10, dbt-postgres adapter - **Target database:** PostgreSQL 18 with pg_trickle extension - **Package name:** `dbt_pg_trickle` - **License:** Apache 2.0 Key docs: `plans/PLAN_DBT_MACRO.md` · `README.md` · `CHANGELOG.md` ## Directory Layout ``` macros/ ├── materializations/ │ └── stream_table.sql # Core custom materialization — entry point for dbt run ├── adapters/ │ ├── create_stream_table.sql # Wraps pgtrickle.create_stream_table() │ ├── create_or_replace_stream_table.sql # Wraps pgtrickle.create_or_replace_stream_table() (≥ 0.6.0) │ ├── alter_stream_table.sql # Wraps pgtrickle.alter_stream_table() │ ├── drop_stream_table.sql # Wraps pgtrickle.drop_stream_table() │ └── refresh_stream_table.sql # Wraps pgtrickle.refresh_stream_table() ├── hooks/ │ └── source_freshness.sql # Source freshness via pg_stat_stream_tables ├── operations/ │ ├── refresh.sql # Manual refresh run-operation (single table) │ ├── refresh_all.sql # Refresh all stream tables in dependency order │ ├── drop_all.sql # Drop all stream tables run-operation │ └── check_cdc_health.sql # CDC health check run-operation └── utils/ ├── stream_table_exists.sql # Check catalog for stream table existence ├── get_stream_table_info.sql # Read stream table metadata from catalog ├── stream_table_status.sql # Health status (healthy/stale/erroring/paused) └── has_create_or_replace.sql # Detect pg_trickle ≥ 0.6.0 tests/ └── generic/ └── stream_table_healthy.sql # Generic dbt test for stream table health integration_tests/ # Standalone dbt project for testing ├── dbt_project.yml ├── profiles.yml # Postgres → localhost:5432 ├── packages.yml # Installs parent via local: ../ ├── models/ # Test stream table models ├── seeds/ # CSV fixtures └── tests/ # Data tests / assertions ``` ## Workflow — Always Do This After any macro change, run the integration test suite: ```bash # The just command handles Docker, Python venv, and dbt installation: just test-dbt-fast # fast (reuses existing E2E Docker image) just test-dbt # full (rebuilds Docker image from source) ``` The test runner (`integration_tests/scripts/run_dbt_tests.sh`) automatically: - Creates a dedicated `.venv-dbt` virtual environment using **Python 3.13** (dbt-core 1.10 requires Python ≤3.13; mashumaro dependency incompatible with 3.14) - Installs `dbt-core~=1.10` and `dbt-postgres~=1.10` into that venv - Adds the Homebrew-keg PostgreSQL `bin/` to PATH so `psql` is available To run dbt commands manually after `just test-dbt-fast` (while the container is still up), activate the venv first: ```bash source .venv-dbt/bin/activate cd dbt-pgtrickle/integration_tests dbt deps && dbt run && dbt test ``` After changes to the materialization logic, also test the full-refresh and alter paths: ```bash dbt run --full-refresh # Test drop/recreate sleep 5 dbt test ``` Before committing, verify: 1. All integration tests pass 2. No Jinja compilation errors (`dbt compile` succeeds) 3. Macro naming is consistent with conventions below ## Jinja SQL Conventions ### Naming - **Macros:** `pgtrickle_` prefix (e.g., `pgtrickle_create_stream_table`) - **Materialization:** `stream_table` (registered via `{% materialization stream_table, adapter='postgres' %}`) - **Config keys:** snake_case (e.g., `stream_table_name`, `refresh_mode`) - **Files:** One macro per file, filename matches macro name (without `pgtrickle_` prefix for adapters) ### Quoting and Safety - Always use `dbt.string_literal()` to quote user-provided string values passed to SQL - Never concatenate raw user input into SQL strings - Guard `run_query()` calls with `{% if execute %}` to prevent parse-time execution ### Whitespace Control - Use `{%- ... -%}` (trim) on variable assignments: `{%- set foo = config.get('bar') -%}` - Use `{{ ... }}` (no trim) on output expressions that produce SQL - Put `{# comments #}` on separate lines for readability ### Error Handling - Use `exceptions.raise_compiler_error()` for fatal errors (missing config, invalid values) - Use `log("pg-trickle: ...", info=true)` for informational messages - Check `run_query()` results before accessing `.rows`: ```sql {% set result = run_query(query) %} {% if result and result.rows | length > 0 %} {{ return(result.rows[0]) }} {% else %} {{ return(none) }} {% endif %} ``` - Return `none` (not empty dict) when a stream table is not found ### Materialization Structure The `stream_table` materialization must handle exactly four cases: 1. **Full refresh + exists:** Drop, then create 2. **Does not exist:** Create 3. **Exists + query changed:** Drop, then create 4. **Exists + query unchanged:** Alter (schedule/mode only) or no-op Always call `adapter.cache_new()` after creating a new relation. Always call `run_hooks(pre_hooks)` / `run_hooks(post_hooks)`. Always return `{'relations': [target_relation]}`. ### DDL Transaction Safety (CRITICAL) **Never use `run_query()` for DDL calls to `pgtrickle.*` functions.** dbt wraps each model's main statement in an implicit `BEGIN … ROLLBACK` on the model's connection. `run_query()` reuses the same connection, so the DDL is silently rolled back when the main transaction ends — even when the model reports `OK`. Instead, use `{% call statement(..., auto_begin=False, fetch_result=False) %}` with explicit `BEGIN; … COMMIT;` embedded in the SQL: ```jinja {% call statement('pgtrickle_create', auto_begin=False, fetch_result=False) %} BEGIN; SELECT pgtrickle.create_stream_table(...); COMMIT; {% endcall %} ``` This guarantees the DDL commits unconditionally regardless of what dbt's connection state is. **Rule:** Every `dbt-pgtrickle/macros/adapters/*.sql` macro that mutates the database (create, drop, alter, refresh) must use this pattern. ### Config Defaults | Key | Default | Valid Values | |-----|---------|-------------| | `schedule` | `'1m'` | pg-trickle duration: `30s`, `1m`, `5m`, `1h`, etc. | | `refresh_mode` | `'DIFFERENTIAL'` | `'DIFFERENTIAL'`, `'FULL'` | | `initialize` | `true` | `true`, `false` | ## pg-trickle SQL API All functions live in the `pgtrickle` schema. The extension must be loaded via `shared_preload_libraries = 'pg_trickle'`. | Function | Purpose | |----------|---------| | `pgtrickle.create_stream_table(name, query, schedule, refresh_mode, initialize)` | Create a new stream table | | `pgtrickle.alter_stream_table(name, schedule, refresh_mode, status)` | Alter config (cannot change query) | | `pgtrickle.drop_stream_table(name)` | Drop stream table and all resources | | `pgtrickle.refresh_stream_table(name)` | Synchronous manual refresh | Catalog: `pgtrickle.pgt_stream_tables` (metadata), `pgtrickle.pg_stat_stream_tables` (monitoring view) ## Testing ### Test tiers | Tier | What | Command | |------|------|---------| | Compile check | Jinja syntax, macro resolution | `dbt compile` | | Integration tests | Full lifecycle against real PG + pg-trickle | `cd integration_tests && dbt seed && dbt run && dbt test` | | Full-refresh path | Drop/recreate behavior | `dbt run --full-refresh && dbt test` | | Operations test | Manual refresh, drop-all | `dbt run-operation refresh --args '{model_name: ...}'` | ### Test infrastructure - Integration tests require PostgreSQL 18 with pg-trickle extension - Build the Docker image: `docker build -t pg-trickle-e2e:latest -f tests/Dockerfile.e2e .` (from the [pg-trickle repo](https://github.com/grove/pg-trickle)) - Start: `docker run -d -e POSTGRES_PASSWORD=postgres -p 5432:5432 pg-trickle-e2e:latest` - Create extension: `psql -U postgres -c "CREATE EXTENSION pg_trickle;"` ### Test naming - Data tests: `assert_.sql` (e.g., `assert_totals_correct.sql`) - Schema tests: defined in `schema.yml` alongside model definitions ## Do's and Don'ts ### Do - Prefix all macros with `pgtrickle_` (except the materialization itself) - Use `dbt.string_literal()` for all string parameters to pg-trickle functions - Guard all `run_query()` calls with `{% if execute %}` - Log actions with `{{ log("pg-trickle: ...", info=true) }}` - Handle the "stream table not found" case gracefully (return `none`) - Test both create and full-refresh paths - Keep macros focused — one pg-trickle function per adapter macro ### Don't - Don't use `adapter.dispatch()` — this package is PostgreSQL-only - Don't call pg-trickle functions outside of macro wrappers - Don't use `run_query()` without `{% if execute %}` guard - Don't modify the `__pgt_row_id` column — it's managed by pg-trickle - Don't assume `ALTER` can change the defining query — it can't; must drop/recreate - Don't use `println` or `print` — use `log()` for dbt-compatible output - Don't add Python code — this is a macro-only package (Option A) ## Commit Messages Follow conventional commits: ``` feat: add stream_table materialization fix: handle schema-qualified names in existence check test: add full-refresh integration test docs: update config reference in README chore: update dbt version constraint ``` ## Code Review Checklist - [ ] Macro names follow `pgtrickle_` convention - [ ] All string params use `dbt.string_literal()` - [ ] All `run_query()` calls guarded by `{% if execute %}` - [ ] Materialization handles all four lifecycle cases - [ ] Integration tests pass (create, test, full-refresh, test) - [ ] Log messages are informative and prefixed with `pg-trickle:` - [ ] No raw SQL concatenation of user input - [ ] CHANGELOG.md updated