-- This file was adapted from the schema generated by pgrx
--
-- The intention is to add functions that were added in the extension schema
-- but were not added in previous update paths. For convenience, we are adding
-- all functions, except for pgmq_delete and pgmq_archive, which were dropped then
-- added in previous version.
--
-- NOTE(review): despite the sentence above, pgmq_delete and pgmq_archive ARE
-- re-declared further down with CREATE OR REPLACE -- confirm whether the note
-- or the statements are stale.

-- This drop is required due to a signature change in
-- https://github.com/tembo-io/pgmq/commit/5b5aa849db98106f6675596977ba7b5dae01d459
DROP FUNCTION IF EXISTS "pgmq_read"(text, integer, integer);
DROP FUNCTION IF EXISTS "pgmq_pop"(text);

-- pgmq_send now has 2 required and 1 optional parameter
DROP FUNCTION IF EXISTS "pgmq_send"(text, jsonb);

-- src/lib.rs:350
-- pgmq::pgmq_set_vt
CREATE OR REPLACE FUNCTION "pgmq_set_vt"(
    "queue_name" TEXT, /* &str */
    "msg_id" bigint, /* i64 */
    "vt_offset" INT /* i32 */
) RETURNS TABLE (
    "msg_id" bigint, /* i64 */
    "read_ct" INT, /* i32 */
    "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "message" jsonb /* pgrx::datum::json::JsonB */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_set_vt_wrapper';

-- src/api.rs:75
-- pgmq::api::pgmq_send_batch
CREATE OR REPLACE FUNCTION "pgmq_send_batch"(
    "queue_name" TEXT, /* &str */
    "messages" jsonb[], /* alloc::vec::Vec */
    "delay" INT DEFAULT 0 /* i32 */
) RETURNS TABLE (
    "msg_id" bigint /* i64 */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_send_batch_wrapper';

-- src/lib.rs:99
-- pgmq::pgmq_send
CREATE OR REPLACE FUNCTION "pgmq_send"(
    "queue_name" TEXT, /* &str */
    "message" jsonb, /* pgrx::datum::json::JsonB */
    "delay" INT DEFAULT 0 /* i32 */
) RETURNS bigint /* core::result::Result, pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_send_wrapper';

-- src/lib.rs:135
-- pgmq::pgmq_read_with_poll
CREATE OR REPLACE FUNCTION "pgmq_read_with_poll"(
    "queue_name" TEXT, /* &str */
    "vt" INT, /* i32 */
    "limit" INT, /* i32 */
    "poll_timeout_s" INT DEFAULT 5, /* i32 */
    "poll_interval_ms" INT DEFAULT 250 /* i32 */
) RETURNS TABLE (
    "msg_id" bigint, /* i64 */
    "read_ct" INT, /* i32 */
    "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "message" jsonb /* pgrx::datum::json::JsonB */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_read_with_poll_wrapper';

-- src/lib.rs:113
-- pgmq::pgmq_read
CREATE OR REPLACE FUNCTION "pgmq_read"(
    "queue_name" TEXT, /* &str */
    "vt" INT, /* i32 */
    "limit" INT /* i32 */
) RETURNS TABLE (
    "msg_id" bigint, /* i64 */
    "read_ct" INT, /* i32 */
    "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "message" jsonb /* pgrx::datum::json::JsonB */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_read_wrapper';

-- src/api.rs:96
-- pgmq::api::pgmq_purge_queue
CREATE OR REPLACE FUNCTION "pgmq_purge_queue"(
    "queue_name" TEXT /* alloc::string::String */
) RETURNS bigint /* core::result::Result */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_purge_queue_wrapper';

-- src/lib.rs:290
-- pgmq::pgmq_pop
CREATE OR REPLACE FUNCTION "pgmq_pop"(
    "queue_name" TEXT /* &str */
) RETURNS TABLE (
    "msg_id" bigint, /* i64 */
    "read_ct" INT, /* i32 */
    "enqueued_at" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "vt" timestamp with time zone, /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
    "message" jsonb /* pgrx::datum::json::JsonB */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_pop_wrapper';

-- src/metrics.rs:41
-- pgmq::metrics::pgmq_metrics_all
CREATE OR REPLACE FUNCTION "pgmq_metrics_all"() RETURNS TABLE (
    "queue_name" TEXT, /* alloc::string::String */
    "queue_length" bigint, /* i64 */
    "newest_msg_age_sec" INT, /* core::option::Option */
    "oldest_msg_age_sec" INT, /* core::option::Option */
    "total_messages" bigint, /* i64 */
    "scrape_time" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_metrics_all_wrapper';

-- src/metrics.rs:20
-- pgmq::metrics::pgmq_metrics
CREATE OR REPLACE FUNCTION "pgmq_metrics"(
    "queue_name" TEXT /* &str */
) RETURNS TABLE (
    "queue_name" TEXT, /* alloc::string::String */
    "queue_length" bigint, /* i64 */
    "newest_msg_age_sec" INT, /* core::option::Option */
    "oldest_msg_age_sec" INT, /* core::option::Option */
    "total_messages" bigint, /* i64 */
    "scrape_time" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_metrics_wrapper';

-- src/api.rs:43
-- pgmq::api::pgmq_list_queues
CREATE OR REPLACE FUNCTION "pgmq_list_queues"() RETURNS TABLE (
    "queue_name" TEXT, /* alloc::string::String */
    "created_at" timestamp with time zone /* pgrx::datum::time_stamp_with_timezone::TimestampWithTimeZone */
)
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_list_queues_wrapper';

-- src/api.rs:14
-- pgmq::api::pgmq_drop_queue
CREATE OR REPLACE FUNCTION "pgmq_drop_queue"(
    "queue_name" TEXT, /* alloc::string::String */
    "partitioned" bool DEFAULT false /* bool */
) RETURNS bool /* core::result::Result */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_drop_queue_wrapper';

-- src/lib.rs:210
-- pgmq::pgmq_delete
CREATE OR REPLACE FUNCTION "pgmq_delete"(
    "queue_name" TEXT, /* &str */
    "msg_id" bigint /* i64 */
) RETURNS bool /* core::result::Result, pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_delete_wrapper';

-- src/lib.rs:57
-- pgmq::pgmq_create_partitioned
CREATE OR REPLACE FUNCTION "pgmq_create_partitioned"(
    "queue_name" TEXT, /* &str */
    "partition_interval" TEXT DEFAULT '10000', /* alloc::string::String */
    "retention_interval" TEXT DEFAULT '100000' /* alloc::string::String */
) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_create_partitioned_wrapper';

-- src/lib.rs:45
-- pgmq::pgmq_create_non_partitioned
CREATE OR REPLACE FUNCTION "pgmq_create_non_partitioned"(
    "queue_name" TEXT /* &str */
) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_create_non_partitioned_wrapper';

-- src/lib.rs:85
-- pgmq::pgmq_create
CREATE OR REPLACE FUNCTION "pgmq_create"(
    "queue_name" TEXT /* &str */
) RETURNS VOID /* core::result::Result<(), pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_create_wrapper';

-- src/lib.rs:250
-- pgmq::pgmq_archive
CREATE OR REPLACE FUNCTION "pgmq_archive"(
    "queue_name" TEXT, /* &str */
    "msg_id" bigint /* i64 */
) RETURNS bool /* core::result::Result, pgmq::errors::PgmqExtError> */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'pgmq_archive_wrapper';

-- Create an index on archived_at for the archive table of every queue listed
-- in public.pgmq_meta. IF NOT EXISTS keeps the statement safe to re-run.
--
-- Both identifiers are assembled in full BEFORE being handed to %I. Quoting
-- only the queue-name fragment (pgmq_%I_archive) yields invalid SQL such as
-- pgmq_"user"_archive whenever the queue name is a reserved word or starts
-- with a digit, which would abort the whole upgrade.
--
-- NOTE(review): %I preserves case. If archive tables were created from
-- unquoted (case-folded) names, mixed-case queue names need separate
-- handling -- confirm against the table-creation code.
DO $$
DECLARE
    qname TEXT;
BEGIN
    FOR qname IN (SELECT queue_name FROM public.pgmq_meta)
    LOOP
        EXECUTE format(
            'CREATE INDEX IF NOT EXISTS %I ON %I (archived_at)',
            'archived_at_idx_' || qname,
            'pgmq_' || qname || '_archive'
        );
    END LOOP;
END
$$ LANGUAGE plpgsql;