// Copyright (c) 2023-2026 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
// Tests for ColumnarExecState implementation
// Includes both basic functionality tests and corner/edge cases
mod fixtures;
use bigdecimal::BigDecimal;
use fixtures::db::Query;
use fixtures::*;
use pretty_assertions::assert_eq;
use rstest::*;
use serde_json::Value;
use sqlx::PgConnection;
// Helper function to get all execution methods in the plan
fn get_all_exec_methods(plan: &Value) -> Vec {
let mut methods = Vec::new();
extract_methods(plan, &mut methods);
methods
}
// Recursive function to walk the plan tree
fn extract_methods(node: &Value, methods: &mut Vec) {
if let Some(exec_method) = node.get("Exec Method") {
if let Some(method) = exec_method.as_str() {
methods.push(method.to_string());
}
}
// Check child plans
if let Some(plans) = node.get("Plans") {
if let Some(plans_array) = plans.as_array() {
for plan in plans_array {
extract_methods(plan, methods);
}
}
}
// Start from the root if given the root plan
if let Some(root) = node.get(0) {
if let Some(plan_node) = root.get("Plan") {
extract_methods(plan_node, methods);
}
}
}
/// Returns SQL that (re)creates the `expected_payments` table, seeds it with
/// 1000 rows of randomized payment data (UUIDs, numranges, amounts, and a
/// ~10%-NULL `discarded_at`), and builds a BM25 index over all columns with
/// fast fields enabled for the text/boolean columns the columnar
/// aggregation test filters and aggregates on.
fn complex_aggregation_setup() -> &'static str {
r#"
DROP TABLE IF EXISTS expected_payments;
CREATE TABLE expected_payments (
id SERIAL PRIMARY KEY,
organization_id UUID NOT NULL,
live_mode BOOLEAN NOT NULL,
status TEXT NOT NULL,
internal_account_id UUID NOT NULL,
amount_range NUMRANGE NOT NULL,
amount_reconciled NUMERIC NOT NULL,
direction TEXT NOT NULL CHECK (direction IN ('credit','debit')),
currency TEXT NOT NULL,
discarded_at TIMESTAMP NULL
);
INSERT INTO expected_payments (
organization_id,
live_mode,
status,
internal_account_id,
amount_range,
amount_reconciled,
direction,
currency,
discarded_at
)
SELECT
organization_id,
live_mode,
status,
internal_account_id,
numrange(lower_val, lower_val + offset_val) AS amount_range,
amount_reconciled,
direction,
currency,
discarded_at
FROM (
SELECT
-- random UUID
(md5(random()::text))::uuid AS organization_id,
-- 50/50 live_mode
(random() < 0.5) AS live_mode,
-- status pick
(ARRAY['unreconciled','partially_reconciled'])
[floor(random()*2 + 1)::int] AS status,
-- another random UUID
(md5(random()::text))::uuid AS internal_account_id,
-- ensure lower ≤ upper by generating an offset
floor(random()*1000)::int AS lower_val,
floor(random()*100)::int + 1 AS offset_val,
-- reconciled amount between –500 and +500
(random()*1000 - 500)::numeric AS amount_reconciled,
-- direction pick
(ARRAY['credit','debit'])[floor(random()*2 + 1)::int] AS direction,
-- currency pick
(ARRAY['USD','EUR','GBP','JPY','AUD'])[floor(random()*5 + 1)::int] AS currency,
-- 10% NULL, else random timestamp in last year
CASE
WHEN random() < 0.10 THEN NULL
ELSE now() - (random() * INTERVAL '365 days')
END AS discarded_at
FROM generate_series(1, 1000)
) sub;
create index expected_payments_idx on expected_payments using bm25 (
id,
organization_id,
live_mode,
status,
internal_account_id,
amount_range,
amount_reconciled,
direction,
currency,
discarded_at
) with (
key_field = 'id',
text_fields = '{"organization_id": {"fast":true}, "status": {"fast": true, "tokenizer": {"type": "keyword"}}, "direction": {"fast": true}, "currency": {"fast": true}}',
boolean_fields = '{"live_mode": {"fast": true}}'
);
"#
}
#[ignore]
#[rstest]
fn test_complex_aggregation_with_columnar(mut conn: PgConnection) {
complex_aggregation_setup().execute(&mut conn);
// Force disable regular index scans to ensure BM25 index is used
"SET enable_indexscan = off;".execute(&mut conn);
// Get execution plan for the complex query
let (plan,) = r#"
EXPLAIN (ANALYZE, FORMAT JSON)
SELECT
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range,
COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count,
COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count,
COUNT(*) as total_count,
COUNT(distinct expected_payments.currency) as currency_count,
(ARRAY_AGG(distinct expected_payments.currency))[1] as currency
FROM expected_payments
WHERE expected_payments.live_mode @@@ 'true'
AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]'
AND expected_payments.discarded_at IS NULL
LIMIT 1
"#
.fetch_one::<(Value,)>(&mut conn);
// Get execution methods
let methods = get_all_exec_methods(&plan);
println!("Complex aggregation execution methods: {methods:?}");
// Assert that a columnar execution state is used
assert!(
methods.iter().any(|m| m.contains("ColumnarExecState")),
"Expected ColumnarExecState for complex aggregation, got: {methods:?}"
);
// Actually execute the query to verify results
let results = r#"
SELECT
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range,
COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count,
COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count,
COUNT(*) as total_count,
COUNT(distinct expected_payments.currency) as currency_count,
(ARRAY_AGG(distinct expected_payments.currency))[1] as currency
FROM expected_payments
WHERE expected_payments.live_mode @@@ 'true'
AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]'
AND expected_payments.discarded_at IS NULL
LIMIT 1
"#
.fetch_result::<(
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
i64,
i64,
i64,
i64,
Option,
)>(&mut conn)
.unwrap();
// Assert that we got results (should be at least one row)
assert!(!results.is_empty(), "Expected at least one row of results");
// Get the counts from first result
let (_, _, _, _, _, _, credit_count, debit_count, total_count, currency_count, currency) =
&results[0];
// Verify consistency in counts
assert_eq!(
*total_count,
credit_count + debit_count,
"Total count should equal credit_count + debit_count"
);
// Verify currency count is positive
assert!(
*currency_count > 0,
"Should have at least one currency type"
);
// Check that we have a currency value if currency_count > 0
if *currency_count > 0 {
assert!(
currency.is_some(),
"Should have a currency value when currency_count > 0"
);
}
// Reset setting
"SET enable_indexscan = on;".execute(&mut conn);
}
/// Returns SQL that (re)creates the `mixed_ff_v2` table with 100 rows of
/// deterministic data and a BM25 index mixing plain columns, aliased
/// literal/alias casts, and indexed expressions — the shapes the
/// `test_fast_fields_cases` parameterized test probes for fast-field
/// (columnar) eligibility.
fn fast_fields_setup() -> &'static str {
r#"
DROP TABLE IF EXISTS mixed_ff_v2;
CREATE TABLE mixed_ff_v2 (
id SERIAL PRIMARY KEY,
title TEXT,
category TEXT,
rating INT,
description TEXT,
content TEXT,
price NUMERIC,
tags TEXT[]
);
INSERT INTO mixed_ff_v2 (title, category, rating, description, content, price, tags)
SELECT
'Title ' || i,
'Category ' || (i % 5),
i,
'Description ' || i,
'Content ' || i,
(i * 1.5)::numeric,
ARRAY['tag' || (i % 3), 'tag' || (i % 5)]
FROM generate_series(1, 100) i;
CREATE INDEX mixed_ff_v2_idx ON mixed_ff_v2 USING bm25 (
id,
title,
content,
price,
tags,
(category::pdb.literal('alias=cat_lit')),
(rating::pdb.alias('rating_alias')),
(description::pdb.literal),
((title || ' ' || category)::pdb.literal('alias=concat_expr')),
((rating + 1)::pdb.alias('rating_plus_one'))
) WITH (key_field = 'id');
"#
}
#[rstest]
#[case::aliased_literal(r#"SELECT category FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)]
#[case::unaliased_literal(r#"SELECT description FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)]
#[case::simple_expression_id(r#"SELECT (id) FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)]
#[case::aliased_integer(r#"SELECT rating FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)]
#[case::output_cast(
r#"SELECT rating::text FROM mixed_ff_v2 WHERE title @@@ 'Title'"#,
false
)]
#[case::default_tokenizer(r#"SELECT content FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false)]
#[case::expression_mismatch(
r#"SELECT lower(title) FROM mixed_ff_v2 WHERE title @@@ 'Title'"#,
false
)]
#[case::expression_concat(
r#"SELECT title || ' ' || category FROM mixed_ff_v2 WHERE title @@@ 'Title'"#,
true
)]
#[case::expression_arithmetic(
r#"SELECT rating + 1 FROM mixed_ff_v2 WHERE title @@@ 'Title'"#,
true
)]
#[case::numeric_column(r#"SELECT price FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, true)]
#[case::array_column(r#"SELECT tags FROM mixed_ff_v2 WHERE title @@@ 'Title'"#, false)]
fn test_fast_fields_cases(
mut conn: PgConnection,
#[case] query: &str,
#[case] expect_fast_field: bool,
) {
fast_fields_setup().execute(&mut conn);
"SET enable_indexscan = off;".execute(&mut conn);
"SET paradedb.enable_aggregate_custom_scan = on;".execute(&mut conn);
"SET paradedb.enable_columnar_exec = on;".execute(&mut conn);
"SET paradedb.columnar_exec_column_threshold = 10;".execute(&mut conn);
let explain_query = format!("EXPLAIN (ANALYZE, FORMAT JSON) {}", query);
let (plan,) = explain_query.fetch_one::<(Value,)>(&mut conn);
let methods = get_all_exec_methods(&plan);
let has_columnar = methods.iter().any(|m| m.contains("ColumnarExecState"));
assert_eq!(
has_columnar, expect_fast_field,
"Columnar exec usage mismatch for query: '{}'. Methods: {:?}",
query, methods
);
// Execute query to check match count (should be non-empty)
let rows = query.to_string().fetch_dynamic(&mut conn);
assert!(!rows.is_empty(), "Query should return results: {}", query);
}
// ============================================================================
// Sorted Path Tests for ColumnarExecState
// ============================================================================
//
// These tests verify that ColumnarExecState correctly handles the sorted
// path using SortPreservingMergeExec when the index has a sort_by configuration
// AND the query includes an ORDER BY clause matching the sort_by.
//
// Note: The sorted path is only activated when ORDER BY matches sort_by.
/// Helper to check if results are sorted in descending order.
///
/// Returns `true` for empty and single-element slices (trivially sorted).
/// Fixed: the generic parameter `<T: PartialOrd>` was missing, so the bare
/// `T` in the signature did not compile.
fn is_sorted_desc<T: PartialOrd>(values: &[T]) -> bool {
    values.windows(2).all(|w| w[0] >= w[1])
}
/// Helper to check if results are sorted in ascending order.
///
/// Returns `true` for empty and single-element slices (trivially sorted).
/// Fixed: the generic parameter `<T: PartialOrd>` was missing, so the bare
/// `T` in the signature did not compile.
fn is_sorted_asc<T: PartialOrd>(values: &[T]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}
/// Test ColumnarExecState with sorted scans across various configurations.
#[rstest]
#[case::desc_serial("DESC", "DESC", false)]
#[case::asc_serial("ASC", "ASC", false)]
#[case::parallel_desc("DESC", "DESC", true)]
fn columnar_sorted_scan(
mut conn: PgConnection,
#[case] sort_by_dir: &str,
#[case] order_by_dir: &str,
#[case] parallel: bool,
) {
if parallel && pg_major_version(&mut conn) < 17 {
return;
}
if parallel {
"SET max_parallel_workers TO 4;".execute(&mut conn);
"SET max_parallel_workers_per_gather TO 4;".execute(&mut conn);
"SET debug_parallel_query TO on;".execute(&mut conn);
} else {
"SET max_parallel_workers TO 0;".execute(&mut conn);
}
"SET paradedb.enable_columnar_exec TO true;".execute(&mut conn);
"SET paradedb.columnar_exec_column_threshold = 10;".execute(&mut conn);
let sort_by = format!(
"score {} NULLS {}",
sort_by_dir,
if sort_by_dir == "ASC" {
"FIRST"
} else {
"LAST"
}
);
let sql = format!(
r#"
CREATE TABLE test_mff_sorted (
id SERIAL PRIMARY KEY,
name TEXT,
category TEXT,
score INTEGER
);
CREATE INDEX test_mff_sorted_idx ON test_mff_sorted
USING bm25 (id, name, category, score)
WITH (
key_field = 'id',
text_fields = '{{"name": {{"fast": true}}, "category": {{"fast": true, "tokenizer": {{"type": "keyword"}}}}}}',
numeric_fields = '{{"score": {{"fast": true}}}}',
sort_by = '{}'
);
"#,
sort_by
);
sql.execute(&mut conn);
// Insert multiple batches to create segments
let num_batches = if parallel { 8 } else { 4 };
let rows_per_batch = if parallel { 40 } else { 30 };
for batch in 1..=num_batches {
let sql = format!(
r#"
INSERT INTO test_mff_sorted (name, category, score)
SELECT
'Item ' || i || ' batch{}',
'Category' || (i % 3),
{} + (random() * 100)::integer
FROM generate_series(1, {}) AS i;
"#,
batch,
batch * 100,
rows_per_batch
);
sql.execute(&mut conn);
}
// Query selecting multiple fast fields with ORDER BY (triggers sorted path)
let query = format!(
r#"
SELECT name, category, score FROM test_mff_sorted
WHERE name @@@ 'Item'
ORDER BY score {} NULLS {}
"#,
order_by_dir,
if order_by_dir == "ASC" {
"FIRST"
} else {
"LAST"
}
);
let explain_query = format!("EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) {}", query);
let (plan,): (Value,) = explain_query.fetch_one(&mut conn);
// Get execution methods
let methods = get_all_exec_methods(&plan);
assert!(
methods.contains(&"ColumnarExecState".to_string()),
"Expected ColumnarExecState, got: {:?}",
methods
);
// Verify that the plan indicates sorted execution
let plan_str = plan.to_string();
assert!(
plan_str.contains("Order By"),
"Plan should contain 'Order By' to indicate sorted execution: {}",
plan_str
);
// Query with ORDER BY and verify results are sorted
let results: Vec<(String, String, i32)> = query.fetch(&mut conn);
assert_eq!(
results.len(),
num_batches * rows_per_batch,
"Should return all results"
);
let scores: Vec = results.iter().map(|(_, _, s)| *s).collect();
if order_by_dir == "DESC" {
assert!(is_sorted_desc(&scores), "Results should be sorted DESC");
} else {
assert!(is_sorted_asc(&scores), "Results should be sorted ASC");
}
}
/// Test that sorting still works when ColumnarExecState is disabled.
///
/// When enable_columnar_exec is false, queries with ORDER BY should
/// still produce correctly sorted results (PostgreSQL will handle sorting).
#[rstest]
fn columnar_disabled_still_works(mut conn: PgConnection) {
"SET max_parallel_workers TO 0;".execute(&mut conn);
"SET paradedb.enable_columnar_exec TO false;".execute(&mut conn);
r#"
CREATE TABLE test_mff_disabled (
id SERIAL PRIMARY KEY,
text_col TEXT,
str_col TEXT,
num_col INTEGER
);
CREATE INDEX test_mff_disabled_idx ON test_mff_disabled
USING bm25 (id, text_col, str_col, num_col)
WITH (
key_field = 'id',
text_fields = '{"text_col": {"fast": true}, "str_col": {"fast": true, "tokenizer": {"type": "keyword"}}}',
numeric_fields = '{"num_col": {"fast": true}}',
sort_by = 'num_col DESC NULLS LAST'
);
"#
.execute(&mut conn);
for batch in 1..=3 {
let sql = format!(
r#"
INSERT INTO test_mff_disabled (text_col, str_col, num_col)
SELECT
'Record ' || i || ' batch{}',
'Str' || (i % 3),
(random() * 50)::integer
FROM generate_series(1, 20) AS i;
"#,
batch
);
sql.execute(&mut conn);
}
// With ORDER BY, PostgreSQL ensures sorted results even with mixed ff exec disabled
let (plan,): (Value,) = r#"
EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON)
SELECT id, num_col FROM test_mff_disabled
WHERE text_col @@@ 'Record'
ORDER BY num_col DESC
"#
.fetch_one(&mut conn);
let methods = get_all_exec_methods(&plan);
assert!(
methods.contains(&"NormalScanExecState".to_string()),
"Expected NormalScanExecState when mixed ff disabled, got: {:?}",
methods
);
let results: Vec<(i32, i32)> = r#"
SELECT id, num_col FROM test_mff_disabled
WHERE text_col @@@ 'Record'
ORDER BY num_col DESC
"#
.fetch(&mut conn);
assert_eq!(results.len(), 60, "Should return all 60 results");
let nums: Vec = results.iter().map(|(_, n)| *n).collect();
assert!(
is_sorted_desc(&nums),
"Results should be sorted with ORDER BY even when mixed ff exec disabled. First 20: {:?}",
&nums[..20.min(nums.len())]
);
}