//! E2E tests for diamond dependency consistency (atomic refresh groups). //! //! These tests validate the diamond_consistency parameter on //! `create_stream_table` / `alter_stream_table`, the //! `pgtrickle.diamond_groups()` monitoring function, and end-to-end //! consistent refresh semantics for diamond topologies. mod e2e; use e2e::E2eDb; // ── Diamond topology helpers ─────────────────────────────────────────── /// Set up a simple diamond: A (base table) → B, C (stream tables) → D (stream table). /// /// A is a base table with `id INT, val INT`. /// B selects from A with a filter, C selects from A with a different filter, /// D joins B and C. async fn setup_diamond(db: &E2eDb, dc: &str) { // Base table db.execute("CREATE TABLE src_a (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_a VALUES (1, 10), (2, 20), (3, 30)") .await; // B: reads from A let sql_b = format!( "SELECT pgtrickle.create_stream_table('st_b', $$SELECT id, val FROM src_a WHERE val <= 20$$, \ '1m', 'FULL', true, '{dc}')" ); db.execute(&sql_b).await; // C: reads from A let sql_c = format!( "SELECT pgtrickle.create_stream_table('st_c', $$SELECT id, val FROM src_a WHERE val >= 20$$, \ '1m', 'FULL', true, '{dc}')" ); db.execute(&sql_c).await; // D: joins B and C (fan-in — the diamond convergence point) let sql_d = format!( "SELECT pgtrickle.create_stream_table('st_d', \ $$SELECT b.id AS b_id, c.id AS c_id, b.val + c.val AS total FROM st_b b JOIN st_c c ON b.id = c.id$$, \ '1m', 'FULL', true, '{dc}')" ); db.execute(&sql_d).await; } // ── Tests ────────────────────────────────────────────────────────────── /// Verify that the `diamond_consistency` column defaults to `'atomic'` /// when not specified (EC-13: default changed from `'none'` to `'atomic'`). #[tokio::test] async fn test_diamond_consistency_default() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src VALUES (1, 10)").await; db.create_st("test_default_dc", "SELECT * FROM src", "1m", "FULL") .await; let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_default_dc'", ) .await; assert_eq!( dc, "atomic", "expected default diamond_consistency to be 'atomic' (EC-13)" ); } /// Verify that create_stream_table accepts diamond_consistency='atomic'. #[tokio::test] async fn test_diamond_consistency_create_atomic() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src VALUES (1, 10)").await; db.execute( "SELECT pgtrickle.create_stream_table('test_atomic_dc', \ $$SELECT * FROM src$$, '1m', 'FULL', true, 'atomic')", ) .await; let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_atomic_dc'", ) .await; assert_eq!(dc, "atomic"); } /// Verify that alter_stream_table can change diamond_consistency. #[tokio::test] async fn test_diamond_consistency_alter() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src VALUES (1, 10)").await; db.create_st("test_alter_dc", "SELECT * FROM src", "1m", "FULL") .await; // Should start as 'atomic' (EC-13 default) let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_alter_dc'", ) .await; assert_eq!(dc, "atomic"); // Alter to 'none' db.alter_st("test_alter_dc", "diamond_consistency => 'none'") .await; let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_alter_dc'", ) .await; assert_eq!(dc, "none"); // Alter back to 'atomic' db.alter_st("test_alter_dc", "diamond_consistency => 'atomic'") .await; let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_alter_dc'", ) .await; assert_eq!(dc, "atomic"); } /// Verify that a diamond topology with diamond_consistency='atomic' detects /// the correct diamond group membership via `pgtrickle.diamond_groups()`. #[tokio::test] async fn test_diamond_groups_sql_function() { let db = E2eDb::new().await.with_extension().await; setup_diamond(&db, "atomic").await; // The diamond_groups() function should show a non-empty result let group_count: i64 = db .query_scalar("SELECT count(DISTINCT group_id) FROM pgtrickle.diamond_groups()") .await; assert!( group_count >= 1, "expected at least one diamond group, got {}", group_count ); // Check that the group has at least 2 members let member_count: i64 = db .query_scalar("SELECT count(*) FROM pgtrickle.diamond_groups()") .await; assert!( member_count >= 2, "expected at least 2 members in diamond group, got {}", member_count ); } /// Verify that a linear chain (no diamond) does not create consistency groups. #[tokio::test] async fn test_diamond_linear_unaffected() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_lin (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_lin VALUES (1, 10)").await; // Linear chain: src_lin → st_x → st_y (no diamond) db.execute( "SELECT pgtrickle.create_stream_table('st_x', \ $$SELECT id, val FROM src_lin$$, '1m', 'FULL', true, 'atomic')", ) .await; db.execute( "SELECT pgtrickle.create_stream_table('st_y', \ $$SELECT id, val FROM st_x$$, '1m', 'FULL', true, 'atomic')", ) .await; // diamond_groups() should return 0 rows (no diamond in a linear chain) let group_count: i64 = db .query_scalar("SELECT count(DISTINCT group_id) FROM pgtrickle.diamond_groups()") .await; assert_eq!( group_count, 0, "linear chain should not produce diamond groups" ); } /// Verify that diamond_consistency='none' mode does not create groups. /// Even in a diamond topology, 'none' means no grouping. #[tokio::test] async fn test_diamond_none_mode_no_groups() { let db = E2eDb::new().await.with_extension().await; setup_diamond(&db, "none").await; // diamond_groups() shows topology-detected diamonds regardless of mode, // but the scheduler won't use SAVEPOINT for 'none' mode STs. // Verify the function returns results (diamond topology is detected). let count: i64 = db .query_scalar("SELECT count(*) FROM pgtrickle.diamond_groups()") .await; assert!( count >= 0, "diamond_groups() should return a non-negative count, got {count}" ); } /// Verify that refreshing all STs in a diamond succeeds and D contains /// consistent data when all members are in atomic mode. #[tokio::test] async fn test_diamond_atomic_all_succeed() { let db = E2eDb::new().await.with_extension().await; setup_diamond(&db, "atomic").await; // After creation with initialize=true, all STs should be populated. let d_count: i64 = db.count("st_d").await; // D joins B and C on id where val<=20 (B: {1,2}) and val>=20 (C: {2,3}) // Intersection: id=2 (val 20 appears in both). So D should have 1 row. assert_eq!( d_count, 1, "expected 1 row in st_d after initial population" ); // Verify D's content is consistent // total = b.val + c.val (INT4 + INT4 = INT4 in PostgreSQL) let total: i32 = db .query_scalar("SELECT total FROM st_d WHERE b_id = 2") .await; assert_eq!(total, 40, "expected total = 20 + 20 = 40"); // Insert more data and manually refresh db.execute("INSERT INTO src_a VALUES (4, 20)").await; db.refresh_st("st_b").await; db.refresh_st("st_c").await; db.refresh_st("st_d").await; // Now B has {1,2,4} (val<=20), C has {2,3,4} (val>=20), so D = {2,4} let d_count: i64 = db.count("st_d").await; assert_eq!(d_count, 2, "expected 2 rows in st_d after refresh"); } /// Verify that the `diamond_consistency` column is visible in the catalog. #[tokio::test] async fn test_diamond_consistency_in_catalog_view() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_cat (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_cat VALUES (1, 10)").await; db.execute( "SELECT pgtrickle.create_stream_table('test_cat_dc', \ $$SELECT * FROM src_cat$$, '1m', 'FULL', true, 'atomic')", ) .await; // Query it from the info view (which selects st.*) let dc: String = db .query_scalar( "SELECT diamond_consistency FROM pgtrickle.stream_tables_info \ WHERE pgt_name = 'test_cat_dc'", ) .await; assert_eq!(dc, "atomic"); } // ── Diamond Schedule Policy Tests ────────────────────────────────────── /// Verify that the `diamond_schedule_policy` column defaults to `'fastest'` /// when not explicitly specified. #[tokio::test] async fn test_diamond_schedule_policy_default() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_sp (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_sp VALUES (1, 10)").await; db.create_st("test_default_sp", "SELECT * FROM src_sp", "1m", "FULL") .await; let sp: String = db .query_scalar( "SELECT diamond_schedule_policy FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_default_sp'", ) .await; assert_eq!( sp, "fastest", "expected default diamond_schedule_policy to be 'fastest'" ); } /// Verify that create_stream_table accepts diamond_schedule_policy='slowest'. #[tokio::test] async fn test_diamond_schedule_policy_create_slowest() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_sp2 (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_sp2 VALUES (1, 10)").await; db.execute( "SELECT pgtrickle.create_stream_table('test_slowest_sp', \ $$SELECT * FROM src_sp2$$, '1m', 'FULL', true, 'atomic', 'slowest')", ) .await; let sp: String = db .query_scalar( "SELECT diamond_schedule_policy FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_slowest_sp'", ) .await; assert_eq!(sp, "slowest"); } /// Verify that alter_stream_table can change diamond_schedule_policy. #[tokio::test] async fn test_diamond_schedule_policy_alter() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_sp3 (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_sp3 VALUES (1, 10)").await; db.create_st("test_alter_sp", "SELECT * FROM src_sp3", "1m", "FULL") .await; // Should start as 'fastest' let sp: String = db .query_scalar( "SELECT diamond_schedule_policy FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_alter_sp'", ) .await; assert_eq!(sp, "fastest"); // Alter to 'slowest' db.alter_st("test_alter_sp", "diamond_schedule_policy => 'slowest'") .await; let sp: String = db .query_scalar( "SELECT diamond_schedule_policy FROM pgtrickle.pgt_stream_tables \ WHERE pgt_name = 'test_alter_sp'", ) .await; assert_eq!(sp, "slowest"); } /// Verify that diamond_groups() returns the schedule_policy column. #[tokio::test] async fn test_diamond_groups_shows_schedule_policy() { let db = E2eDb::new().await.with_extension().await; setup_diamond(&db, "atomic").await; let sp: String = db .query_scalar("SELECT schedule_policy FROM pgtrickle.diamond_groups() LIMIT 1") .await; assert!( sp == "fastest" || sp == "slowest", "expected schedule_policy to be 'fastest' or 'slowest', got '{}'", sp ); } /// Verify that setting diamond_schedule_policy on the convergence node is /// reflected in diamond_groups(). #[tokio::test] async fn test_diamond_schedule_policy_convergence_override() { let db = E2eDb::new().await.with_extension().await; setup_diamond(&db, "atomic").await; // Set the convergence node (st_d) to 'slowest' db.alter_st("st_d", "diamond_schedule_policy => 'slowest'") .await; // diamond_groups() should show 'slowest' for all members of the group let sp: String = db .query_scalar( "SELECT schedule_policy FROM pgtrickle.diamond_groups() \ WHERE is_convergence = true LIMIT 1", ) .await; assert_eq!( sp, "slowest", "convergence node 'slowest' should be reflected in diamond_groups()" ); } /// Verify that diamond_schedule_policy is visible in the catalog info view. #[tokio::test] async fn test_diamond_schedule_policy_in_catalog_view() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_sp4 (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_sp4 VALUES (1, 10)").await; db.execute( "SELECT pgtrickle.create_stream_table('test_cat_sp', \ $$SELECT * FROM src_sp4$$, '1m', 'FULL', true, 'atomic', 'slowest')", ) .await; let sp: String = db .query_scalar( "SELECT diamond_schedule_policy FROM pgtrickle.stream_tables_info \ WHERE pgt_name = 'test_cat_sp'", ) .await; assert_eq!(sp, "slowest"); } /// Verify that invalid diamond_schedule_policy values are rejected. #[tokio::test] async fn test_diamond_schedule_policy_invalid_rejected() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE src_sp5 (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO src_sp5 VALUES (1, 10)").await; let result = db .try_execute( "SELECT pgtrickle.create_stream_table('test_bad_sp', \ $$SELECT * FROM src_sp5$$, '1m', 'FULL', true, 'atomic', 'invalid')", ) .await; assert!( result.is_err(), "expected error for invalid diamond_schedule_policy" ); } /// B3-3: Diamond flow property test verification setup. /// When multiple sources in a complex merge flow update simultaneously, /// they emit duplicate correcting rows (`DISTINCT ON` corrupted this by dropping them). /// Z-set integration ensures this mathematical accuracy holds up. #[tokio::test] async fn test_diamond_flow_simultaneous_multi_source_update() { let db = E2eDb::new().await.with_extension().await; // Base table db.execute("CREATE TABLE root_table (id INT PRIMARY KEY, val INT)") .await; db.execute("INSERT INTO root_table VALUES (1, 100)").await; db.create_st( "st_branch_a", "SELECT id, val + 10 as score_a FROM root_table", "1m", "DIFFERENTIAL", ) .await; db.create_st( "st_branch_b", "SELECT id, val - 10 as score_b FROM root_table", "1m", "DIFFERENTIAL", ) .await; // Diamond tip: Joins A and B db.create_st( "st_diamond_tip", "SELECT a.id, a.score_a, b.score_b FROM st_branch_a a JOIN st_branch_b b ON a.id = b.id", "1m", "DIFFERENTIAL", ) .await; db.execute("SELECT pgtrickle.refresh_stream_table('st_diamond_tip')") .await; // DML triggers branches A and B to recalculate db.execute("UPDATE root_table SET val = 200 WHERE id = 1") .await; // Refresh ALL tables systematically db.execute("SELECT pgtrickle.refresh_stream_table('st_branch_a')") .await; db.execute("SELECT pgtrickle.refresh_stream_table('st_branch_b')") .await; db.execute("SELECT pgtrickle.refresh_stream_table('st_diamond_tip')") .await; db.assert_st_matches_query( "st_diamond_tip", "SELECT 1 as id, 210 as score_a, 190 as score_b", ) .await; }