// Copyright (c) 2023-2025 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
// Tests for MixedFastFieldExecState implementation
// Includes both basic functionality tests and corner/edge cases
mod fixtures;
use bigdecimal::BigDecimal;
use fixtures::db::Query;
use fixtures::*;
use pretty_assertions::assert_eq;
use rstest::*;
use serde_json::Value;
use sqlx::PgConnection;
// Helper function to get all execution methods in the plan
fn get_all_exec_methods(plan: &Value) -> Vec {
let mut methods = Vec::new();
extract_methods(plan, &mut methods);
methods
}
// Recursive function to walk the plan tree
fn extract_methods(node: &Value, methods: &mut Vec) {
if let Some(exec_method) = node.get("Exec Method") {
if let Some(method) = exec_method.as_str() {
methods.push(method.to_string());
}
}
// Check child plans
if let Some(plans) = node.get("Plans") {
if let Some(plans_array) = plans.as_array() {
for plan in plans_array {
extract_methods(plan, methods);
}
}
}
// Start from the root if given the root plan
if let Some(root) = node.get(0) {
if let Some(plan_node) = root.get("Plan") {
extract_methods(plan_node, methods);
}
}
}
// Setup for complex aggregation with mixed fast fields
struct TestComplexAggregation;
impl TestComplexAggregation {
fn setup() -> impl Query {
r#"
DROP TABLE IF EXISTS expected_payments;
CREATE TABLE expected_payments (
id SERIAL PRIMARY KEY,
organization_id UUID NOT NULL,
live_mode BOOLEAN NOT NULL,
status TEXT NOT NULL,
internal_account_id UUID NOT NULL,
amount_range NUMRANGE NOT NULL,
amount_reconciled NUMERIC NOT NULL,
direction TEXT NOT NULL CHECK (direction IN ('credit','debit')),
currency TEXT NOT NULL,
discarded_at TIMESTAMP NULL
);
INSERT INTO expected_payments (
organization_id,
live_mode,
status,
internal_account_id,
amount_range,
amount_reconciled,
direction,
currency,
discarded_at
)
SELECT
organization_id,
live_mode,
status,
internal_account_id,
numrange(lower_val, lower_val + offset_val) AS amount_range,
amount_reconciled,
direction,
currency,
discarded_at
FROM (
SELECT
-- random UUID
(md5(random()::text))::uuid AS organization_id,
-- 50/50 live_mode
(random() < 0.5) AS live_mode,
-- status pick
(ARRAY['unreconciled','partially_reconciled'])
[floor(random()*2 + 1)::int] AS status,
-- another random UUID
(md5(random()::text))::uuid AS internal_account_id,
-- ensure lower ≤ upper by generating an offset
floor(random()*1000)::int AS lower_val,
floor(random()*100)::int + 1 AS offset_val,
-- reconciled amount between –500 and +500
(random()*1000 - 500)::numeric AS amount_reconciled,
-- direction pick
(ARRAY['credit','debit'])[floor(random()*2 + 1)::int] AS direction,
-- currency pick
(ARRAY['USD','EUR','GBP','JPY','AUD'])[floor(random()*5 + 1)::int] AS currency,
-- 10% NULL, else random timestamp in last year
CASE
WHEN random() < 0.10 THEN NULL
ELSE now() - (random() * INTERVAL '365 days')
END AS discarded_at
FROM generate_series(1, 1000)
) sub;
create index expected_payments_idx on expected_payments using bm25 (
id,
organization_id,
live_mode,
status,
internal_account_id,
amount_range,
amount_reconciled,
direction,
currency,
discarded_at
) with (
key_field = 'id',
text_fields = '{"organization_id": {"fast":true}, "status": {"fast": true, "tokenizer": {"type": "keyword"}}, "direction": {"fast": true}, "currency": {"fast": true}}',
boolean_fields = '{"live_mode": {"fast": true}}'
);
"#
}
}
#[ignore]
#[rstest]
fn test_complex_aggregation_with_mixed_fast_fields(mut conn: PgConnection) {
TestComplexAggregation::setup().execute(&mut conn);
// Force disable regular index scans to ensure BM25 index is used
"SET enable_indexscan = off;".execute(&mut conn);
// Get execution plan for the complex query
let (plan,) = r#"
EXPLAIN (ANALYZE, FORMAT JSON)
SELECT
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range,
COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count,
COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count,
COUNT(*) as total_count,
COUNT(distinct expected_payments.currency) as currency_count,
(ARRAY_AGG(distinct expected_payments.currency))[1] as currency
FROM expected_payments
WHERE expected_payments.live_mode @@@ 'true'
AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]'
AND expected_payments.discarded_at IS NULL
LIMIT 1
"#
.fetch_one::<(Value,)>(&mut conn);
// Get execution methods
let methods = get_all_exec_methods(&plan);
println!("Complex aggregation execution methods: {methods:?}");
// Assert that a fast field execution state is used
assert!(
methods.iter().any(|m| m.contains("FastFieldExecState")),
"Expected a FastFieldExecState to be used for complex aggregation, got: {methods:?}"
);
// Actually execute the query to verify results
let results = r#"
SELECT
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else -(upper(expected_payments.amount_range) - 1) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else -lower(expected_payments.amount_range) end), 0) - COALESCE(SUM(amount_reconciled), 0) total_max_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then lower(expected_payments.amount_range) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'credit' then (upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(GREATEST(amount_reconciled, 0)) credit_max_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -(upper(expected_payments.amount_range) - 1) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_min_range,
COALESCE(SUM(case when expected_payments.direction = 'debit' then -lower(expected_payments.amount_range) else 0 end), 0) - SUM(LEAST(amount_reconciled, 0)) debit_max_range,
COUNT(case when expected_payments.direction = 'credit' then 1 else null end) as credit_count,
COUNT(case when expected_payments.direction = 'debit' then 1 else null end) as debit_count,
COUNT(*) as total_count,
COUNT(distinct expected_payments.currency) as currency_count,
(ARRAY_AGG(distinct expected_payments.currency))[1] as currency
FROM expected_payments
WHERE expected_payments.live_mode @@@ 'true'
AND expected_payments.status @@@ 'IN [unreconciled partially_reconciled]'
AND expected_payments.discarded_at IS NULL
LIMIT 1
"#
.fetch_result::<(
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
BigDecimal,
i64,
i64,
i64,
i64,
Option,
)>(&mut conn)
.unwrap();
// Assert that we got results (should be at least one row)
assert!(!results.is_empty(), "Expected at least one row of results");
// Get the counts from first result
let (_, _, _, _, _, _, credit_count, debit_count, total_count, currency_count, currency) =
&results[0];
// Verify consistency in counts
assert_eq!(
*total_count,
credit_count + debit_count,
"Total count should equal credit_count + debit_count"
);
// Verify currency count is positive
assert!(
*currency_count > 0,
"Should have at least one currency type"
);
// Check that we have a currency value if currency_count > 0
if *currency_count > 0 {
assert!(
currency.is_some(),
"Should have a currency value when currency_count > 0"
);
}
// Reset setting
"SET enable_indexscan = on;".execute(&mut conn);
}