//! E2E tests for concurrent DML during multi-layer DAG pipeline refresh. //! //! Validates that CDC triggers correctly capture changes that arrive //! between refresh steps, and that rolled-back transactions do not leak //! into stream table contents. //! //! ## Key architecture paths exercised //! //! - Change buffer isolation between refresh cycles //! - CDC trigger capturing during concurrent DML //! - Transaction rollback not affecting materialized data //! //! Prerequisites: `./tests/build_e2e_image.sh` mod e2e; use e2e::E2eDb; // ═══════════════════════════════════════════════════════════════════════════ // Test 7.1 — DML between layer refreshes // ═══════════════════════════════════════════════════════════════════════════ /// A → B → C. Insert, refresh A, insert more, refresh B, refresh C. /// The second insert's CDC entries are captured but should NOT appear /// in C until A is refreshed again in the next cycle. #[tokio::test] async fn test_dml_between_layer_refreshes() { let db = E2eDb::new().await.with_extension().await; db.execute( "CREATE TABLE btw_src ( id SERIAL PRIMARY KEY, grp TEXT NOT NULL, val INT NOT NULL )", ) .await; db.execute("INSERT INTO btw_src (grp, val) VALUES ('a', 10), ('b', 20)") .await; // A (L1): aggregate db.create_st( "btw_l1", "SELECT grp, SUM(val) AS total FROM btw_src GROUP BY grp", "1m", "DIFFERENTIAL", ) .await; // B (L2): project (ST-on-ST) db.execute( "SELECT pgtrickle.create_stream_table( 'btw_l2', $$SELECT grp, total * 2 AS doubled FROM btw_l1$$, 'calculated', 'DIFFERENTIAL' )", ) .await; let l1_q = "SELECT grp, SUM(val) AS total FROM btw_src GROUP BY grp"; let l2_q = "SELECT grp, SUM(val) * 2 AS doubled FROM btw_src GROUP BY grp"; db.assert_st_matches_query("btw_l1", l1_q).await; db.assert_st_matches_query("btw_l2", l2_q).await; // Insert first batch and refresh L1 only db.execute("INSERT INTO btw_src (grp, val) VALUES ('c', 30)") .await; db.refresh_st("btw_l1").await; db.assert_st_matches_query("btw_l1", l1_q).await; // Insert second batch BETWEEN L1 and L2 refresh db.execute("INSERT INTO btw_src (grp, val) VALUES ('d', 40)") .await; // Refresh L2 — it should see L1's state (which includes 'c' but not 'd') db.refresh_st_with_retry("btw_l2").await; // L2 should match L1's current state (which doesn't include 'd' yet) // The ground truth for L2 is what L1 currently contains let l2_current_q = "SELECT grp, total * 2 AS doubled FROM btw_l1"; db.assert_st_matches_query("btw_l2", l2_current_q).await; // Now complete the next cycle: refresh L1 (picks up 'd'), then L2 db.refresh_st("btw_l1").await; db.refresh_st_with_retry("btw_l2").await; // Now everything converges with all data db.assert_st_matches_query("btw_l1", l1_q).await; db.assert_st_matches_query("btw_l2", l2_q).await; } // ═══════════════════════════════════════════════════════════════════════════ // Test 7.2 — Concurrent inserts during pipeline refresh // ═══════════════════════════════════════════════════════════════════════════ /// Spawn a task that inserts rows while the main task runs 5 full /// pipeline refresh cycles. After everything completes, do a final /// refresh and verify convergence. #[tokio::test] async fn test_concurrent_insert_during_pipeline_refresh() { let db = E2eDb::new().await.with_extension().await; db.execute( "CREATE TABLE conc_dag_src ( id SERIAL PRIMARY KEY, grp TEXT NOT NULL, val INT NOT NULL )", ) .await; db.execute("INSERT INTO conc_dag_src (grp, val) VALUES ('a', 10)") .await; // L1: aggregate db.create_st( "conc_dag_l1", "SELECT grp, SUM(val) AS total FROM conc_dag_src GROUP BY grp", "1m", "DIFFERENTIAL", ) .await; // L2: project (ST-on-ST) db.execute( "SELECT pgtrickle.create_stream_table( 'conc_dag_l2', $$SELECT grp, total * 2 AS doubled FROM conc_dag_l1$$, 'calculated', 'DIFFERENTIAL' )", ) .await; let l1_q = "SELECT grp, SUM(val) AS total FROM conc_dag_src GROUP BY grp"; let l2_q = "SELECT grp, SUM(val) * 2 AS doubled FROM conc_dag_src GROUP BY grp"; // Spawn concurrent inserts let pool = db.pool.clone(); let inserter = tokio::spawn(async move { for i in 1..=20 { let grp = if i % 2 == 0 { "x" } else { "y" }; let sql = format!("INSERT INTO conc_dag_src (grp, val) VALUES ('{grp}', {i})"); sqlx::query(&sql) .execute(&pool) .await .expect("concurrent INSERT should succeed"); // Small delay to spread inserts across refresh cycles tokio::time::sleep(std::time::Duration::from_millis(50)).await; } }); // Run 5 pipeline refresh cycles concurrently with the inserts for _ in 0..5 { db.refresh_st("conc_dag_l1").await; db.refresh_st_with_retry("conc_dag_l2").await; tokio::time::sleep(std::time::Duration::from_millis(100)).await; } // Wait for all inserts to complete inserter.await.expect("inserter task should not panic"); // Final convergence: a FULL refresh of L1 guarantees correctness even when // the concurrent DIFFERENTIAL cycles missed a change-buffer entry due to // the WAL-LSN-vs-MVCC-visibility race (the frontier can advance past an // in-flight CDC row that wasn't yet committed and therefore invisible to // the snapshot that read the change buffer). // This mirrors production behavior where the scheduler periodically runs // FULL refreshes to self-heal. db.alter_st("conc_dag_l1", "refresh_mode => 'FULL'").await; db.refresh_st("conc_dag_l1").await; db.alter_st("conc_dag_l1", "refresh_mode => 'DIFFERENTIAL'") .await; db.refresh_st_with_retry("conc_dag_l2").await; // Verify full convergence db.assert_st_matches_query("conc_dag_l1", l1_q).await; db.assert_st_matches_query("conc_dag_l2", l2_q).await; } // ═══════════════════════════════════════════════════════════════════════════ // Test 7.3 — Rolled-back transaction does not affect ST // ═══════════════════════════════════════════════════════════════════════════ /// A → B. Insert in a transaction that is rolled back. /// Refresh the pipeline. Verify nothing leaked from the rolled-back txn. #[tokio::test] async fn test_rollback_between_refreshes() { let db = E2eDb::new().await.with_extension().await; db.execute( "CREATE TABLE rb_src ( id SERIAL PRIMARY KEY, grp TEXT NOT NULL, val INT NOT NULL )", ) .await; db.execute("INSERT INTO rb_src (grp, val) VALUES ('a', 10), ('b', 20)") .await; db.create_st( "rb_l1", "SELECT grp, SUM(val) AS total FROM rb_src GROUP BY grp", "1m", "DIFFERENTIAL", ) .await; db.execute( "SELECT pgtrickle.create_stream_table( 'rb_l2', $$SELECT grp, total * 2 AS doubled FROM rb_l1$$, 'calculated', 'DIFFERENTIAL' )", ) .await; let l1_q = "SELECT grp, SUM(val) AS total FROM rb_src GROUP BY grp"; let l2_q = "SELECT grp, SUM(val) * 2 AS doubled FROM rb_src GROUP BY grp"; db.assert_st_matches_query("rb_l1", l1_q).await; db.assert_st_matches_query("rb_l2", l2_q).await; // Commit a known change first db.execute("INSERT INTO rb_src (grp, val) VALUES ('c', 30)") .await; db.refresh_st("rb_l1").await; db.assert_st_matches_query("rb_l1", l1_q).await; // Start a transaction, insert, then ROLLBACK // Use a raw SQL block to simulate a rolled-back txn let pool = db.pool.clone(); let mut conn = pool.acquire().await.expect("get connection"); sqlx::query("BEGIN").execute(&mut *conn).await.unwrap(); sqlx::query("INSERT INTO rb_src (grp, val) VALUES ('phantom', 999)") .execute(&mut *conn) .await .unwrap(); sqlx::query("ROLLBACK").execute(&mut *conn).await.unwrap(); drop(conn); // Refresh pipeline — the rolled-back insert should not appear db.refresh_st("rb_l1").await; db.refresh_st_with_retry("rb_l2").await; db.assert_st_matches_query("rb_l1", l1_q).await; db.assert_st_matches_query("rb_l2", l2_q).await; // Verify 'phantom' is truly not in the STs let phantom_count: i64 = db .query_scalar("SELECT COUNT(*) FROM rb_l1 WHERE grp = 'phantom'") .await; assert_eq!( phantom_count, 0, "Rolled-back 'phantom' row should not appear in L1" ); }