// Copyright (c) 2023-2026 ParadeDB, Inc. // // This file is part of ParadeDB - Postgres for Search and Analytics // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . // Tests for ColumnarExecState implementation // Includes both basic functionality tests and corner/edge cases mod fixtures; use bigdecimal::BigDecimal; use fixtures::db::Query; use fixtures::*; use pretty_assertions::assert_eq; use rstest::*; use serde_json::Value; use sqlx::PgConnection; // Helper function to get all execution methods in the plan fn get_all_exec_methods(plan: &Value) -> Vec { let mut methods = Vec::new(); extract_methods(plan, &mut methods); methods } // Recursive function to walk the plan tree fn extract_methods(node: &Value, methods: &mut Vec) { if let Some(exec_method) = node.get("Exec Method") { if let Some(method) = exec_method.as_str() { methods.push(method.to_string()); } } // Check child plans if let Some(plans) = node.get("Plans") { if let Some(plans_array) = plans.as_array() { for plan in plans_array { extract_methods(plan, methods); } } } // Start from the root if given the root plan if let Some(root) = node.get(0) { if let Some(plan_node) = root.get("Plan") { extract_methods(plan_node, methods); } } } // Setup for complex aggregation with columnar storage fn complex_aggregation_setup() -> &'static str { r#" DROP TABLE IF EXISTS expected_payments; CREATE TABLE expected_payments ( id SERIAL PRIMARY KEY, organization_id UUID NOT NULL, live_mode BOOLEAN NOT NULL, status TEXT NOT NULL, internal_account_id UUID NOT NULL, amount_range NUMRANGE NOT NULL, amount_reconciled NUMERIC NOT NULL, direction TEXT NOT NULL CHECK (direction IN ('credit','debit')), currency TEXT NOT NULL, discarded_at TIMESTAMP NULL ); INSERT INTO expected_payments ( organization_id, live_mode, status, internal_account_id, amount_range, amount_reconciled, direction, currency, discarded_at ) SELECT organization_id, live_mode, status, internal_account_id, numrange(lower_val, lower_val + offset_val) AS amount_range, amount_reconciled, direction, currency, discarded_at FROM ( SELECT -- random UUID (md5(random()::text))::uuid AS organization_id, -- 50/50 live_mode (random() < 0.5) AS live_mode, -- status pick (ARRAY['unreconciled','partially_reconciled']) [floor(random()*2 + 1)::int] AS status, -- another random UUID (md5(random()::text))::uuid AS internal_account_id, -- ensure lower ≤ upper by generating an offset floor(random()*1000)::int AS lower_val, floor(random()*100)::int + 1 AS offset_val, -- reconciled amount between –500 and +500 (random()*1000 - 500)::numeric AS amount_reconciled, -- direction pick (ARRAY['credit','debit'])[floor(random()*2 + 1)::int] AS direction, -- currency pick (ARRAY['USD','EUR','GBP','JPY','AUD'])[floor(random()*5 + 1)::int] AS currency, -- 10% NULL, else random timestamp in last year CASE WHEN random() < 0.10 THEN NULL ELSE now() - (random() * INTERVAL '365 days') END AS discarded_at FROM generate_series(1, 1000) ) sub; create index expected_payments_idx on expected_payments using bm25 ( id, organization_id, live_mode, status, internal_account_id, amount_range, amount_reconciled, direction, currency, discarded_at ) with ( key_field = 'id', text_fields = '{"organization_id": {"fast":true}, "status": {"fast": true, "tokenizer": {"type": "keyword"}}, "direction": {"fast": true}, "currency": {"fast": true}}', boolean_fields = '{"live_mode": {"fast": true}}' ); "# } #[ignore] #[rstest] fn test_complex_aggregation_with_columnar(mut conn: PgConnection) { complex_aggregation_setup().execute(&mut conn); // Force disable regular index scans to ensure BM25 index is used "SET enable_indexscan = off;".execute(&mut conn); // Get execution plan for the complex query let (plan,) = r#" EXPLAIN (ANALYZE, FORMAT JSON) SELECT COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range, COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range, COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range, COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count, COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count, COUNT(*) as total_count, COUNT(distinct expected_payments.currency) as currency_count, (ARRAY_AGG(distinct expected_payments.currency))[1] as currency FROM expected_payments WHERE expected_payments.live_mode @@@ 'true' AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]' AND expected_payments.discarded_at IS NULL LIMIT 1 "# .fetch_one::<(Value,)>(&mut conn); // Get execution methods let methods = get_all_exec_methods(&plan); println!("Complex aggregation execution methods: {methods:?}"); // Assert that a columnar execution state is used assert!( methods.iter().any(|m| m.contains("ColumnarExecState")), "Expected ColumnarExecState for complex aggregation, got: {methods:?}" ); // Actually execute the query to verify results let results = r#" SELECT COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range, COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range, COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range, COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range, COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count, COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count, COUNT(*) as total_count, COUNT(distinct expected_payments.currency) as currency_count, (ARRAY_AGG(distinct expected_payments.currency))[1] as currency FROM expected_payments WHERE expected_payments.live_mode @@@ 'true' AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]' AND expected_payments.discarded_at IS NULL LIMIT 1 "# .fetch_result::<( BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, BigDecimal, i64, i64, i64, i64, Option, )>(&mut conn) .unwrap(); // Assert that we got results (should be at least one row) assert!(!results.is_empty(), "Expected at least one row of results"); // Get the counts from first result let (_, _, _, _, _, _, credit_count, debit_count, total_count, currency_count, currency) = &results[0]; // Verify consistency in counts assert_eq!( *total_count, credit_count + debit_count, "Total count should equal credit_count + debit_count" ); // Verify currency count is positive assert!( *currency_count > 0, "Should have at least one currency type" ); // Check that we have a currency value if currency_count > 0 if *currency_count > 0 { assert!( currency.is_some(), "Should have a currency value when currency_count > 0" ); } // Reset setting "SET enable_indexscan = on;".execute(&mut conn); } fn fast_fields_setup() -> &'static str { r#" DROP TABLE IF EXISTS mixed_ff_v2; CREATE TABLE mixed_ff_v2 ( id SERIAL PRIMARY KEY, title TEXT, category TEXT, rating INT, description TEXT, content TEXT, price NUMERIC, tags TEXT[] ); INSERT INTO mixed_ff_v2 (title, category, rating, description, content, price, tags) SELECT 'Title ' || i, 'Category ' || (i % 5), i, 'Description ' || i, 'Content ' || i, (i * 1.5)::numeric, ARRAY['tag' || (i % 3), 'tag' || (i % 5)] FROM generate_series(1, 100) i; CREATE INDEX mixed_ff_v2_idx ON mixed_ff_v2 USING bm25 ( id, title, content, price, tags, (category::pdb.literal('alias=cat_lit')), (rating::pdb.alias('rating_alias')), (description::pdb.literal), ((title || ' ' || category)::pdb.literal('alias=concat_expr')), ((rating + 1)::pdb.alias('rating_plus_one')) ) WITH (key_field = 'id'); "# } #[rstest] #[case::aliased_literal(r#"SELECT category FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)] #[case::unaliased_literal(r#"SELECT description FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)] #[case::simple_expression_id(r#"SELECT (id) FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)] #[case::aliased_integer(r#"SELECT rating FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)] #[case::output_cast( r#"SELECT rating::text FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false )] #[case::default_tokenizer(r#"SELECT content FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false)] #[case::expression_mismatch( r#"SELECT lower(title) FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false )] #[case::expression_concat( r#"SELECT title || ' ' || category FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true )] #[case::expression_arithmetic( r#"SELECT rating + 1 FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true )] #[case::numeric_column(r#"SELECT price FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)] #[case::array_column(r#"SELECT tags FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false)] fn test_fast_fields_cases( mut conn: PgConnection, #[case] query: &str, #[case] expect_fast_field: bool, ) { fast_fields_setup().execute(&mut conn); "SET enable_indexscan = off;".execute(&mut conn); "SET paradedb.enable_aggregate_custom_scan = on;".execute(&mut conn); "SET paradedb.enable_columnar_exec = on;".execute(&mut conn); "SET paradedb.columnar_exec_column_threshold = 10;".execute(&mut conn); let explain_query = format!("EXPLAIN (ANALYZE, FORMAT JSON) {}", query); let (plan,) = explain_query.fetch_one::<(Value,)>(&mut conn); let methods = get_all_exec_methods(&plan); let has_columnar = methods.iter().any(|m| m.contains("ColumnarExecState")); assert_eq!( has_columnar, expect_fast_field, "Columnar exec usage mismatch for query: '{}'. Methods: {:?}", query, methods ); // Execute query to check match count (should be non-empty) let rows = query.to_string().fetch_dynamic(&mut conn); assert!(!rows.is_empty(), "Query should return results: {}", query); } // ============================================================================ // Sorted Path Tests for ColumnarExecState // ============================================================================ // // These tests verify that ColumnarExecState correctly handles the sorted // path using SortPreservingMergeExec when the index has a sort_by configuration // AND the query includes an ORDER BY clause matching the sort_by. // // Note: The sorted path is only activated when ORDER BY matches sort_by. /// Helper to check if results are sorted in descending order fn is_sorted_desc(values: &[T]) -> bool { values.windows(2).all(|w| w[0] >= w[1]) } /// Helper to check if results are sorted in ascending order fn is_sorted_asc(values: &[T]) -> bool { values.windows(2).all(|w| w[0] <= w[1]) } /// Test ColumnarExecState with sorted scans across various configurations. #[rstest] #[case::desc_serial("DESC", "DESC", false)] #[case::asc_serial("ASC", "ASC", false)] #[case::parallel_desc("DESC", "DESC", true)] fn columnar_sorted_scan( mut conn: PgConnection, #[case] sort_by_dir: &str, #[case] order_by_dir: &str, #[case] parallel: bool, ) { if parallel && pg_major_version(&mut conn) < 17 { return; } if parallel { "SET max_parallel_workers TO 4;".execute(&mut conn); "SET max_parallel_workers_per_gather TO 4;".execute(&mut conn); "SET debug_parallel_query TO on;".execute(&mut conn); } else { "SET max_parallel_workers TO 0;".execute(&mut conn); } "SET paradedb.enable_columnar_exec TO true;".execute(&mut conn); "SET paradedb.columnar_exec_column_threshold = 10;".execute(&mut conn); let sort_by = format!( "score {} NULLS {}", sort_by_dir, if sort_by_dir == "ASC" { "FIRST" } else { "LAST" } ); let sql = format!( r#" CREATE TABLE test_mff_sorted ( id SERIAL PRIMARY KEY, name TEXT, category TEXT, score INTEGER ); CREATE INDEX test_mff_sorted_idx ON test_mff_sorted USING bm25 (id, name, category, score) WITH ( key_field = 'id', text_fields = '{{"name": {{"fast": true}}, "category": {{"fast": true, "tokenizer": {{"type": "keyword"}}}}}}', numeric_fields = '{{"score": {{"fast": true}}}}', sort_by = '{}' ); "#, sort_by ); sql.execute(&mut conn); // Insert multiple batches to create segments let num_batches = if parallel { 8 } else { 4 }; let rows_per_batch = if parallel { 40 } else { 30 }; for batch in 1..=num_batches { let sql = format!( r#" INSERT INTO test_mff_sorted (name, category, score) SELECT 'Item ' || i || ' batch{}', 'Category' || (i % 3), {} + (random() * 100)::integer FROM generate_series(1, {}) AS i; "#, batch, batch * 100, rows_per_batch ); sql.execute(&mut conn); } // Query selecting multiple fast fields with ORDER BY (triggers sorted path) let query = format!( r#" SELECT name, category, score FROM test_mff_sorted WHERE name @@@ 'Item' ORDER BY score {} NULLS {} "#, order_by_dir, if order_by_dir == "ASC" { "FIRST" } else { "LAST" } ); let explain_query = format!("EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) {}", query); let (plan,): (Value,) = explain_query.fetch_one(&mut conn); // Get execution methods let methods = get_all_exec_methods(&plan); assert!( methods.contains(&"ColumnarExecState".to_string()), "Expected ColumnarExecState, got: {:?}", methods ); // Verify that the plan indicates sorted execution let plan_str = plan.to_string(); assert!( plan_str.contains("Order By"), "Plan should contain 'Order By' to indicate sorted execution: {}", plan_str ); // Query with ORDER BY and verify results are sorted let results: Vec<(String, String, i32)> = query.fetch(&mut conn); assert_eq!( results.len(), num_batches * rows_per_batch, "Should return all results" ); let scores: Vec = results.iter().map(|(_, _, s)| *s).collect(); if order_by_dir == "DESC" { assert!(is_sorted_desc(&scores), "Results should be sorted DESC"); } else { assert!(is_sorted_asc(&scores), "Results should be sorted ASC"); } } /// Test that sorting still works when ColumnarExecState is disabled. /// /// When enable_columnar_exec is false, queries with ORDER BY should /// still produce correctly sorted results (PostgreSQL will handle sorting). #[rstest] fn columnar_disabled_still_works(mut conn: PgConnection) { "SET max_parallel_workers TO 0;".execute(&mut conn); "SET paradedb.enable_columnar_exec TO false;".execute(&mut conn); r#" CREATE TABLE test_mff_disabled ( id SERIAL PRIMARY KEY, text_col TEXT, str_col TEXT, num_col INTEGER ); CREATE INDEX test_mff_disabled_idx ON test_mff_disabled USING bm25 (id, text_col, str_col, num_col) WITH ( key_field = 'id', text_fields = '{"text_col": {"fast": true}, "str_col": {"fast": true, "tokenizer": {"type": "keyword"}}}', numeric_fields = '{"num_col": {"fast": true}}', sort_by = 'num_col DESC NULLS LAST' ); "# .execute(&mut conn); for batch in 1..=3 { let sql = format!( r#" INSERT INTO test_mff_disabled (text_col, str_col, num_col) SELECT 'Record ' || i || ' batch{}', 'Str' || (i % 3), (random() * 50)::integer FROM generate_series(1, 20) AS i; "#, batch ); sql.execute(&mut conn); } // With ORDER BY, PostgreSQL ensures sorted results even with mixed ff exec disabled let (plan,): (Value,) = r#" EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) SELECT id, num_col FROM test_mff_disabled WHERE text_col @@@ 'Record' ORDER BY num_col DESC "# .fetch_one(&mut conn); let methods = get_all_exec_methods(&plan); assert!( methods.contains(&"NormalScanExecState".to_string()), "Expected NormalScanExecState when mixed ff disabled, got: {:?}", methods ); let results: Vec<(i32, i32)> = r#" SELECT id, num_col FROM test_mff_disabled WHERE text_col @@@ 'Record' ORDER BY num_col DESC "# .fetch(&mut conn); assert_eq!(results.len(), 60, "Should return all 60 results"); let nums: Vec = results.iter().map(|(_, n)| *n).collect(); assert!( is_sorted_desc(&nums), "Results should be sorted with ORDER BY even when mixed ff exec disabled. First 20: {:?}", &nums[..20.min(nums.len())] ); }