//! E2E tests for Row-Level Security (RLS) interaction with stream tables. //! //! Validates: //! - R5: RLS on source tables does not affect stream table content //! - R7: RLS on stream tables filters reads per role //! - R8: IMMEDIATE mode + RLS on stream table //! - R10: ENABLE/DISABLE RLS on source table triggers reinit //! //! Prerequisites: `just build-e2e-image` mod e2e; use e2e::E2eDb; /// R5: RLS enabled on a source table must not filter the stream table content. /// The refresh engine bypasses RLS, so the stream table always contains all rows. #[tokio::test] async fn test_rls_on_source_does_not_filter_stream_table() { let db = E2eDb::new().await.with_extension().await; // Create source table with tenant data db.execute( "CREATE TABLE rls_src (id INT PRIMARY KEY, tenant_id INT NOT NULL, val TEXT NOT NULL)", ) .await; db.execute("INSERT INTO rls_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c')") .await; // Enable RLS on the source table db.execute("ALTER TABLE rls_src ENABLE ROW LEVEL SECURITY") .await; // Create a restrictive policy that only allows tenant_id = 10 db.execute( "CREATE POLICY tenant_policy ON rls_src \ USING (tenant_id = 10)", ) .await; // Create stream table on the RLS-protected source db.create_st( "rls_src_st", "SELECT id, tenant_id, val FROM rls_src", "1m", "FULL", ) .await; // Refresh the stream table — should contain ALL rows despite RLS db.refresh_st("rls_src_st").await; let count: i64 = db.count("public.rls_src_st").await; assert_eq!( count, 3, "stream table should contain all 3 rows despite RLS on source" ); } /// R5 variant: RLS on source table with DIFFERENTIAL refresh mode. #[tokio::test] async fn test_rls_on_source_differential_mode() { let db = E2eDb::new().await.with_extension().await; db.execute( "CREATE TABLE rls_diff_src (id INT PRIMARY KEY, tenant_id INT NOT NULL, val TEXT NOT NULL)", ) .await; db.execute("INSERT INTO rls_diff_src VALUES (1, 10, 'a'), (2, 20, 'b')") .await; // Enable RLS with restrictive policy db.execute("ALTER TABLE rls_diff_src ENABLE ROW LEVEL SECURITY") .await; db.execute("CREATE POLICY tenant_only ON rls_diff_src USING (tenant_id = 10)") .await; // Create stream table with DIFFERENTIAL mode db.create_st( "rls_diff_st", "SELECT id, tenant_id, val FROM rls_diff_src", "1m", "DIFFERENTIAL", ) .await; db.refresh_st("rls_diff_st").await; let count: i64 = db.count("public.rls_diff_st").await; assert_eq!( count, 2, "DIFFERENTIAL stream table should contain all rows despite RLS" ); // Insert another row (tenant_id = 20, would be filtered by RLS) db.execute("INSERT INTO rls_diff_src VALUES (3, 20, 'c')") .await; db.refresh_st("rls_diff_st").await; let count: i64 = db.count("public.rls_diff_st").await; assert_eq!( count, 3, "DIFFERENTIAL refresh should see all rows including RLS-filtered ones" ); } /// R7: RLS on the stream table itself filters reads per role. #[tokio::test] async fn test_rls_on_stream_table_filters_reads() { let db = E2eDb::new().await.with_extension().await; // Create source and populate db.execute( "CREATE TABLE rls_st_src (id INT PRIMARY KEY, tenant_id INT NOT NULL, val TEXT NOT NULL)", ) .await; db.execute( "INSERT INTO rls_st_src VALUES (1, 10, 'a'), (2, 20, 'b'), (3, 10, 'c'), (4, 30, 'd')", ) .await; // Create stream table db.create_st( "rls_st_test", "SELECT id, tenant_id, val FROM rls_st_src", "1m", "FULL", ) .await; db.refresh_st("rls_st_test").await; // Verify all rows present before RLS let total: i64 = db.count("public.rls_st_test").await; assert_eq!(total, 4, "stream table should have all 4 rows"); // Enable RLS on the stream table db.execute("ALTER TABLE public.rls_st_test ENABLE ROW LEVEL SECURITY") .await; // Create a test role and grant access db.execute("CREATE ROLE rls_reader LOGIN").await; db.execute("GRANT USAGE ON SCHEMA public TO rls_reader") .await; db.execute("GRANT SELECT ON public.rls_st_test TO rls_reader") .await; // Create a policy that only allows tenant_id = 10 db.execute( "CREATE POLICY tenant_filter ON public.rls_st_test \ FOR SELECT TO rls_reader \ USING (tenant_id = 10)", ) .await; // As superuser we still see all rows (superuser bypasses RLS by default) let superuser_count: i64 = db .query_scalar("SELECT count(*) FROM public.rls_st_test") .await; assert_eq!( superuser_count, 4, "superuser should bypass RLS and see all rows" ); // Actually query as the restricted role to verify RLS filtering works. // SET LOCAL ROLE within a transaction applies for that transaction only. let filtered_count: i64 = { let mut txn = db.pool.begin().await.unwrap(); sqlx::query("SET LOCAL ROLE rls_reader") .execute(&mut *txn) .await .unwrap(); let cnt: i64 = sqlx::query_scalar("SELECT count(*) FROM public.rls_st_test") .fetch_one(&mut *txn) .await .unwrap(); txn.rollback().await.unwrap(); cnt }; assert_eq!( filtered_count, 2, "rls_reader should only see 2 rows where tenant_id = 10 (RLS policy enforced)" ); // Verify the policy exists and is correctly configured let policy_count: i64 = db .query_scalar( "SELECT count(*) FROM pg_policies \ WHERE tablename = 'rls_st_test' AND policyname = 'tenant_filter'", ) .await; assert_eq!(policy_count, 1, "policy should exist on the stream table"); } /// R8: IMMEDIATE mode stream table works correctly with RLS enabled on it. #[tokio::test] async fn test_rls_on_stream_table_immediate_mode() { let db = E2eDb::new().await.with_extension().await; // Create source table db.execute( "CREATE TABLE rls_imm_src (id INT PRIMARY KEY, tenant_id INT NOT NULL, val TEXT NOT NULL)", ) .await; db.execute("INSERT INTO rls_imm_src VALUES (1, 10, 'a'), (2, 20, 'b')") .await; // Create IMMEDIATE mode stream table db.create_st( "rls_imm_st", "SELECT id, tenant_id, val FROM rls_imm_src", "1m", "IMMEDIATE", ) .await; // Verify the initial population let count: i64 = db.count("public.rls_imm_st").await; assert_eq!(count, 2, "initial population should have 2 rows"); // Enable RLS on the stream table db.execute("ALTER TABLE public.rls_imm_st ENABLE ROW LEVEL SECURITY") .await; // Create a policy db.execute( "CREATE POLICY imm_tenant ON public.rls_imm_st \ USING (tenant_id = 10)", ) .await; // Insert a new row into the source — IVM trigger should update the stream table // The trigger runs as SECURITY DEFINER so it sees all rows for delta computation db.execute("INSERT INTO rls_imm_src VALUES (3, 10, 'c')") .await; // As superuser, we bypass RLS and should see all 3 rows let total: i64 = db.count("public.rls_imm_st").await; assert_eq!( total, 3, "IMMEDIATE mode should propagate insert despite RLS on stream table" ); // Insert a row for tenant 20 — should also appear in the stream table db.execute("INSERT INTO rls_imm_src VALUES (4, 20, 'd')") .await; let total: i64 = db.count("public.rls_imm_st").await; assert_eq!( total, 4, "all rows should be in stream table regardless of RLS policy" ); } /// R2: Verify that change buffer tables have RLS explicitly disabled. #[tokio::test] async fn test_change_buffer_rls_disabled() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE rls_buf_src (id INT PRIMARY KEY, val TEXT)") .await; db.execute("INSERT INTO rls_buf_src VALUES (1, 'hello')") .await; db.create_st( "rls_buf_st", "SELECT id, val FROM rls_buf_src", "1m", "DIFFERENTIAL", ) .await; // Check that the change buffer table has RLS disabled let rls_enabled: bool = db .query_scalar( "SELECT relrowsecurity FROM pg_class \ WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'pgtrickle_changes') \ AND relname LIKE 'changes_%' \ LIMIT 1", ) .await; assert!( !rls_enabled, "change buffer table should have RLS disabled (relrowsecurity = false)" ); } /// R4: Verify that IVM trigger functions are SECURITY DEFINER. #[tokio::test] async fn test_ivm_trigger_functions_security_definer() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE rls_secdef_src (id INT PRIMARY KEY, val TEXT NOT NULL)") .await; db.execute("INSERT INTO rls_secdef_src VALUES (1, 'a')") .await; // Create an IMMEDIATE mode stream table (creates IVM triggers) db.create_st( "rls_secdef_st", "SELECT id, val FROM rls_secdef_src", "1m", "IMMEDIATE", ) .await; // Check that the IVM trigger functions are SECURITY DEFINER let secdef_count: i64 = db .query_scalar( "SELECT count(*) FROM pg_proc \ WHERE proname LIKE 'pgt_ivm_%' \ AND prosecdef = true", ) .await; assert!( secdef_count > 0, "IVM trigger functions should be SECURITY DEFINER (prosecdef = true), found {secdef_count}" ); // Also verify search_path is set (proconfig should contain search_path) let has_search_path: bool = db .query_scalar( "SELECT EXISTS( \ SELECT 1 FROM pg_proc \ WHERE proname LIKE 'pgt_ivm_%' \ AND prosecdef = true \ AND proconfig @> ARRAY['search_path=pg_catalog, pgtrickle, pgtrickle_changes, public'] \ )", ) .await; assert!( has_search_path, "IVM SECURITY DEFINER functions should have a locked search_path" ); } // ══════════════════════════════════════════════════════════════════════ // R10 — ENABLE/DISABLE RLS on source table triggers reinit // ══════════════════════════════════════════════════════════════════════ /// R10: ENABLE ROW LEVEL SECURITY on a source table should mark stream tables /// for reinit via the DDL event trigger hook. #[tokio::test] async fn test_enable_rls_on_source_triggers_reinit() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE rls_ddl_src (id INT PRIMARY KEY, val TEXT)") .await; db.execute("INSERT INTO rls_ddl_src VALUES (1, 'a'), (2, 'b')") .await; db.create_st( "rls_ddl_st", "SELECT id, val FROM rls_ddl_src", "1m", "DIFFERENTIAL", ) .await; // Initial refresh so the snapshot is stored. db.refresh_st("rls_ddl_st").await; let count: i64 = db.count("public.rls_ddl_st").await; assert_eq!(count, 2, "initial refresh should populate 2 rows"); // Verify not marked for reinit before the DDL. let before: bool = db .query_scalar( "SELECT needs_reinit FROM pgtrickle.pgt_stream_tables WHERE pgt_name = 'rls_ddl_st'", ) .await; assert!(!before, "ST should NOT need reinit before ENABLE RLS"); // ENABLE RLS on the source table — should trigger reinit. db.execute("ALTER TABLE rls_ddl_src ENABLE ROW LEVEL SECURITY") .await; let after: bool = db .query_scalar( "SELECT needs_reinit FROM pgtrickle.pgt_stream_tables WHERE pgt_name = 'rls_ddl_st'", ) .await; assert!( after, "ST should be marked for reinit after ENABLE RLS on source" ); // Reinit (refresh) should succeed and the stream table should still // contain all rows (superuser context bypasses RLS). db.refresh_st("rls_ddl_st").await; let count: i64 = db.count("public.rls_ddl_st").await; assert_eq!( count, 2, "stream table should still contain all rows after reinit" ); } /// R10 variant: DISABLE ROW LEVEL SECURITY on a source table that previously /// had RLS enabled should also trigger reinit. #[tokio::test] async fn test_disable_rls_on_source_triggers_reinit() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE rls_dis_src (id INT PRIMARY KEY, val TEXT)") .await; db.execute("INSERT INTO rls_dis_src VALUES (1, 'x'), (2, 'y')") .await; // Enable RLS first, then create the ST so the snapshot stores rls_enabled=true. db.execute("ALTER TABLE rls_dis_src ENABLE ROW LEVEL SECURITY") .await; db.create_st( "rls_dis_st", "SELECT id, val FROM rls_dis_src", "1m", "DIFFERENTIAL", ) .await; db.refresh_st("rls_dis_st").await; // DISABLE RLS — snapshot had rls_enabled=true, now it's false. db.execute("ALTER TABLE rls_dis_src DISABLE ROW LEVEL SECURITY") .await; let reinit: bool = db .query_scalar( "SELECT needs_reinit FROM pgtrickle.pgt_stream_tables WHERE pgt_name = 'rls_dis_st'", ) .await; assert!( reinit, "ST should be marked for reinit after DISABLE RLS on source" ); } /// R10 variant: FORCE ROW LEVEL SECURITY on a source table triggers reinit. #[tokio::test] async fn test_force_rls_on_source_triggers_reinit() { let db = E2eDb::new().await.with_extension().await; db.execute("CREATE TABLE rls_force_src (id INT PRIMARY KEY, val TEXT)") .await; db.execute("INSERT INTO rls_force_src VALUES (1, 'p')") .await; db.create_st( "rls_force_st", "SELECT id, val FROM rls_force_src", "1m", "FULL", ) .await; db.refresh_st("rls_force_st").await; // FORCE ROW LEVEL SECURITY — should trigger reinit. db.execute("ALTER TABLE rls_force_src FORCE ROW LEVEL SECURITY") .await; let reinit: bool = db .query_scalar( "SELECT needs_reinit FROM pgtrickle.pgt_stream_tables WHERE pgt_name = 'rls_force_st'", ) .await; assert!( reinit, "ST should be marked for reinit after FORCE RLS on source" ); }