// Copyright (c) 2023-2025 ParadeDB, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
mod fixtures;
use fixtures::*;
use futures::executor::block_on;
use lockfree_object_pool::MutexObjectPool;
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use rstest::*;
use sqlx::PgConnection;
use std::fmt::Debug;
#[derive(Debug, Clone, Copy, Arbitrary)]
enum Message {
Match,
NoMatch,
}
impl Message {
fn as_str(&self) -> &'static str {
match self {
Message::Match => "cheese",
Message::NoMatch => "bread",
}
}
}
#[derive(Debug, Clone, Arbitrary)]
enum Action {
Insert(Message),
Update {
#[proptest(strategy = "any::()")]
index: prop::sample::Index,
new_message: Message,
},
Delete(#[proptest(strategy = "any::()")] prop::sample::Index),
Vacuum,
}
fn setup(conn: &mut PgConnection, mutable_segment_rows: usize) {
format!(r#"
CREATE EXTENSION IF NOT EXISTS pg_search;
SET log_error_verbosity TO VERBOSE;
SET paradedb.global_mutable_segment_rows TO 0;
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table (id SERIAL8 PRIMARY KEY, message TEXT);
CREATE INDEX idx_test_table ON test_table USING bm25 (id, message)
WITH (key_field = 'id', text_fields='{{"message": {{ "tokenizer": {{"type": "default"}} }} }}', mutable_segment_rows={mutable_segment_rows});
ANALYZE test_table;
"#)
.execute(conn);
}
#[rstest]
#[tokio::test]
async fn mutable_segment_correctness(database: Db) {
let pool = MutexObjectPool::::new(
move || block_on(async { database.connection().await }),
|_| {},
);
proptest!(|(
actions in proptest::collection::vec(any::(), 1..32),
mutable_segment_rows in prop_oneof![Just(0), Just(1), Just(10)],
)| {
let mut conn = pool.pull();
setup(&mut conn, mutable_segment_rows);
let mut model: Vec<(i64, Message)> = Vec::new();
for (i, action) in actions.into_iter().enumerate() {
match action {
Action::Insert(message) => {
let (id,): (i64,) = format!(
"INSERT INTO test_table (message) VALUES ('{}') RETURNING id",
message.as_str()
).fetch_one(&mut conn);
model.push((id, message));
}
Action::Update { index, new_message } => {
if model.is_empty() {
continue;
}
let idx = index.index(model.len());
let (id_to_update, _) = model[idx];
format!(
"UPDATE test_table SET message = '{}' WHERE id = {};",
new_message.as_str(),
id_to_update,
).execute(&mut conn);
model[idx].1 = new_message;
}
Action::Delete(index) => {
if model.is_empty() {
continue;
}
let idx = index.index(model.len());
let (id_to_delete, _) = model[idx];
format!(
"DELETE FROM test_table WHERE id = {};",
id_to_delete,
).execute(&mut conn);
model.remove(idx);
}
Action::Vacuum => {
"VACUUM test_table;".execute(&mut conn);
}
}
let count_query = r#"SELECT COUNT(*) FROM test_table WHERE message @@@ 'cheese';"#;
let (result_count,): (i64,) = count_query.fetch_one(&mut conn);
let expected_count = model.iter().filter(|(_, m)| matches!(m, Message::Match)).count() as i64;
prop_assert_eq!(
result_count,
expected_count,
"Mismatch after action #{}: {:?}\nModel: {:?}",
i,
action,
model
);
}
});
}