use rand::Rng; use sqlx::postgres::PgPoolOptions; use sqlx::{Pool, Postgres, Row}; use std::env; use tokio::time::{sleep, Duration}; async fn connect() -> Pool { // get url from environment variable let dburl = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let options = pgmq::util::conn_options(&dburl).expect("failed to parse url"); PgPoolOptions::new() .acquire_timeout(std::time::Duration::from_secs(10)) .max_connections(5) .connect_with(options) .await .unwrap() } #[tokio::test] async fn test_lifecycle() { let conn = connect().await; let mut rng = rand::thread_rng(); let _ = sqlx::query("DROP EXTENSION IF EXISTS pg_later") .execute(&conn) .await .expect("failed to drop extension"); let _ = sqlx::query("CREATE EXTENSION pg_later CASCADE") .execute(&conn) .await .expect("failed to create"); let _ = sqlx::query("SELECT pglater.init()") .execute(&conn) .await .expect("failed to init"); // simple select case let q0 = sqlx::query("SELECT pglater.exec('select 1')") .fetch_one(&conn) .await .expect("failed to exec") .get::(0); assert!(q0 > 0); sleep(Duration::from_secs(7)).await; let row: (serde_json::Value,) = sqlx::query_as(&format!( "SELECT pglater.fetch_results({q0})::json as results" )) .fetch_one(&conn) .await .expect(&format!("failed to fetch {q0}")); let r = row.0; assert_eq!( r.get("query").expect("no query").to_owned(), "select 1".to_string() ); assert_eq!( r.get("status").expect("no query").to_owned(), "success".to_string() ); // invalid query case let invalid_query = "invalid query"; let result = sqlx::query(&format!("SELECT pglater.exec('{}')", invalid_query)) .fetch_one(&conn) .await; assert!( result.is_err(), "Executing an invalid query should result in an error" ); // create table case let test_num = rng.gen_range(0..100000); let table_name = format!("test_table_{}", test_num); let pglater_exec = format!( "SELECT pglater.exec('create table if not exists \"{}\" (x text)')", table_name ); println!("pglater exec: {}", pglater_exec); let q1 = sqlx::query(&pglater_exec) .fetch_one(&conn) .await .expect("failed to exec create table") .get::(0); assert!(q1 > q0, "job ids should increase"); sleep(Duration::from_secs(5)).await; let row: (serde_json::Value,) = sqlx::query_as(&format!( "SELECT pglater.fetch_results({q1})::json as results" )) .fetch_one(&conn) .await .expect("failed to fetch"); let r = row.0; assert_eq!( r.get("status").expect("no query").to_owned(), "success".to_string() ); let exists_query = format!( " SELECT EXISTS ( SELECT 1 FROM pg_tables WHERE tablename = '{}' );", table_name ); println!("exists query: {}", exists_query); let row: (bool,) = sqlx::query_as(&exists_query) .fetch_one(&conn) .await .expect("failed to fetch"); assert!(row.0, "table must exist"); // bypass validation -- any statement will pass through to execution let bypass_validation_query = format!( "SELECT pglater.exec( query => 'CREATE INDEX ON mytable USING hnsw (embedding vector_l2_ops)', validate => false )", ); println!("pglater exec: {}", bypass_validation_query); let q2 = sqlx::query(&bypass_validation_query) .fetch_one(&conn) .await .expect("failed bypass check") .get::(0); assert!(q2 > q1, "job ids should increase"); }