{ "cells": [ { "cell_type": "markdown", "id": "c0d12b16", "metadata": {}, "source": [ "# OLTP-to-Lake Analysis with DuckDB\n", "\n", "This notebook uses DuckDB to query a pg_trickle stream table in PostgreSQL,\n", "running both near-real-time operational queries and time-series analytical queries.\n", "\n", "## Prerequisites\n", "- `docker compose up` running in the parent directory\n", "- DuckDB and psycopg2-binary installed in the container\n", "- PostgreSQL 18 with pg_trickle running" ] }, { "cell_type": "markdown", "id": "84f8ae9a", "metadata": {}, "source": [ "## Step 1: Connect to PostgreSQL via DuckDB\n", "\n", "**Important:** Inside Docker, use the service name `postgres` (from docker-compose.yml),\n", "not `localhost`. Docker's internal network resolves service names to container IP addresses." ] }, { "cell_type": "code", "execution_count": null, "id": "eedeb530", "metadata": {}, "outputs": [], "source": [ "import duckdb\n", "import pandas as pd  # fetchdf() returns pandas DataFrames, so pandas must be installed\n", "\n", "# Create an in-memory DuckDB connection\n", "con = duckdb.connect()\n", "\n", "# Install and load the PostgreSQL extension\n", "con.execute(\"INSTALL postgres\")\n", "con.execute(\"LOAD postgres\")\n", "\n", "# Attach PostgreSQL from Docker container (use service name 'postgres', not localhost)\n", "con.execute(\"\"\"\n", " ATTACH 'postgresql://postgres:postgres@postgres:5432/postgres'\n", " AS pg (TYPE POSTGRES)\n", "\"\"\")\n", "\n", "print(\"✓ Connected to PostgreSQL\")" ] }, { "cell_type": "markdown", "id": "0a0a4474", "metadata": {}, "source": [ "## Step 2: Query Current Revenue (Operational)\n", "\n", "Query the stream table directly. It stores precomputed per-region, per-minute aggregates,\n", "so a read is a plain table scan rather than an aggregation over the raw orders." 
] }, { "cell_type": "code", "execution_count": null, "id": "a3f1b0c2", "metadata": {}, "outputs": [], "source": [ "# Read the 20 most recent (region, minute) revenue rows\n", "result = con.execute(\"\"\"\n", " SELECT\n", " region,\n", " minute,\n", " total_revenue,\n", " order_count\n", " FROM pg.public.revenue_by_region\n", " ORDER BY minute DESC, total_revenue DESC\n", " LIMIT 20\n", "\"\"\").fetchdf()\n", "\n", "print(\"Recent Revenue Data:\")\n", "print(result.to_string())\n", "print(f\"\\n✓ Rows returned: {len(result)}\")" ] }, { "cell_type": "markdown", "id": "8877a4fc", "metadata": {}, "source": [ "## Step 3: Stream Table Status" ] }, { "cell_type": "code", "execution_count": null, "id": "53ae3388", "metadata": {}, "outputs": [], "source": [ "# Check the stream table metadata\n", "status = con.execute(\"\"\"\n", " SELECT\n", " pgt_name,\n", " refresh_mode,\n", " schedule,\n", " status,\n", " is_populated,\n", " last_refresh_at\n", " FROM pg.pgtrickle.pgt_stream_tables\n", " WHERE pgt_name = 'revenue_by_region'\n", "\"\"\").fetchdf()\n", "\n", "print(\"Stream Table Configuration:\")\n", "print(status.to_string())" ] }, { "cell_type": "markdown", "id": "284565e9", "metadata": {}, "source": [ "## Step 4: Refresh History (DIFFERENTIAL Updates)\n", "\n", "Show how many rows each recent refresh cycle inserted and deleted.\n", "In DIFFERENTIAL mode, a refresh processes only the rows affected by changes since the previous refresh." 
] }, { "cell_type": "code", "execution_count": null, "id": "7b95e9aa", "metadata": {}, "outputs": [], "source": [ "# Check recent refresh cycles\n", "history = con.execute(\"\"\"\n", " SELECT\n", " action,\n", " status,\n", " rows_inserted,\n", " rows_deleted,\n", " delta_row_count,\n", " start_time\n", " FROM pg.pgtrickle.pgt_refresh_history\n", " WHERE pgt_id = (SELECT pgt_id FROM pg.pgtrickle.pgt_stream_tables WHERE pgt_name = 'revenue_by_region')\n", " ORDER BY start_time DESC\n", " LIMIT 15\n", "\"\"\").fetchdf()\n", "\n", "print(\"Recent Refresh Cycles:\")\n", "print(history.to_string())\n", "\n", "if len(history) > 0:\n", " avg_delta = history['delta_row_count'].mean()\n", " print(f\"\\n📊 Average delta rows per refresh: {avg_delta:.1f}\")\n", " print(\"(Only changed rows are recomputed in DIFFERENTIAL mode)\")" ] }, { "cell_type": "markdown", "id": "a91beeb5", "metadata": {}, "source": [ "## Step 5: Revenue Analysis by Region" ] }, { "cell_type": "code", "execution_count": null, "id": "256eb8ed", "metadata": {}, "outputs": [], "source": [ "# Aggregate revenue by region\n", "by_region = con.execute(\"\"\"\n", " SELECT\n", " region,\n", " COUNT(DISTINCT minute) AS time_periods,\n", " SUM(total_revenue)::DECIMAL(12,2) AS total_revenue,\n", " AVG(total_revenue)::DECIMAL(10,2) AS avg_revenue_per_minute,\n", " SUM(order_count) AS total_orders,\n", " AVG(order_count)::DECIMAL(8,2) AS avg_orders_per_minute\n", " FROM pg.public.revenue_by_region\n", " GROUP BY region\n", " ORDER BY total_revenue DESC\n", "\"\"\").fetchdf()\n", "\n", "print(\"Revenue Summary by Region:\")\n", "print(by_region.to_string())\n", "\n", "if len(by_region) > 0:\n", " print(f\"\\n✓ Top region: {by_region.iloc[0]['region']} (${by_region.iloc[0]['total_revenue']})\")" ] }, { "cell_type": "markdown", "id": "6f506185", "metadata": {}, "source": [ "## Summary\n", "\n", "✓ **OLTP-to-Lake Pipeline Working**\n", "- Stream table aggregates orders incrementally\n", "- DIFFERENTIAL refresh mode updates only changed rows\n", "- Sub-2-second latency from order INSERT to analytics-ready state\n", "- No Kafka, Spark, or separate ETL pipeline required" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.6" } }, "nbformat": 4, "nbformat_minor": 5 }