// Copyright (c) 2023-2025 ParadeDB, Inc. // // This file is part of ParadeDB - Postgres for Search and Analytics // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . #![allow(unused_variables, unused_imports)] mod fixtures; use std::path::PathBuf; use fixtures::*; use pretty_assertions::assert_eq; use rstest::*; use serde_json::Value; use sqlx::PgConnection; fn fmt_err(err: T) -> String { format!("unexpected error, received: {err}") } #[rstest] fn invalid_create_index(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'public')" .execute(&mut conn); match r#"CREATE INDEX index_config_index ON index_config USING bm25 (id) "# .execute_result(&mut conn) { Ok(_) => panic!("should fail with no key_field"), Err(err) => assert_eq!( err.to_string(), "error returned from database: index should have a `WITH (key_field='...')` option" ), }; } #[rstest] fn prevent_duplicate(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id')"# .execute(&mut conn); match r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id')"# .execute_result(&mut conn) { Ok(_) => panic!("should fail with relation already exists"), Err(err) => assert!( err.to_string().contains("already exists"), "{}", fmt_err(err) ), }; } #[rstest] async fn drop_column(mut conn: PgConnection) { r#" CREATE TABLE f_table ( id SERIAL PRIMARY KEY, category TEXT ); CREATE TABLE test_table ( id SERIAL PRIMARY KEY, fkey INTEGER REFERENCES f_table ON UPDATE CASCADE ON DELETE RESTRICT, fulltext TEXT ); INSERT INTO f_table (category) VALUES ('cat_a'), ('cat_b'), ('cat_c'); INSERT INTO test_table (fkey, fulltext) VALUES (1, 'abc'), (1, 'def'), (2, 'ghi'), (3, 'jkl'); "# .execute(&mut conn); r#"CREATE INDEX test_index ON test_table USING bm25 (id, fulltext) WITH (key_field='id')"# .execute(&mut conn); r#"DROP INDEX test_index CASCADE; ALTER TABLE test_table DROP COLUMN fkey; CREATE INDEX test_index ON test_table USING bm25 (id, fulltext) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('test_index')".fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("fulltext".into(), "Str".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); } #[rstest] fn default_text_field(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("description".into(), "Str".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); } #[rstest] fn text_field_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id', text_fields='{"description": {"tokenizer": {"type": "en_stem", "normalizer": "raw"}, "record": "freq", "fast": true}}'); "# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("description".into(), "Str".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); } #[rstest] fn multiple_text_fields(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description, category) WITH ( key_field='id', text_fields='{"description": {"tokenizer": {"type": "en_stem", "normalizer": "raw"}, "record": "freq", "fast": true}}' ); "# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("category".into(), "Str".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("description".into(), "Str".into())); assert_eq!(rows[3], ("id".into(), "I64".into())); } #[rstest] fn default_numeric_field(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, rating) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("rating".into(), "I64".into())); } #[rstest] fn numeric_field_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, rating) WITH (key_field='id', numeric_fields='{"rating": {"fast": true}}')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("rating".into(), "I64".into())); } #[rstest] fn default_boolean_field(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, in_stock) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("in_stock".into(), "Bool".into())); } #[rstest] fn boolean_field_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, in_stock) WITH (key_field='id', boolean_fields='{"in_stock": {"fast": false}}')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("in_stock".into(), "Bool".into())); } #[rstest] fn default_json_field(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, metadata) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("metadata".into(), "JsonObject".into())); } #[rstest] fn json_field_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, metadata) WITH ( key_field='id', json_fields='{"metadata": {"fast": true, "expand_dots": false, "tokenizer": {"type": "raw", "normalizer": "raw"}}}' )"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("ctid".into(), "U64".into())); assert_eq!(rows[1], ("id".into(), "I64".into())); assert_eq!(rows[2], ("metadata".into(), "JsonObject".into())); } #[rstest] fn default_datetime_field(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, created_at, last_updated_date) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("created_at".into(), "Date".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); assert_eq!(rows[3], ("last_updated_date".into(), "Date".into())); } #[rstest] fn datetime_field_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, created_at, last_updated_date) WITH (key_field='id', datetime_fields='{"created_at": {"fast": true}, "last_updated_date": {"fast": false}}')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("created_at".into(), "Date".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); assert_eq!(rows[3], ("last_updated_date".into(), "Date".into())); } #[rstest] fn multiple_fields(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description, category, rating, in_stock, metadata) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index')" .fetch(&mut conn); assert_eq!(rows[0], ("category".into(), "Str".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("description".into(), "Str".into())); assert_eq!(rows[3], ("id".into(), "I64".into())); assert_eq!(rows[4], ("in_stock".into(), "Bool".into())); assert_eq!(rows[5], ("metadata".into(), "JsonObject".into())); assert_eq!(rows[6], ("rating".into(), "I64".into())); } #[rstest] fn missing_schema_index(mut conn: PgConnection) { match "SELECT paradedb.schema('paradedb.missing_bm25_index')".fetch_result::<(i64,)>(&mut conn) { Err(err) => assert!(err .to_string() .contains(r#"relation "paradedb.missing_bm25_index" does not exist"#)), _ => panic!("non-existing index should throw an error"), } } #[rstest] fn null_values(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); "INSERT INTO paradedb.index_config (description, category, rating) VALUES ('Null Item 1', NULL, NULL), ('Null Item 2', NULL, 2)" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description, category, rating, in_stock, metadata) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, Option, Option)> = " SELECT description, category, rating FROM paradedb.index_config WHERE index_config @@@ 'description:\"Null Item\"' ORDER BY id" .fetch(&mut conn); assert_eq!(rows.len(), 2); assert_eq!(rows[0], ("Null Item 1".into(), None, None)); assert_eq!(rows[1], ("Null Item 2".into(), None, Some(2))); let rows: Vec<(bool,)> = "SELECT in_stock FROM paradedb.index_config WHERE index_config @@@ 'in_stock:false'" .fetch(&mut conn); assert_eq!(rows.len(), 13); } #[rstest] fn null_key_field_build(mut conn: PgConnection) { "CREATE TABLE paradedb.index_config(id INTEGER, description TEXT)".execute(&mut conn); "INSERT INTO paradedb.index_config VALUES (NULL, 'Null Item 1'), (2, 'Null Item 2')" .execute(&mut conn); match r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id')"# .execute_result(&mut conn) { Ok(_) => panic!("should fail with null key_field"), Err(err) => assert_eq!( err.to_string(), "error returned from database: key_field column 'id' cannot be NULL" ), }; } #[rstest] fn null_key_field_insert(mut conn: PgConnection) { "CREATE TABLE paradedb.index_config(id INTEGER, description TEXT)".execute(&mut conn); "INSERT INTO paradedb.index_config VALUES (1, 'Null Item 1'), (2, 'Null Item 2')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description) WITH (key_field='id')"# .execute(&mut conn); match "INSERT INTO paradedb.index_config VALUES (NULL, 'Null Item 3')".execute_result(&mut conn) { Ok(_) => panic!("should fail with null key_field"), Err(err) => assert_eq!( err.to_string(), "error returned from database: key_field column 'id' cannot be NULL" ), }; } #[rstest] fn column_name_camelcase(mut conn: PgConnection) { "CREATE TABLE paradedb.index_config(\"IdName\" INTEGER, \"ColumnName\" TEXT)" .execute(&mut conn); "INSERT INTO paradedb.index_config VALUES (1, 'Plastic Keyboard'), (2, 'Bluetooth Headphones')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 ("IdName", "ColumnName") WITH (key_field='IdName')"# .execute(&mut conn); let rows: Vec<(i32, String)> = "SELECT * FROM paradedb.index_config WHERE index_config @@@ 'ColumnName:keyboard'" .fetch(&mut conn); assert_eq!(rows.len(), 1); assert_eq!(rows[0], (1, "Plastic Keyboard".into())); } #[rstest] fn multi_index_insert_in_transaction(mut conn: PgConnection) { "CREATE TABLE paradedb.index_config1(id INTEGER, description TEXT)".execute(&mut conn); "CREATE TABLE paradedb.index_config2(id INTEGER, description TEXT)".execute(&mut conn); r#"CREATE INDEX index_config1_index ON paradedb.index_config1 USING bm25 (id, description) WITH (key_field='id')"# .execute(&mut conn); r#"CREATE INDEX index_config2_index ON paradedb.index_config2 USING bm25 (id, description) WITH (key_field='id')"# .execute(&mut conn); "BEGIN".execute(&mut conn); "INSERT INTO paradedb.index_config1 VALUES (1, 'Item 1'), (2, 'Item 2')".execute(&mut conn); "INSERT INTO paradedb.index_config2 VALUES (1, 'Item 1'), (2, 'Item 2')".execute(&mut conn); "COMMIT".execute(&mut conn); let rows: Vec<(i32, String)> = "SELECT * FROM paradedb.index_config1 WHERE index_config1 @@@ 'description:item'" .fetch(&mut conn); assert_eq!(rows.len(), 2); let rows: Vec<(i32, String)> = "SELECT * FROM paradedb.index_config2 WHERE index_config2 @@@ 'description:item'" .fetch(&mut conn); assert_eq!(rows.len(), 2); } #[rstest] fn partitioned_schema(mut conn: PgConnection) { PartitionedTable::setup().execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('sales_index') ORDER BY name" .fetch(&mut conn); assert_eq!(rows[0], ("amount".into(), "F64".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("description".into(), "Str".into())); assert_eq!(rows[3], ("id".into(), "I64".into())); assert_eq!(rows[4], ("sale_date".into(), "Date".into())); } #[rstest] fn partitioned_info(mut conn: PgConnection) { PartitionedTable::setup().execute(&mut conn); // Insert rows into both partitions. r#" INSERT INTO sales (sale_date, amount, description) VALUES ('2023-01-10', 150.00, 'Ergonomic metal keyboard'), ('2023-04-01', 250.00, 'Modern wall clock'); "# .execute(&mut conn); // And validate that we see at least one segment for each. let segments_per_partition: Vec<(String, i64)> = " SELECT index_name, COUNT(*) FROM paradedb.index_info('sales_index') GROUP BY index_name " .fetch(&mut conn); assert_eq!(segments_per_partition.len(), 2); for (index_name, segment_count) in segments_per_partition { assert!( segment_count > 0, "Got {segment_count} for index partition {index_name}" ); } // Just cover `index_layer_info`. let segments_per_partition: Vec<(String, String, i64)> = "SELECT relname::text, layer_size, count FROM paradedb.index_layer_info".fetch(&mut conn); assert!(!segments_per_partition.is_empty()); } #[rstest] fn partitioned_all(mut conn: PgConnection) { PartitionedTable::setup().execute(&mut conn); let schema_rows: Vec<(String, String)> = "SELECT id from sales WHERE id @@@ paradedb.all()".fetch(&mut conn); assert_eq!(schema_rows.len(), 0); r#" INSERT INTO sales (sale_date, amount, description) VALUES ('2023-01-10', 150.00, 'Ergonomic metal keyboard'), ('2023-04-01', 250.00, 'Modern wall clock'); "# .execute(&mut conn); let schema_rows: Vec<(i32,)> = "SELECT id from sales WHERE id @@@ paradedb.all()".fetch(&mut conn); assert_eq!(schema_rows.len(), 2); } #[rstest] fn partitioned_query(mut conn: PgConnection) { // Set up the partitioned table with two partitions and a BM25 index. PartitionedTable::setup().execute(&mut conn); // Insert some data. r#" INSERT INTO sales (sale_date, amount, description) VALUES ('2023-01-10', 150.00, 'Ergonomic metal keyboard'), ('2023-01-15', 200.00, 'Plastic keyboard'), ('2023-02-05', 300.00, 'Sleek running shoes'), ('2023-03-12', 175.50, 'Bluetooth speaker'), ('2023-03-25', 225.75, 'Artistic ceramic vase'); INSERT INTO sales (sale_date, amount, description) VALUES ('2023-04-01', 250.00, 'Modern wall clock'), ('2023-04-18', 180.00, 'Designer wall paintings'), ('2023-05-09', 320.00, 'Handcrafted wooden frame'); "# .execute(&mut conn); // Test: Verify data is partitioned correctly by querying each partition let rows_q1: Vec<(i32, String, String)> = r#" SELECT id, description, sale_date::text FROM sales_2023_q1 "# .fetch(&mut conn); assert_eq!(rows_q1.len(), 5, "Expected 5 rows in Q1 partition"); let rows_q2: Vec<(i32, String, String)> = r#" SELECT id, description, sale_date::text FROM sales_2023_q2 "# .fetch(&mut conn); assert_eq!(rows_q2.len(), 3, "Expected 3 rows in Q2 partition"); // Test: Search using the bm25 index against both the parent and child tables. for table in ["sales", "sales_2023_q1"] { let search_results: Vec<(i32, String)> = format!( r#" SELECT id, description FROM {table} WHERE id @@@ 'description:keyboard' "# ) .fetch(&mut conn); assert_eq!(search_results.len(), 2, "Expected 2 items with 'keyboard'"); } // Test: Retrieve items by a numeric range (amount field) and verify bm25 compatibility for (table, expected) in [("sales", 5), ("sales_2023_q1", 3)] { let amount_results: Vec<(i32, String, f32)> = format!( r#" SELECT id, description, amount FROM {table} WHERE amount @@@ '[175 TO 250]' ORDER BY amount ASC "# ) .fetch(&mut conn); assert_eq!( amount_results.len(), expected, "Expected {expected} items with amount in range 175-250" ); } } #[rstest] fn partitioned_uses_custom_scan(mut conn: PgConnection) { PartitionedTable::setup().execute(&mut conn); r#" INSERT INTO sales (sale_date, amount, description) VALUES ('2023-01-10', 150.00, 'Ergonomic metal keyboard'), ('2023-04-01', 250.00, 'Modern wall clock'); "# .execute(&mut conn); "SET max_parallel_workers TO 0;".execute(&mut conn); // Without the partition key. let (plan,) = r#" EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) SELECT count(*) FROM sales WHERE id @@@ '1'; "# .fetch_one::<(Value,)>(&mut conn); eprintln!("{plan:#?}"); let per_partition_plans = plan .pointer("/0/Plan/Plans/0/Plans") .unwrap() .as_array() .unwrap(); assert_eq!( per_partition_plans.len(), 2, "Expected 2 partitions to be scanned." ); for per_partition_plan in per_partition_plans { pretty_assertions::assert_eq!( per_partition_plan.get("Node Type"), Some(&Value::String(String::from("Custom Scan"))) ); } // With the partition key: we expect the partition to be filtered, and for // us to apply pushdown. let (plan,) = r#" EXPLAIN (ANALYZE, VERBOSE, FORMAT JSON) SELECT count(*) FROM sales WHERE description @@@ 'keyboard' and sale_date = '2023-01-10'; "# .fetch_one::<(Value,)>(&mut conn); eprintln!("{plan:#?}"); let per_partition_plans = plan.pointer("/0/Plan/Plans").unwrap().as_array().unwrap(); assert_eq!( per_partition_plans.len(), 1, "Expected 1 partition to be scanned." ); for per_partition_plan in per_partition_plans { pretty_assertions::assert_eq!( per_partition_plan.get("Node Type"), Some(&Value::String(String::from("Custom Scan"))) ); let query = per_partition_plan.get("Tantivy Query").unwrap().to_string(); assert!( query.to_string().contains("2023-01-10"), "Expected sale_date to be pushed down into query: {query:?}", ); } } #[rstest] fn custom_enum_term(mut conn: PgConnection) { r#" CREATE TYPE color AS ENUM ('red', 'green', 'blue'); CREATE TABLE paradedb.index_config(id INTEGER, description TEXT, color color); INSERT INTO paradedb.index_config VALUES (1, 'Item 1', 'red'), (2, 'Item 2', 'green'); "# .execute(&mut conn); r#" CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description, color) WITH (key_field='id'); "# .execute(&mut conn); let rows: Vec<(i32, String)> = "SELECT id, description FROM paradedb.index_config WHERE id @@@ paradedb.term('color', 'red'::color)".fetch(&mut conn); assert_eq!(rows, vec![(1, "Item 1".into())]); } #[rstest] fn custom_enum_parse(mut conn: PgConnection) { r#" CREATE TYPE color AS ENUM ('red', 'green', 'blue'); CREATE TABLE paradedb.index_config(id INTEGER, description TEXT, color color); INSERT INTO paradedb.index_config VALUES (1, 'Item 1', 'red'), (2, 'Item 2', 'green'); "# .execute(&mut conn); r#" CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, description, color) WITH (key_field='id'); "# .execute(&mut conn); let rows: Vec<(i32, String)> = "SELECT id, description FROM paradedb.index_config WHERE id @@@ paradedb.parse('color:1.0')".fetch(&mut conn); assert_eq!(rows, vec![(1, "Item 1".into())]); } #[rstest] fn long_text_key_field_issue2198(mut conn: PgConnection) { "CREATE TABLE issue2198 (id TEXT, value TEXT)".execute(&mut conn); "CREATE INDEX idxissue2198 ON issue2198 USING bm25 (id, value) WITH (key_field='id')" .execute(&mut conn); let long_string = "a".repeat(10000); format!("INSERT INTO issue2198(id) VALUES ('{long_string}')").execute(&mut conn); let (count,) = format!("SELECT count(*) FROM issue2198 WHERE id @@@ '{long_string}'") .fetch_one::<(i64,)>(&mut conn); assert_eq!(count, 1); let (count,) = format!("SELECT count(*) FROM issue2198 WHERE id @@@ paradedb.term('id', '{long_string}')") .fetch_one::<(i64,)>(&mut conn); assert_eq!(count, 1); } #[rstest] fn uuid_as_raw_issue2199(mut conn: PgConnection) { "CREATE TABLE issue2199 (id SERIAL8 NOT NULL PRIMARY KEY, value uuid);".execute(&mut conn); "CREATE INDEX idxissue2199 ON issue2199 USING bm25 (id, value) WITH (key_field='id');" .execute(&mut conn); let uuid = uuid::Uuid::new_v4(); format!("INSERT INTO issue2199(value) VALUES ('{uuid}')").execute(&mut conn); let (count,) = format!("SELECT count(*) FROM issue2199 WHERE value @@@ '{uuid}'") .fetch_one::<(i64,)>(&mut conn); assert_eq!(count, 1); let (count,) = format!("SELECT count(*) FROM issue2199 WHERE id @@@ paradedb.term('value', '{uuid}')") .fetch_one::<(i64,)>(&mut conn); assert_eq!(count, 1); } /// Common setup function for partitioned and non-partitioned table tests fn setup_table_for_order_by_limit_test(conn: &mut PgConnection, is_partitioned: bool) { // Common settings for all tests r#" SET enable_indexscan TO off; SET enable_bitmapscan TO off; SET max_parallel_workers TO 0; "# .execute(conn); if is_partitioned { // Set up a partitioned table r#" DROP TABLE IF EXISTS sales; CREATE TABLE sales ( id SERIAL, product_name TEXT, amount DECIMAL, sale_date DATE ) PARTITION BY RANGE (sale_date); CREATE TABLE sales_2023 PARTITION OF sales FOR VALUES FROM ('2023-01-01') TO ('2024-01-01'); CREATE TABLE sales_2024 PARTITION OF sales FOR VALUES FROM ('2024-01-01') TO ('2025-01-01'); INSERT INTO sales (product_name, amount, sale_date) VALUES ('Laptop', 1200.00, '2023-01-15'), ('Smartphone', 800.00, '2023-03-10'), ('Headphones', 150.00, '2023-05-20'), ('Monitor', 300.00, '2023-07-05'), ('Keyboard', 80.00, '2023-09-12'), ('Mouse', 40.00, '2023-11-25'), ('Tablet', 500.00, '2024-01-05'), ('Printer', 200.00, '2024-02-18'), ('Camera', 600.00, '2024-04-22'), ('Speaker', 120.00, '2024-06-30'); CREATE INDEX idx_sales_bm25 ON sales USING bm25 (id, product_name, amount, sale_date) WITH ( key_field = 'id', text_fields = '{"product_name": {}}', numeric_fields = '{"amount": {}}', datetime_fields = '{"sale_date": {"fast": true}}' ); "# .execute(conn); } else { // Set up two separate tables (not partitioned) r#" DROP TABLE IF EXISTS products_2023; DROP TABLE IF EXISTS products_2024; -- Create two separate tables with similar schema CREATE TABLE products_2023 ( id SERIAL, product_name TEXT, amount DECIMAL, sale_date DATE ); CREATE TABLE products_2024 ( id SERIAL, product_name TEXT, amount DECIMAL, sale_date DATE ); -- Insert similar data to both tables INSERT INTO products_2023 (product_name, amount, sale_date) VALUES ('Laptop', 1200.00, '2023-01-15'), ('Smartphone', 800.00, '2023-03-10'), ('Headphones', 150.00, '2023-05-20'), ('Monitor', 300.00, '2023-07-05'), ('Keyboard', 80.00, '2023-09-12'); INSERT INTO products_2024 (product_name, amount, sale_date) VALUES ('Mouse', 40.00, '2024-01-25'), ('Tablet', 500.00, '2024-01-05'), ('Printer', 200.00, '2024-02-18'), ('Camera', 600.00, '2024-04-22'), ('Speaker', 120.00, '2024-06-30'); -- Create BM25 indexes for both tables CREATE INDEX idx_products_2023_bm25 ON products_2023 USING bm25 (id, product_name, amount, sale_date) WITH ( key_field = 'id', text_fields = '{"product_name": {}}', numeric_fields = '{"amount": {}}', datetime_fields = '{"sale_date": {"fast": true}}' ); CREATE INDEX idx_products_2024_bm25 ON products_2024 USING bm25 (id, product_name, amount, sale_date) WITH ( key_field = 'id', text_fields = '{"product_name": {}}', numeric_fields = '{"amount": {}}', datetime_fields = '{"sale_date": {"fast": true}}' ); "# .execute(conn); } } /// Setup function for view tests fn setup_view_for_order_by_limit_test(conn: &mut PgConnection) { // First drop any existing tables or views r#" DROP VIEW IF EXISTS products_view; DROP TABLE IF EXISTS products_2023_view; DROP TABLE IF EXISTS products_2024_view; SET enable_indexscan TO off; SET enable_bitmapscan TO off; SET max_parallel_workers TO 0; -- Create two separate tables with similar schema CREATE TABLE products_2023_view ( id SERIAL, product_name TEXT, amount DECIMAL, sale_date DATE ); CREATE TABLE products_2024_view ( id SERIAL, product_name TEXT, amount DECIMAL, sale_date DATE ); -- Insert data to both tables INSERT INTO products_2023_view (product_name, amount, sale_date) VALUES ('Laptop', 1200.00, '2023-01-15'), ('Smartphone', 800.00, '2023-03-10'), ('Headphones', 150.00, '2023-05-20'), ('Monitor', 300.00, '2023-07-05'), ('Keyboard', 80.00, '2023-09-12'); INSERT INTO products_2024_view (product_name, amount, sale_date) VALUES ('Mouse', 40.00, '2024-01-25'), ('Tablet', 500.00, '2024-01-05'), ('Printer', 200.00, '2024-02-18'), ('Camera', 600.00, '2024-04-22'), ('Speaker', 120.00, '2024-06-30'); -- Create BM25 indexes for both tables CREATE INDEX idx_products_2023_view_bm25 ON products_2023_view USING bm25 (id, product_name, amount, sale_date) WITH ( key_field = 'id', text_fields = '{"product_name": {}}', numeric_fields = '{"amount": {}}', datetime_fields = '{"sale_date": {"fast": true}}' ); CREATE INDEX idx_products_2024_view_bm25 ON products_2024_view USING bm25 (id, product_name, amount, sale_date) WITH ( key_field = 'id', text_fields = '{"product_name": {}}', numeric_fields = '{"amount": {}}', datetime_fields = '{"sale_date": {"fast": true}}' ); -- Create view combining both tables CREATE VIEW products_view AS SELECT * FROM products_2023_view UNION ALL SELECT * FROM products_2024_view; "# .execute(conn); } #[rstest] fn partitioned_order_by_limit_pushdown(mut conn: PgConnection) { setup_table_for_order_by_limit_test(&mut conn, true); // Get the explain plan let explain_output = r#" EXPLAIN (ANALYZE, VERBOSE) SELECT * FROM sales WHERE product_name @@@ 'laptop OR smartphone OR headphones' ORDER BY sale_date LIMIT 5; "# .fetch::<(String,)>(&mut conn) .into_iter() .map(|(line,)| line) .collect::>() .join("\n"); // Check for TopNScanExecState in the plan assert!( explain_output.contains("TopNScanExecState"), "Expected TopNScanExecState in the execution plan" ); // Verify sort field and direction assert!( explain_output.contains("TopN Order By: sale_date asc"), "Expected sort field to be sale_date" ); // Verify the limit is pushed down assert!( explain_output.contains("TopN Limit: 5"), "Expected limit 5 to be pushed down" ); // Also test that we get the correct sorted results let results: Vec<(String, String)> = r#" SELECT product_name, sale_date::text FROM sales WHERE product_name @@@ 'laptop OR smartphone OR headphones' ORDER BY sale_date LIMIT 5; "# .fetch(&mut conn); // Verify we got the right number of results assert_eq!(results.len(), 3, "Expected 3 matching results"); // Verify they're in the correct order (ordered by sale_date) assert_eq!(results[0].0, "Laptop"); assert_eq!(results[1].0, "Smartphone"); assert_eq!(results[2].0, "Headphones"); // Check the dates are in ascending order assert_eq!(results[0].1, "2023-01-15"); assert_eq!(results[1].1, "2023-03-10"); assert_eq!(results[2].1, "2023-05-20"); } #[rstest] fn non_partitioned_no_order_by_limit_pushdown(mut conn: PgConnection) { setup_table_for_order_by_limit_test(&mut conn, false); // Get the explain plan for a UNION query with ORDER BY LIMIT let explain_output = r#" EXPLAIN (ANALYZE, VERBOSE) SELECT * FROM ( SELECT * FROM products_2023 WHERE product_name @@@ 'laptop OR smartphone OR headphones' UNION ALL SELECT * FROM products_2024 WHERE product_name @@@ 'tablet OR printer' ) combined_products ORDER BY sale_date LIMIT 5; "# .fetch::<(String,)>(&mut conn) .into_iter() .map(|(line,)| line) .collect::>() .join("\n"); // Verify NormalScanExecState is used. We can't use TopN because there the limit occurs _after_ // the union. And we can't use fast fields, because there are non-fast fields. assert!( explain_output.contains("NormalScanExecState"), "Expected NormalScanExecState in the execution plan" ); assert!( !explain_output.contains("TopNScanExecState"), "TopNScanExecState should not be present in the execution plan" ); // Even without the optimization, verify the query returns correct results let results: Vec<(String, String)> = r#" SELECT product_name, sale_date::text FROM ( SELECT * FROM products_2023 WHERE product_name @@@ 'laptop OR smartphone OR headphones' UNION ALL SELECT * FROM products_2024 WHERE product_name @@@ 'tablet OR printer' ) combined_products ORDER BY sale_date LIMIT 5; "# .fetch(&mut conn); // Verify we got the right number of results and correct order assert!(results.len() <= 5, "Expected at most 5 matching results"); // Check that the first result is the earliest date if !results.is_empty() { let mut prev_date = &results[0].1; for result in &results[1..] { assert!( &result.1 >= prev_date, "Results should be sorted by date in ascending order" ); prev_date = &result.1; } } } #[rstest] #[should_panic] // This test is broken until issue #2441 is fixed fn view_no_order_by_limit_pushdown(mut conn: PgConnection) { setup_view_for_order_by_limit_test(&mut conn); // Verify the tables and indexes were created properly let table_check: Vec<(String,)> = r#" SELECT tablename FROM pg_tables WHERE tablename IN ('products_2023_view', 'products_2024_view') ORDER BY tablename; "# .fetch(&mut conn); assert_eq!(table_check.len(), 2, "Both tables should exist"); let index_check: Vec<(String,)> = r#" SELECT indexname FROM pg_indexes WHERE indexname IN ('idx_products_2023_view_bm25', 'idx_products_2024_view_bm25') ORDER BY indexname; "# .fetch(&mut conn); assert_eq!(index_check.len(), 2, "Both indexes should exist"); // Verify the view was created let view_check: Vec<(String,)> = r#" SELECT viewname FROM pg_views WHERE viewname = 'products_view'; "# .fetch(&mut conn); assert_eq!(view_check.len(), 1, "View should exist"); // Verify direct table queries work let test_query: Vec<(String,)> = r#" SELECT product_name FROM products_2023_view WHERE product_name @@@ 'laptop' LIMIT 1; "# .fetch(&mut conn); assert_eq!(test_query.len(), 1, "Direct table query should work"); // Get the explain plan for a view query with ORDER BY LIMIT let explain_output = r#" EXPLAIN (ANALYZE, VERBOSE) SELECT * FROM products_view WHERE product_name @@@ 'laptop OR smartphone OR headphones OR tablet OR printer' ORDER BY sale_date LIMIT 5; "# .fetch::<(String,)>(&mut conn) .into_iter() .map(|(line,)| line) .collect::>() .join("\n"); // Print the explain plan for debugging println!("EXPLAIN output:\n{explain_output}"); // Verify NormalScanExecState is used (not TopNScanExecState) assert!( explain_output.contains("NormalScanExecState"), "Expected NormalScanExecState in the execution plan" ); assert!( !explain_output.contains("TopNScanExecState"), "TopNScanExecState should not be present in the execution plan" ); // Ensure the query works and returns correct results let results: Vec<(String, String)> = r#" SELECT product_name, sale_date::text FROM products_view WHERE product_name @@@ 'laptop OR smartphone OR headphones OR tablet OR printer' ORDER BY sale_date LIMIT 5; "# .fetch(&mut conn); println!("Query results: {results:?}"); // Verify we got the right number of results and correct order assert_eq!(results.len(), 5, "Expected 5 matching results"); // Check that results are sorted by date if !results.is_empty() { let mut prev_date = &results[0].1; for result in &results[1..] { assert!( &result.1 >= prev_date, "Results should be sorted by date in ascending order" ); prev_date = &result.1; } } } #[rstest] fn expression_with_options(mut conn: PgConnection) { "CALL paradedb.create_bm25_test_table(table_name => 'index_config', schema_name => 'paradedb')" .execute(&mut conn); r#"CREATE INDEX index_config_index ON paradedb.index_config USING bm25 (id, lower(description)) WITH (key_field='id')"# .execute(&mut conn); let rows: Vec<(String, String)> = "SELECT name, field_type FROM paradedb.schema('paradedb.index_config_index') ORDER BY name" .fetch(&mut conn); assert_eq!(rows[0], ("_pg_search_1".into(), "Str".into())); assert_eq!(rows[1], ("ctid".into(), "U64".into())); assert_eq!(rows[2], ("id".into(), "I64".into())); }