// Copyright (c) 2023-2025 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod fixtures;

use chrono::NaiveDate;
use fixtures::*;
use pretty_assertions::assert_eq;
use rstest::*;
use serde_json::Value;
use sqlx::{PgConnection, Row};
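
/// Builds the `bm25_search` test table and a bm25 index whose `category` field
/// is a lowercase-normalized fast field, then returns the JSON `EXPLAIN ANALYZE`
/// plan for a query that sorts by `lower(category)`.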
fn field_sort_fixture(conn: &mut PgConnection) -> Value {
// ensure our custom scan wins against our small test table
r#"
SET enable_indexscan TO off;
CALL paradedb.create_bm25_test_table(table_name => 'bm25_search', schema_name => 'paradedb');
CREATE INDEX bm25_search_idx ON paradedb.bm25_search
USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time)
WITH (
key_field = 'id',
text_fields = '{
"description": {},
"category": {
"fast": true,
"normalizer": "lowercase"
}
}',
numeric_fields = '{
"rating": {}
}',
boolean_fields = '{
"in_stock": {}
}',
json_fields = '{
"metadata": {}
}',
datetime_fields = '{
"created_at": {},
"last_updated_date": {},
"latest_available_time": {}
}'
);
"#.execute(conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT * FROM paradedb.bm25_search WHERE description @@@ 'keyboard OR shoes' ORDER BY lower(category) LIMIT 5".fetch_one::<(Value,)>(conn);
eprintln!("{plan:#?}");
plan
}

#[rstest]
fn sort_by_lower(mut conn: PgConnection) {
let plan = field_sort_fixture(&mut conn);
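// Drill into the first child of the top-level plan node; it should be the
// custom scan that performed the sort.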
let plan = plan
.pointer("/0/Plan/Plans/0")
.unwrap()
.as_object()
.unwrap();
assert_eq!(
plan.get(" Sort Field"),
Some(&Value::String(String::from("category")))
);
}

#[rstest]
fn sort_by_lower_parallel(mut conn: PgConnection) {
if pg_major_version(&mut conn) < 17 {
// We cannot reliably force parallel workers to be used without `debug_parallel_query`.
return;
}
"SET max_parallel_workers = 8;".execute(&mut conn);
"SET debug_parallel_query TO on".execute(&mut conn);
let plan = field_sort_fixture(&mut conn);
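// With a forced parallel plan the custom scan is nested one level deeper
// (e.g. under a Gather node) than in the serial case above.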
let plan = plan
.pointer("/0/Plan/Plans/0/Plans/0")
.unwrap()
.as_object()
.unwrap();
assert_eq!(
plan.get(" Sort Field"),
Some(&Value::String(String::from("category")))
);
}

#[rstest]
fn sort_by_raw(mut conn: PgConnection) {
// ensure our custom scan wins against our small test table
r#"
SET enable_indexscan TO off;
CALL paradedb.create_bm25_test_table(table_name => 'bm25_search', schema_name => 'paradedb');
CREATE INDEX bm25_search_idx ON paradedb.bm25_search
USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time)
WITH (
key_field = 'id',
text_fields = '{
"description": {},
"category": {
"fast": true,
"normalizer": "raw"
}
}',
numeric_fields = '{
"rating": {}
}',
boolean_fields = '{
"in_stock": {}
}',
json_fields = '{
"metadata": {}
}',
datetime_fields = '{
"created_at": {},
"last_updated_date": {},
"latest_available_time": {}
}'
);
"#.execute(&mut conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT * FROM paradedb.bm25_search WHERE description @@@ 'keyboard OR shoes' ORDER BY category LIMIT 5".fetch_one::<(Value,)>(&mut conn);
eprintln!("{plan:#?}");
let plan = plan
.pointer("/0/Plan/Plans/0")
.unwrap()
.as_object()
.unwrap();
assert_eq!(
plan.get(" Sort Field"),
Some(&Value::String(String::from("category")))
);
}

#[rstest]
fn sort_by_row_return_scores(mut conn: PgConnection) {
// ensure our custom scan wins against our small test table
r#"
SET enable_indexscan TO off;
CALL paradedb.create_bm25_test_table(table_name => 'bm25_search', schema_name => 'paradedb');
CREATE INDEX bm25_search_idx ON paradedb.bm25_search
USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time)
WITH (
key_field = 'id',
text_fields = '{
"description": {},
"category": {
"fast": true,
"normalizer": "raw"
}
}',
numeric_fields = '{
"rating": {}
}',
boolean_fields = '{
"in_stock": {}
}',
json_fields = '{
"metadata": {}
}',
datetime_fields = '{
"created_at": {},
"last_updated_date": {},
"latest_available_time": {}
}'
);
"#.execute(&mut conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT paradedb.score(id), * FROM paradedb.bm25_search WHERE description @@@ 'keyboard OR shoes' ORDER BY category LIMIT 5".fetch_one::<(Value,)>(&mut conn);
eprintln!("{plan:#?}");
// Get the first plan node in the plans array
let plan = plan
.pointer("/0/Plan/Plans/0/Plans/0")
.unwrap()
.as_object()
.unwrap();
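// When scores are requested the ORDER BY is not pushed into the scan, but
// score output is enabled on it.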
assert_eq!(plan.get(" Sort Field"), None);
assert_eq!(plan.get("Scores"), Some(&Value::Bool(true)));
}

#[rstest]
async fn test_incremental_sort_with_partial_order(mut conn: PgConnection) {
// Create the partitioned sales table
PartitionedTable::setup().execute(&mut conn);
// Enable debugging logs
sqlx::query("SET client_min_messages TO DEBUG1;")
.execute(&mut conn)
.await
.unwrap();
// Enable additional debug options
sqlx::query("SET debug_print_plan = true;")
.execute(&mut conn)
.await
.unwrap();
sqlx::query("SET debug_pretty_print = true;")
.execute(&mut conn)
.await
.unwrap();
// Check the Postgres version: we only expect an Incremental Sort node on PG 16+;
// older versions may use a plain Sort instead.
let pg_version = pg_major_version(&mut conn);
let pg_supports_incremental_sort = pg_version >= 16;
// Test BM25 with ORDER BY ... LIMIT to confirm sort optimization works
let (explain_bm25,) = sqlx::query_as::<_, (Value,)>(
"EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON)
SELECT description, sale_date, paradedb.score(id) as score FROM sales
WHERE description @@@ 'keyboard'
ORDER BY score, sale_date, amount LIMIT 10;",
)
.fetch_one(&mut conn)
.await
.unwrap();
println!("EXPLAIN OUTPUT: {explain_bm25}");
let plan_json = explain_bm25.to_string();
// Extract the Custom Scan nodes from the JSON plan for inspection
let mut custom_scan_nodes = Vec::new();
if let Ok(plan) = serde_json::from_str::<Value>(&plan_json) {
// Navigate through the plan to find Custom Scan nodes
if let Some(main_plan) = plan.pointer("/0/Plan") {
collect_custom_scan_nodes(main_plan, &mut custom_scan_nodes);
}
}
println!("Found {} Custom Scan nodes", custom_scan_nodes.len());
for (i, node) in custom_scan_nodes.iter().enumerate() {
println!("Custom Scan Node #{}: {}", i + 1, node);
}
// Additional debug query - check what happens with a simpler query
let (explain_simple,) = sqlx::query_as::<_, (String,)>(
"EXPLAIN (ANALYZE, VERBOSE)
SELECT description, sale_date, paradedb.score(id) as score FROM sales
WHERE description @@@ 'keyboard'
ORDER BY score, sale_date LIMIT 10;",
)
.fetch_one(&mut conn)
.await
.unwrap();
println!("SIMPLE QUERY EXPLAIN OUTPUT: {explain_simple}");
// Instead of checking for specific node types, check that:
// 1. A Sort node exists to handle the sorting (either regular Sort or Incremental Sort)
// 2. Custom Scan nodes exist that support our search
// 3. Scores are enabled in the Custom Scan
// Check that we have a Sort node somewhere in the plan
let has_sort_node = if pg_supports_incremental_sort {
plan_json.contains("\"Node Type\":\"Incremental Sort\"")
|| explain_simple.contains("Incremental Sort")
} else {
plan_json.contains("\"Node Type\":\"Sort\"")
|| plan_json.contains("\"Node Type\":\"Incremental Sort\"")
|| explain_simple.contains("Sort")
|| explain_simple.contains("Incremental Sort")
};
assert!(
has_sort_node,
"Plan should include an Incremental Sort node to handle ORDER BY"
);
// Check that we have Custom Scan nodes that handle our search
let has_custom_scan = plan_json.contains("\"Node Type\":\"Custom Scan\"")
|| explain_simple.contains("Custom Scan");
assert!(
has_custom_scan,
"Plan should include Custom Scan nodes to perform our search"
);
// Check that the score is requested
let has_scores_enabled = !custom_scan_nodes.is_empty()
&& custom_scan_nodes.iter().any(|node| {
node.get("Scores")
.is_some_and(|v| v.as_bool() == Some(true))
});
assert!(
has_scores_enabled,
"At least one Custom Scan node should have Scores enabled"
);
// Verify we get results and they're in the correct order
let results = sqlx::query(
"SELECT description, sale_date, paradedb.score(id) as score FROM sales
WHERE description @@@ 'keyboard'
ORDER BY score, sale_date, amount LIMIT 10;",
)
.fetch_all(&mut conn)
.await
.unwrap();
// Results might be empty since 'keyboard' is a specific term
// but if we get results, they should be properly sorted
if !results.is_empty() {
// Verify sort order - dates should be ascending
let mut prev_date = None;
for row in &results {
let date: NaiveDate = row.get("sale_date");
if let Some(prev) = prev_date {
assert!(date >= prev, "Results should be sorted by date");
}
prev_date = Some(date);
}
}
}

// Helper function to recursively collect Custom Scan nodes from a plan
fn collect_custom_scan_nodes(plan: &Value, nodes: &mut Vec<Value>) {
// Check if this is a Custom Scan node
if let Some(node_type) = plan.get("Node Type").and_then(|v| v.as_str()) {
if node_type == "Custom Scan" {
nodes.push(plan.clone());
}
}
// Recursively check child plans
if let Some(plans) = plan.get("Plans").and_then(|p| p.as_array()) {
for child_plan in plans {
collect_custom_scan_nodes(child_plan, nodes);
}
}
}

#[rstest]
fn sort_partitioned_early_cutoff(mut conn: PgConnection) {
PartitionedTable::setup().execute(&mut conn);
// Insert matching rows into both partitions.
r#"
INSERT INTO sales (sale_date, amount, description) VALUES
('2023-01-10', 150.00, 'Ergonomic metal keyboard'),
('2023-04-01', 250.00, 'Cheap plastic keyboard');
"#
.execute(&mut conn);
"SET max_parallel_workers TO 0;".execute(&mut conn);
// With ORDER BY the partition key: we expect the partitions to be visited sequentially, and
// for cutoff to occur.
let (plan,): (Value,) = r#"
EXPLAIN (ANALYZE, FORMAT JSON)
SELECT description, sale_date
FROM sales
WHERE description @@@ 'keyboard'
ORDER BY sale_date
LIMIT 1;
"#
.fetch_one(&mut conn);
eprintln!("{plan:#?}");
// We expect both partitions to be in the plan, but for only the first one to have been
// executed, because the Append node was able to get enough results from the first partition.
let plans = plan
.pointer("/0/Plan/Plans/0/Plans")
.unwrap()
.as_array()
.unwrap();
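// The first partition was executed once; the second was never executed because
// the LIMIT was already satisfied.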
assert_eq!(
plans[0].get("Actual Loops").unwrap(),
&serde_json::from_str::<Value>("1").unwrap()
);
assert_eq!(
plans[1].get("Actual Loops").unwrap(),
&serde_json::from_str::<Value>("0").unwrap()
);
}