// Copyright (c) 2023-2025 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// Tests for ParadeDB's Custom Scan implementation
mod fixtures;
use fixtures::*;
use pretty_assertions::assert_eq;
use rstest::*;
use serde_json::{Number, Value};
use sqlx::PgConnection;
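// Re-executing a prepared statement that aggregates paradedb.score() should keep returning the
// same (count, max score) pair; this guards against the target list being corrupted across
// executions of the cached plan.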
#[rstest]
fn corrupt_targetlist(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, score) = "select count(*), max(paradedb.score(id)) from paradedb.bm25_search where description @@@ 'keyboard'"
.fetch_one::<(i64, f32)>(&mut conn);
assert_eq!((id, score), (2, 3.2668595));
"PREPARE prep AS select count(*), max(paradedb.score(id)) from paradedb.bm25_search where description @@@ 'keyboard'".execute(&mut conn);
for _ in 0..100 {
"EXECUTE prep".fetch_one::<(i64, f32)>(&mut conn);
assert_eq!((id, score), (2, 3.2668595));
}
}
#[rstest]
fn attribute_1_of_table_has_wrong_type(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, ) = "SELECT id, description FROM paradedb.bm25_search WHERE description @@@ 'keyboard' OR id = 1 ORDER BY id LIMIT 1"
.fetch_one::<(i32,)>(&mut conn);
assert_eq!(id, 1);
}
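// OR-ing two @@@ predicates (one on the table, one on a field) should still be planned as a
// ParadeDB custom scan; the EXPLAIN JSON is walked down to the first child plan to check its
// "Custom Plan Provider".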
#[rstest]
fn generates_custom_scan_for_or(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT * FROM paradedb.bm25_search WHERE bm25_search @@@ 'description:keyboard' OR description @@@ 'shoes'".fetch_one::<(Value,)>(&mut conn);
let plan = plan
.get(0)
.unwrap()
.as_object()
.unwrap()
.get("Plan")
.unwrap()
.as_object()
.unwrap()
.get("Plans")
.unwrap()
.get(0)
.unwrap();
eprintln!("{plan:#?}");
assert_eq!(
plan.get("Custom Plan Provider"),
Some(&Value::String(String::from("ParadeDB Scan")))
);
}
#[rstest]
fn generates_custom_scan_for_and(mut conn: PgConnection) {
use serde_json::Value;
SimpleProductsTable::setup().execute(&mut conn);
"SET enable_indexscan TO off;".execute(&mut conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT * FROM paradedb.bm25_search WHERE bm25_search @@@ 'description:keyboard' AND description @@@ 'shoes'".fetch_one::<(Value,)>(&mut conn);
let plan = plan.pointer("/0/Plan/Plans/0").unwrap();
eprintln!("{plan:#?}");
assert_eq!(
plan.get("Custom Plan Provider"),
Some(&Value::String(String::from("ParadeDB Scan")))
);
}
#[rstest]
fn includes_segment_count(mut conn: PgConnection) {
use serde_json::Value;
SimpleProductsTable::setup().execute(&mut conn);
"SET enable_indexscan TO off;".execute(&mut conn);
let (plan, ) = "EXPLAIN (ANALYZE, FORMAT JSON) SELECT * FROM paradedb.bm25_search WHERE bm25_search @@@ 'description:keyboard' AND description @@@ 'shoes'".fetch_one::<(Value,)>(&mut conn);
let plan = plan.pointer("/0/Plan/Plans/0").unwrap();
assert!(plan.get("Segment Count").is_some());
}
#[rstest]
fn field_on_left(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id,) =
"SELECT id FROM paradedb.bm25_search WHERE description @@@ 'keyboard' ORDER BY id ASC"
.fetch_one::<(i32,)>(&mut conn);
assert_eq!(id, 1);
}
#[rstest]
fn table_on_left(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id,) =
"SELECT id FROM paradedb.bm25_search WHERE bm25_search @@@ 'description:keyboard' ORDER BY id ASC"
.fetch_one::<(i32,)>(&mut conn);
assert_eq!(id, 1);
}
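// The next few tests verify that paradedb.score() and paradedb.snippet() can be projected
// through the custom scan's target list, individually and together.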
#[rstest]
fn scores_project(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, score) =
"SELECT id, paradedb.score(id) FROM paradedb.bm25_search WHERE description @@@ 'keyboard' ORDER BY paradedb.score(id) DESC LIMIT 1"
.fetch_one::<(i32, f32)>(&mut conn);
assert_eq!(id, 2);
assert_eq!(score, 3.2668595);
}
#[rstest]
fn snippets_project(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, snippet) =
"SELECT id, paradedb.snippet(description) FROM paradedb.bm25_search WHERE description @@@ 'keyboard' ORDER BY paradedb.score(id) DESC LIMIT 1"
.fetch_one::<(i32, String)>(&mut conn);
assert_eq!(id, 2);
assert_eq!(snippet, String::from("Plastic Keyboard"));
}
#[rstest]
fn scores_and_snippets_project(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, score, snippet) =
"SELECT id, paradedb.score(id), paradedb.snippet(description) FROM paradedb.bm25_search WHERE description @@@ 'keyboard' ORDER BY paradedb.score(id) DESC LIMIT 1"
.fetch_one::<(i32, f32, String)>(&mut conn);
assert_eq!(id, 2);
assert_eq!(score, 3.2668595);
assert_eq!(snippet, String::from("Plastic Keyboard"));
}
#[rstest]
fn mingets(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let (id, snippet) =
"SELECT id, paradedb.snippet(description, '', '') FROM paradedb.bm25_search WHERE description @@@ 'teddy bear'"
.fetch_one::<(i32, String)>(&mut conn);
assert_eq!(id, 40);
assert_eq!(
snippet,
String::from("Plush teddy bear")
);
}
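// paradedb.score() should also work inside a larger expression (here multiplied by the rating
// column) and as an ORDER BY key.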
#[rstest]
fn scores_with_expressions(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let result = r#"
select id,
description,
paradedb.score(id),
rating,
paradedb.score(id) * rating /* testing this, specifically */
from paradedb.bm25_search
where metadata @@@ 'color:white'
order by 5 desc, score desc
limit 1;
"#
.fetch_one::<(i32, String, f32, i32, f64)>(&mut conn);
assert_eq!(
result,
(
25,
"Anti-aging serum".into(),
3.2455924,
4,
12.982369422912598
)
);
}
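// A bare LIMIT with no ORDER BY should still be pushed down as a Top-N limit of 1, and should
// not request scores unless paradedb.score() appears in the target list (see the next test).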
#[rstest]
fn limit_without_order_by(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
"SET enable_indexscan TO off;".execute(&mut conn);
let (plan, ) = r#"
explain (analyze, format json) select * from paradedb.bm25_search where metadata @@@ 'color:white' limit 1;
"#
.fetch_one::<(Value,)>(&mut conn);
let path = plan.pointer("/0/Plan/Plans/0").unwrap();
assert_eq!(
path.get("Node Type"),
Some(&Value::String(String::from("Custom Scan")))
);
assert_eq!(path.get("Scores"), Some(&Value::Bool(false)));
assert_eq!(
path.get(" Top N Limit"),
Some(&Value::Number(Number::from(1)))
);
}
#[rstest]
fn score_and_limit_without_order_by(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
"SET enable_indexscan TO off;".execute(&mut conn);
let (plan, ) = r#"
explain (analyze, format json) select paradedb.score(id), * from paradedb.bm25_search where metadata @@@ 'color:white' limit 1;
"#
.fetch_one::<(Value,)>(&mut conn);
let path = plan.pointer("/0/Plan/Plans/0").unwrap();
assert_eq!(
path.get("Node Type"),
Some(&Value::String(String::from("Custom Scan")))
);
assert_eq!(path.get("Scores"), Some(&Value::Bool(true)));
assert_eq!(
path.get(" Top N Limit"),
Some(&Value::Number(Number::from(1)))
);
}
#[rstest]
fn simple_join_with_scores_and_both_sides(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let result = r#"
select a.id,
a.score,
b.id,
b.score
from (select paradedb.score(id), * from paradedb.bm25_search) a
inner join (select paradedb.score(id), * from paradedb.bm25_search) b on a.id = b.id
where a.description @@@ 'bear' AND b.description @@@ 'teddy bear';"#
.fetch_one::<(i32, f32, i32, f32)>(&mut conn);
assert_eq!(result, (40, 3.3322046, 40, 6.664409));
}
#[rstest]
fn simple_join_with_scores_on_both_sides(mut conn: PgConnection) {
SimpleProductsTable::setup().execute(&mut conn);
let result = r#"
select a.id,
a.score,
b.id,
b.score
from (select paradedb.score(id), * from paradedb.bm25_search) a
inner join (select paradedb.score(id), * from paradedb.bm25_search) b on a.id = b.id
where a.description @@@ 'bear' OR b.description @@@ 'teddy bear';"#
.fetch_one::<(i32, f32, i32, f32)>(&mut conn);
assert_eq!(result, (40, 3.3322046, 40, 6.664409));
}
#[rstest]
fn add_scores_across_joins_issue1753(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(table_name => 'mock_items', schema_name => 'public');
CREATE INDEX search_idx ON mock_items
USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time)
WITH (key_field='id');
CALL paradedb.create_bm25_test_table(
schema_name => 'public',
table_name => 'orders',
table_type => 'Orders'
);
ALTER TABLE orders
ADD CONSTRAINT foreign_key_product_id
FOREIGN KEY (product_id)
REFERENCES mock_items(id);
CREATE INDEX orders_idx ON orders
USING bm25 (order_id, customer_name)
WITH (key_field='order_id');
"#.execute(&mut conn);
// issue #1753: scores added across a join used to come back as NaN because no custom scan was
// planned; they should now sum to a real value
let result = "
SELECT o.order_id, m.description, paradedb.score(o.order_id) + paradedb.score(m.id) as score
FROM orders o JOIN mock_items m ON o.product_id = m.id
WHERE o.customer_name @@@ 'Johnson' AND m.description @@@ 'shoes'
ORDER BY order_id
LIMIT 1"
.fetch_one::<(i32, String, f32)>(&mut conn);
assert_eq!(result, (3, "Sleek running shoes".into(), 5.406531));
}
#[rstest]
fn scores_survive_joins(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(table_name => 'a', schema_name => 'public');
CALL paradedb.create_bm25_test_table(table_name => 'b', schema_name => 'public');
CALL paradedb.create_bm25_test_table(table_name => 'c', schema_name => 'public');
CREATE INDEX idxa ON a USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time) WITH (key_field='id');
CREATE INDEX idxb ON b USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time) WITH (key_field='id');
CREATE INDEX idxc ON c USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time) WITH (key_field='id');
"#.execute(&mut conn);
// scores should survive the multi-way join instead of coming back as NaN
let result = r#"
SELECT a.description, paradedb.score(a.id)
FROM a
join b on a.id = b.id
join c on a.id = c.id
WHERE a.description @@@ 'shoes'
ORDER BY a.description;"#
.fetch_result::<(String, f32)>(&mut conn)
.expect("query failed");
assert_eq!(
result,
vec![
("Generic shoes".into(), 2.8772602),
("Sleek running shoes".into(), 2.4849067),
("White jogging shoes".into(), 2.4849067),
]
);
}
#[rustfmt::skip]
#[rstest]
fn join_issue_1776(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(
schema_name => 'public',
table_name => 'mock_items'
);
CREATE INDEX search_idx ON mock_items
USING bm25 (id, description, category, rating, in_stock, metadata, created_at)
WITH (key_field='id');
CALL paradedb.create_bm25_test_table(
schema_name => 'public',
table_name => 'orders',
table_type => 'Orders'
);
ALTER TABLE orders
ADD CONSTRAINT foreign_key_product_id
FOREIGN KEY (product_id)
REFERENCES mock_items(id);
CREATE INDEX orders_idx ON orders
USING bm25 (order_id, customer_name)
WITH (key_field='order_id');
"#
.execute(&mut conn);
let results = r#"
SELECT o.order_id, m.description, o.customer_name, paradedb.score(o.order_id) as orders_score, paradedb.score(m.id) as items_score
FROM orders o
JOIN mock_items m ON o.product_id = m.id
WHERE o.customer_name @@@ 'Johnson' AND m.description @@@ 'shoes' OR m.description @@@ 'Smith'
ORDER BY order_id
LIMIT 5;
"#.fetch_result::<(i32, String, String, f32, f32)>(&mut conn).expect("query failed");
assert_eq!(results[0], (3, "Sleek running shoes".into(), "Alice Johnson".into(), 2.9216242, 2.4849067));
assert_eq!(results[1], (6, "White jogging shoes".into(), "Alice Johnson".into(), 2.9216242, 2.4849067));
assert_eq!(results[2], (36, "White jogging shoes".into(), "Alice Johnson".into(), 2.9216242, 2.4849067));
}
#[rustfmt::skip]
#[rstest]
fn join_issue_1826(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(
schema_name => 'public',
table_name => 'mock_items'
);
CREATE INDEX search_idx ON mock_items
USING bm25 (id, description, category, rating, in_stock, metadata, created_at)
WITH (key_field='id');
CALL paradedb.create_bm25_test_table(
schema_name => 'public',
table_name => 'orders',
table_type => 'Orders'
);
ALTER TABLE orders
ADD CONSTRAINT foreign_key_product_id
FOREIGN KEY (product_id)
REFERENCES mock_items(id);
CREATE INDEX orders_idx ON orders
USING bm25 (order_id, customer_name)
WITH (key_field='order_id');
"#
.execute(&mut conn);
let results = r#"
SELECT o.order_id, m.description, o.customer_name, paradedb.score(o.order_id) as orders_score, paradedb.score(m.id) as items_score
FROM orders o
JOIN mock_items m ON o.product_id = m.id
WHERE o.customer_name @@@ 'Johnson' AND m.description @@@ 'shoes' OR m.description @@@ 'Smith'
ORDER BY paradedb.score(m.id) desc, m.id asc
LIMIT 1;
"#.fetch_result::<(i32, String, String, f32, f32)>(&mut conn).expect("query failed");
assert_eq!(results[0], (3, "Sleek running shoes".into(), "Alice Johnson".into(), 2.9216242, 2.4849067));
}
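// After a query errors out partway through a scan, the backend should not be left holding open
// tantivy file handles; this is checked by inspecting `lsof` output for the backend's pid.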
#[rstest]
fn leaky_file_handles(mut conn: PgConnection) {
r#"
CREATE OR REPLACE FUNCTION raise_exception(int, int) RETURNS bool LANGUAGE plpgsql AS $$
DECLARE
BEGIN
IF $1 = $2 THEN
RAISE EXCEPTION 'error! % = %', $1, $2;
END IF;
RETURN false;
END;
$$;
"#
.execute(&mut conn);
let (pid,) = "SELECT pg_backend_pid()".fetch_one::<(i32,)>(&mut conn);
SimpleProductsTable::setup().execute(&mut conn);
// this will raise an error when it hits id #12
let result = "SELECT id, paradedb.score(id), raise_exception(id, 12) FROM paradedb.bm25_search WHERE category @@@ 'electronics' ORDER BY paradedb.score(id) DESC, id LIMIT 10"
.execute_result(&mut conn);
assert!(result.is_err());
assert_eq!(
"error returned from database: error! 12 = 12",
&format!("{}", result.err().unwrap())
);
fn tantivy_files_still_open(pid: i32) -> bool {
let output = std::process::Command::new("lsof")
.arg("-p")
.arg(pid.to_string())
.output()
.expect("`lsof` command should not fail`");
let stdout = String::from_utf8_lossy(&output.stdout);
eprintln!("stdout: {stdout}");
stdout.contains("/tantivy/")
}
// see if there are still some open tantivy files
if tantivy_files_still_open(pid) {
// if there are, they're probably (hopefully!) from the postgres connection waiting on merge
// threads in the background, so we'll give it 5 seconds and try again
eprintln!("sleeping for 5s and checking open files again");
std::thread::sleep(std::time::Duration::from_secs(5));
// this time asserting for real
assert!(!tantivy_files_still_open(pid));
}
}
#[rustfmt::skip]
#[rstest]
fn cte_issue_1951(mut conn: PgConnection) {
r#"
CREATE TABLE t
(
id SERIAL,
data TEXT
);
CREATE TABLE s
(
id SERIAL,
data TEXT
);
insert into t (id, data) select x, md5(x::text) || ' query' from generate_series(1, 100) x;
insert into s (id, data) select x, md5(x::text) from generate_series(1, 100) x;
create index idxt on t using bm25 (id, data) with (key_field = id);
create index idxs on s using bm25 (id, data) with (key_field = id);
"#.execute(&mut conn);
let results = r#"
with cte as (
select id, 1 as score from t
where data @@@ 'query'
limit 1)
select cte.id from s
right join cte on cte.id = s.id
order by cte.score desc;
"#.fetch_result::<(i32, )>(&mut conn).expect("query failed");
assert_eq!(results.len(), 1);
}
#[rstest]
fn without_operator_guc(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(table_name => 'mock_items', schema_name => 'public');
CREATE INDEX search_idx ON mock_items
USING bm25 (id, description)
WITH (key_field='id');
"#
.execute(&mut conn);
"SET enable_indexscan TO OFF;".execute(&mut conn);
fn plan_uses_custom_scan(conn: &mut PgConnection, query_string: &str) -> bool {
let (plan,) = format!("EXPLAIN (FORMAT JSON) {query_string}").fetch_one::<(Value,)>(conn);
eprintln!("{plan:#?}");
format!("{plan:?}").contains("ParadeDB Scan")
}
for custom_scan_without_operator in [true, false] {
format!(
"SET paradedb.enable_custom_scan_without_operator = {custom_scan_without_operator}"
)
.execute(&mut conn);
// Confirm that a plan which doesn't use our operator is affected by the GUC.
let uses_custom_scan =
plan_uses_custom_scan(&mut conn, "SELECT * FROM mock_items WHERE id = 1");
if custom_scan_without_operator {
assert!(
uses_custom_scan,
"Should use the custom scan when the GUC is enabled."
);
} else {
assert!(
!uses_custom_scan,
"Should not the custom scan when the GUC is disabled."
);
}
// And that a plan which does use our operator is not affected by the GUC.
let uses_custom_scan =
plan_uses_custom_scan(&mut conn, "SELECT * FROM mock_items WHERE id @@@ '1'");
assert!(
uses_custom_scan,
"Should use the custom scan when our operator is used, regardless of \
the GUC value ({custom_scan_without_operator})"
);
}
}
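// Exactly 8 rows match 'beer', so for every LIMIT n the Top-N scan must return LEAST(n, 8)
// rows, both without and with parallel workers; the SQL assert() helper raises if the returned
// row count ever differs from LEAST(n, 8).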
#[rstest]
fn top_n_matches(mut conn: PgConnection) {
r#"
DROP TABLE IF EXISTS test;
CREATE TABLE test (
id SERIAL8 NOT NULL PRIMARY KEY,
message TEXT,
severity INTEGER
) WITH (autovacuum_enabled = false);
INSERT INTO test (message, severity) VALUES ('beer wine cheese a', 1);
INSERT INTO test (message, severity) VALUES ('beer wine a', 2);
INSERT INTO test (message, severity) VALUES ('beer cheese a', 3);
INSERT INTO test (message, severity) VALUES ('beer a', 4);
INSERT INTO test (message, severity) VALUES ('wine cheese a', 5);
INSERT INTO test (message, severity) VALUES ('wine a', 6);
INSERT INTO test (message, severity) VALUES ('cheese a', 7);
INSERT INTO test (message, severity) VALUES ('beer wine cheese a', 1);
INSERT INTO test (message, severity) VALUES ('beer wine a', 2);
INSERT INTO test (message, severity) VALUES ('beer cheese a', 3);
INSERT INTO test (message, severity) VALUES ('beer a', 4);
INSERT INTO test (message, severity) VALUES ('wine cheese a', 5);
INSERT INTO test (message, severity) VALUES ('wine a', 6);
INSERT INTO test (message, severity) VALUES ('cheese a', 7);
-- INSERT INTO test (message) SELECT 'space filler ' || x FROM generate_series(1, 10000000) x;
CREATE INDEX idxtest ON test USING bm25(id, message, severity) WITH (key_field = 'id');
CREATE OR REPLACE FUNCTION assert(a bigint, b bigint) RETURNS bool STABLE STRICT LANGUAGE plpgsql AS $$
DECLARE
current_txid bigint;
BEGIN
-- Get the current transaction ID
current_txid := txid_current();
-- Check if the values are not equal
IF a <> b THEN
RAISE EXCEPTION 'Assertion failed: % <> %. Transaction ID: %', a, b, current_txid;
END IF;
RETURN true;
END;
$$;
"#.execute(&mut conn);
"UPDATE test SET severity = (floor(random() * 10) + 1)::int WHERE id < 10;".execute(&mut conn);
"UPDATE test SET severity = (floor(random() * 10) + 1)::int WHERE id < 10;".execute(&mut conn);
"UPDATE test SET severity = (floor(random() * 10) + 1)::int WHERE id < 10;".execute(&mut conn);
r#"
SET enable_indexonlyscan to OFF;
SET enable_indexscan to OFF;
SET max_parallel_workers = 0;
"#
.execute(&mut conn);
for n in 1..=100 {
let sql = format!("select assert(count(*), LEAST({n}, 8)), count(*) from (select id from test where message @@@ 'beer' order by severity limit {n}) x;");
let (b, count) = sql.fetch_one::<(bool, i64)>(&mut conn);
assert_eq!((b, count), (true, n.min(8)));
}
r#"
SET enable_indexonlyscan to OFF;
SET enable_indexscan to OFF;
SET max_parallel_workers = 32;
"#
.execute(&mut conn);
for n in 1..=100 {
let sql = format!("select assert(count(*), LEAST({n}, 8)), count(*) from (select id from test where message @@@ 'beer' order by severity limit {n}) x;");
let (b, count) = sql.fetch_one::<(bool, i64)>(&mut conn);
assert_eq!((b, count), (true, n.min(8)));
}
}
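// With a multi-segment index and parallel workers forced on, paging over the same ordered query
// must be stable: every LIMIT is a prefix of the larger result, and OFFSET/LIMIT 1 walks the
// rows in the same order.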
#[rstest]
fn stable_limit_and_offset(mut conn: PgConnection) {
if pg_major_version(&mut conn) < 16 {
// the `debug_parallel_query` GUC was added in pg16, so we cannot run this test on anything
// older than pg16
return;
}
// We use multiple segments, and force multiple workers to be used.
SimpleProductsTable::setup_multi_segment().execute(&mut conn);
"SET max_parallel_workers = 8;".execute(&mut conn);
"SET debug_parallel_query TO on".execute(&mut conn);
let mut query = |offset: usize, limit: usize| -> Vec<(i32, String, f32)> {
format!(
"SELECT id, description, paradedb.score(id) FROM paradedb.bm25_search WHERE bm25_search @@@ 'category:electronics'
ORDER BY paradedb.score(id), id OFFSET {offset} LIMIT {limit}"
)
.fetch_collect(&mut conn)
};
let mut previous = Vec::new();
for limit in 1..50 {
let current = query(0, limit);
assert_eq!(
previous[0..],
current[..previous.len()],
"With limit {limit}"
);
previous = current;
}
let all_results = query(0, 50);
for (offset, expected) in all_results.into_iter().enumerate() {
let current = query(offset, 1);
assert_eq!(expected, current[0]);
}
}
#[rstest]
fn top_n_exits_at_limit(mut conn: PgConnection) {
if pg_major_version(&mut conn) < 16 {
// Before 16, Postgres would not plan an incremental sort here.
return;
}
// When there are more results than the limit will render, but there is no `Limit` node
// immediately above us in the plan (in this case, we get an `Incremental Sort` instead due to
// the tiebreaker sort, which we can't push down until #2642), Top-N should exit on its own.
r#"
CREATE TABLE exit_at_limit (id SERIAL8 NOT NULL PRIMARY KEY, message TEXT, severity INTEGER);
CREATE INDEX exit_at_limit_index ON exit_at_limit USING bm25 (id, message, severity) WITH (key_field = 'id');
INSERT INTO exit_at_limit (message, severity) VALUES ('beer wine cheese a', 1);
INSERT INTO exit_at_limit (message, severity) VALUES ('beer wine a', 2);
INSERT INTO exit_at_limit (message, severity) VALUES ('beer cheese a', 3);
INSERT INTO exit_at_limit (message, severity) VALUES ('beer a', 4);
INSERT INTO exit_at_limit (message, severity) VALUES ('wine cheese a', 5);
SET max_parallel_workers = 0;
"#.execute(&mut conn);
let (plan,) = r#"
EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON)
SELECT * FROM exit_at_limit
WHERE message @@@ 'beer'
ORDER BY severity, id LIMIT 1;
"#
.fetch_one::<(Value,)>(&mut conn);
eprintln!("{plan:#?}");
// The Incremental Sort node prevents the Limit node from applying early cutoff, so the custom
// scan node must do so itself.
assert_eq!(
plan.pointer("/0/Plan/Plans/0/Node Type"),
Some(&Value::String(String::from("Incremental Sort")))
);
assert_eq!(
plan.pointer("/0/Plan/Plans/0/Plans/0/ Queries"),
Some(&Value::Number(1.into()))
);
}
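// issue #2511: the repeated whole-table UPDATEs leave deleted documents behind in the index;
// a descending Top-N with LIMIT 25 must still produce all 25 requested rows rather than
// stopping short.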
#[rstest]
fn top_n_completes_issue2511(mut conn: PgConnection) {
r#"
drop table if exists loop;
create table loop (id serial8 not null primary key, message text) with (autovacuum_enabled = false);
create index idxloop on loop using bm25 (id, message) WITH (key_field = 'id', layer_sizes = '1GB, 1GB');
insert into loop (message) select md5(x::text) from generate_series(1, 5000) x;
update loop set message = message || ' beer';
update loop set message = message || ' beer';
update loop set message = message || ' beer';
update loop set message = message || ' beer';
set max_parallel_workers = 1;
"#.execute(&mut conn);
let results = r#"
select * from loop where id @@@ paradedb.all() order by id desc limit 25 offset 0;
"#
.fetch::<(i64, String)>(&mut conn);
assert_eq!(results.len(), 25);
}
#[rstest]
fn parallel_custom_scan_with_jsonb_issue2432(mut conn: PgConnection) {
r#"
DROP TABLE IF EXISTS test;
CREATE TABLE test (
id SERIAL8 NOT NULL PRIMARY KEY,
message TEXT,
severity INTEGER
) WITH (autovacuum_enabled = false);
CREATE INDEX idxtest ON test USING bm25(id, message, severity) WITH (key_field = 'id', layer_sizes = '1GB, 1GB');
INSERT INTO test (message, severity) VALUES ('beer wine cheese a', 1);
INSERT INTO test (message, severity) VALUES ('beer wine a', 2);
INSERT INTO test (message, severity) VALUES ('beer cheese a', 3);
INSERT INTO test (message, severity) VALUES ('beer a', 4);
INSERT INTO test (message, severity) VALUES ('wine cheese a', 5);
INSERT INTO test (message, severity) VALUES ('wine a', 6);
INSERT INTO test (message, severity) VALUES ('cheese a', 7);
INSERT INTO test (message, severity) VALUES ('beer wine cheese a', 1);
INSERT INTO test (message, severity) VALUES ('beer wine a', 2);
INSERT INTO test (message, severity) VALUES ('beer cheese a', 3);
INSERT INTO test (message, severity) VALUES ('beer a', 4);
INSERT INTO test (message, severity) VALUES ('wine cheese a', 5);
INSERT INTO test (message, severity) VALUES ('wine a', 6);
INSERT INTO test (message, severity) VALUES ('cheese a', 7);
"#.execute(&mut conn);
r#"
SET enable_indexonlyscan to OFF;
SET enable_indexscan to OFF;
SET max_parallel_workers = 32;
"#
.execute(&mut conn);
let (plan, ) = r#"
explain (FORMAT json) select id
from test
where message @@@ '{"parse_with_field":{"field":"message","query_string":"beer","lenient":null,"conjunction_mode":null}}'::jsonb
order by paradedb.score(id) desc
limit 10;
"#.fetch_one::<(serde_json::Value, )>(&mut conn);
eprintln!("{plan:#?}");
let parallel_aware = plan
.pointer("/0/Plan/Plans/0/Plans/0/Parallel Aware")
.expect("plan should contain a `Parallel Aware` property")
.as_bool()
.expect("`Parallel Aware` should be a boolean");
assert!(parallel_aware);
}
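// issue #2472: when the custom scan ends up on the inner side of a nested-loop join it is
// rescanned once per outer row; a rescan that lost state would drop the match for company_id
// 15, which is what the assertions below (in both non-parallel and parallel mode) guard against.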
#[rstest]
fn nested_loop_rescan_issue_2472(mut conn: PgConnection) {
// Setup tables and test data
r#"
-- Create extension
DROP EXTENSION IF EXISTS pg_search CASCADE;
CREATE EXTENSION IF NOT EXISTS pg_search;
-- Create tables
CREATE TABLE IF NOT EXISTS company (
id BIGINT PRIMARY KEY,
name TEXT
);
CREATE TABLE IF NOT EXISTS "user" (
id BIGINT PRIMARY KEY,
company_id BIGINT,
status TEXT
);
CREATE TABLE IF NOT EXISTS user_products (
user_id BIGINT,
product_id BIGINT,
deleted_at TIMESTAMP
);
-- Create ParadeDB BM25 index
DROP INDEX IF EXISTS company_name_search_idx;
CREATE INDEX company_name_search_idx ON company
USING bm25 (id, name)
WITH (key_field = 'id');
-- Insert test data
DELETE FROM company;
INSERT INTO company VALUES
(4, 'Testing Company'),
(5, 'Testing Org'),
(13, 'Something else'),
(15, 'Important Testing');
DELETE FROM "user";
INSERT INTO "user" VALUES
(1, 4, 'NORMAL'),
(2, 5, 'NORMAL'),
(3, 13, 'NORMAL'),
(4, 15, 'NORMAL'),
(5, 7, 'NORMAL');
DELETE FROM user_products;
INSERT INTO user_products VALUES
(1, 100, NULL),
(2, 100, NULL),
(3, 200, NULL),
(4, 100, NULL);
"#
.execute(&mut conn);
// Test in non-parallel mode first
r#"
SET max_parallel_workers = 0;
SET max_parallel_workers_per_gather = 0;
"#
.execute(&mut conn);
println!("Testing in non-parallel mode");
// Check if we're running in non-parallel mode
let (plan,) = r#"
EXPLAIN (FORMAT json)
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE u.status = 'NORMAL'
AND u.company_id in (5, 4, 13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
)
SELECT
u.id,
u.company_id,
mc.id as mc_company_id
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id;"#
.fetch_one::<(serde_json::Value,)>(&mut conn);
let node = plan.pointer("/0/Plan").unwrap();
let is_parallel = node.as_object().unwrap().contains_key("Workers Planned");
assert!(!is_parallel, "Query should not use parallel execution");
// First test in non-parallel mode
let complex_results = r#"
-- This reproduces the issue with company_id 15
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE u.status = 'NORMAL'
AND u.company_id in (5, 4, 13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
),
scored_users AS (
SELECT
u.id,
u.company_id,
mc.id as mc_company_id,
COALESCE(MAX(mc.company_score), 0) AS score
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id
LEFT JOIN user_products up ON up.user_id = u.id
GROUP BY u.id, u.company_id, mc.id
)
SELECT su.id, su.company_id, su.mc_company_id, su.score
FROM scored_users su
ORDER BY score DESC;
"#
.fetch_result::<(i64, i64, Option<i64>, f32)>(&mut conn)
.expect("complex query failed");
// Test that we get results for all users, including the problematic company_id 15
assert_eq!(complex_results.len(), 4);
let has_company_15 = complex_results
.iter()
.any(|(_, company_id, _, _)| *company_id == 15);
assert!(
has_company_15,
"Results should include user with company_id 15"
);
// The minimal query focusing on the problematic companies in non-parallel mode
let minimal_results = r#"
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE
u.status = 'NORMAL' AND
u.company_id in (13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
)
SELECT
u.id,
u.company_id,
mc.id as mc_company_id,
COALESCE(mc.company_score, 0) AS score
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id;
"#
.fetch_result::<(i64, i64, Option<i64>, f32)>(&mut conn)
.expect("minimal query failed");
// Verify both companies in non-parallel mode
assert_eq!(minimal_results.len(), 2);
let has_company_15 = minimal_results
.iter()
.any(|(_, company_id, _, _)| *company_id == 15);
assert!(
has_company_15,
"Results should include user with company_id 15"
);
println!("minimal_results: {minimal_results:?}");
let company_15_result = minimal_results
.iter()
.find(|(_, company_id, _, _)| *company_id == 15)
.unwrap();
assert!(
company_15_result.3 > 0.0,
"Company 15 should have a non-zero score"
);
// Now test in parallel mode
r#"
SET max_parallel_workers = 32;
SET max_parallel_workers_per_gather = 8;
"#
.execute(&mut conn);
println!("Testing in parallel mode");
// Check if we're running in parallel mode
let (plan,) = r#"
EXPLAIN (FORMAT json)
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE u.status = 'NORMAL'
AND u.company_id in (5, 4, 13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
)
SELECT
u.id,
u.company_id,
mc.id as mc_company_id
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id;"#
.fetch_one::<(serde_json::Value,)>(&mut conn);
// The test in parallel mode might not actually use parallelism due to small table sizes,
// but the setting is enabled, which is what we're testing.
let node = plan.pointer("/0/Plan").unwrap();
let parallel_enabled = node
.pointer("/Parallel Aware")
.map(|v| v.as_bool().unwrap_or(false))
.unwrap_or(false)
|| node.pointer("/Workers Planned").is_some()
|| node.as_object().unwrap().contains_key("Parallel Aware");
println!(
"Plan in parallel mode: {}",
serde_json::to_string_pretty(&plan).unwrap()
);
// Due to small data sizes, PostgreSQL might choose not to use parallelism
// even when the settings allow it, so we don't assert but print info
println!("Parallelism indicators in plan: {parallel_enabled}");
// First test in parallel mode
let parallel_complex_results = r#"
-- This reproduces the issue with company_id 15
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE u.status = 'NORMAL'
AND u.company_id in (5, 4, 13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
),
scored_users AS (
SELECT
u.id,
u.company_id,
mc.id as mc_company_id,
COALESCE(MAX(mc.company_score), 0) AS score
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id
LEFT JOIN user_products up ON up.user_id = u.id
GROUP BY u.id, u.company_id, mc.id
)
SELECT su.id, su.company_id, su.mc_company_id, su.score
FROM scored_users su
ORDER BY score DESC;
"#
.fetch_result::<(i64, i64, Option<i64>, f32)>(&mut conn)
.expect("parallel complex query failed");
// Test that we get results for all users in parallel mode
assert_eq!(parallel_complex_results.len(), 4);
let has_company_15 = parallel_complex_results
.iter()
.any(|(_, company_id, _, _)| *company_id == 15);
assert!(
has_company_15,
"Parallel results should include user with company_id 15"
);
// The minimal query focusing on the problematic companies in parallel mode
let parallel_minimal_results = r#"
WITH target_users AS (
SELECT u.id, u.company_id
FROM "user" u
WHERE
u.status = 'NORMAL' AND
u.company_id in (13, 15)
),
matched_companies AS (
SELECT c.id, paradedb.score(c.id) AS company_score
FROM company c
WHERE c.id @@@ 'name:Testing'
)
SELECT
u.id,
u.company_id,
mc.id as mc_company_id,
COALESCE(mc.company_score, 0) AS score
FROM target_users u
LEFT JOIN matched_companies mc ON u.company_id = mc.id;
"#
.fetch_result::<(i64, i64, Option<i64>, f32)>(&mut conn)
.expect("parallel minimal query failed");
// Verify both companies in parallel mode
assert_eq!(parallel_minimal_results.len(), 2);
let has_company_15 = parallel_minimal_results
.iter()
.any(|(_, company_id, _, _)| *company_id == 15);
assert!(
has_company_15,
"Parallel results should include user with company_id 15"
);
let company_15_result = parallel_minimal_results
.iter()
.find(|(_, company_id, _, _)| *company_id == 15)
.unwrap();
assert!(
company_15_result.3 > 0.0,
"Company 15 should have a non-zero score in parallel mode"
);
}
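// issue #2515: with max_parallel_workers_per_gather set to 2, the plan should report exactly
// 2 planned workers, both when the custom scan is enabled and when it is disabled.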
#[rstest]
fn uses_max_parallel_workers_per_gather_issue2515(mut conn: PgConnection) {
r#"
SET max_parallel_workers = 8;
SET max_parallel_workers_per_gather = 2;
CREATE TABLE t (id bigint);
INSERT INTO t (id) SELECT x FROM generate_series(1, 1000000) x;
CREATE INDEX t_idx ON t USING bm25(id) WITH (key_field='id');
"#
.execute(&mut conn);
let (plan,) =
"EXPLAIN (ANALYZE, FORMAT JSON) SELECT COUNT(*) FROM t WHERE id @@@ paradedb.all()"
.fetch_one::<(Value,)>(&mut conn);
let plan = plan.pointer("/0/Plan/Plans/0").unwrap();
eprintln!("{plan:#?}");
assert_eq!(
plan.get("Workers Planned"),
Some(&Value::Number(Number::from(2)))
);
"SET paradedb.enable_custom_scan = false".execute(&mut conn);
let (plan,) =
"EXPLAIN (ANALYZE, FORMAT JSON) SELECT COUNT(*) FROM t WHERE id @@@ paradedb.all()"
.fetch_one::<(Value,)>(&mut conn);
let plan = plan.pointer("/0/Plan/Plans/0").unwrap();
eprintln!("{plan:#?}");
assert_eq!(
plan.get("Workers Planned"),
Some(&Value::Number(Number::from(2)))
);
}
#[rstest]
fn join_with_string_fast_fields_issue_2505(mut conn: PgConnection) {
r#"
DROP TABLE IF EXISTS a;
DROP TABLE IF EXISTS b;
CREATE TABLE a (
a_id_pk TEXT,
content TEXT
) WITH (autovacuum_enabled = false);
CREATE TABLE b (
b_id_pk TEXT,
a_id_fk TEXT,
content TEXT
) WITH (autovacuum_enabled = false);
CREATE INDEX idxa ON a USING bm25 (a_id_pk, content) WITH (key_field = 'a_id_pk');
CREATE INDEX idxb ON b USING bm25 (b_id_pk, a_id_fk, content) WITH (key_field = 'b_id_pk',
text_fields = '{ "a_id_fk": { "fast": true, "tokenizer": { "type": "keyword" } } }');
INSERT INTO a (a_id_pk, content) VALUES ('this-is-a-id', 'beer');
INSERT INTO b (b_id_pk, a_id_fk, content) VALUES ('this-is-b-id', 'this-is-a-id', 'wine');
"#
.execute(&mut conn);
"VACUUM a, b; -- needed to get Visibility Map up-to-date".execute(&mut conn);
// This query previously failed with:
// "ERROR: assertion failed: natts == state.exec_tuple_which_fast_fields.len()"
let result = r#"
SELECT a.a_id_pk as my_a_id_pk, b.b_id_pk as my_b_id_pk
FROM b
JOIN a ON a.a_id_pk = b.a_id_fk
WHERE a.content @@@ 'beer' AND b.content @@@ 'wine';
"#
.fetch_result::<(String, String)>(&mut conn)
.expect("JOIN query with string fast fields should execute successfully");
assert_eq!(result.len(), 1);
assert_eq!(
result[0],
("this-is-a-id".to_string(), "this-is-b-id".to_string())
);
"DROP TABLE a; DROP TABLE b;".execute(&mut conn);
}
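// issue #2526: parentheses in the WHERE clause must survive being pushed into the index; no
// description matches both 'shoes' and ('keyboard' OR 'hat'), so the count must be 0.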
#[rstest]
fn custom_scan_respects_parentheses_issue2526(mut conn: PgConnection) {
r#"
CALL paradedb.create_bm25_test_table(table_name => 'mock_items', schema_name => 'public');
CREATE INDEX search_idx ON mock_items
USING bm25 (id, description, category, rating, in_stock, metadata, created_at, last_updated_date, latest_available_time)
WITH (key_field='id');
"#.execute(&mut conn);
let result: Vec<(i64,)> = "SELECT COUNT(*) from mock_items WHERE description @@@ 'shoes' AND (description @@@ 'keyboard' OR description @@@ 'hat')".fetch(&mut conn);
assert_eq!(result, vec![(0,)]);
}