------------------------------------------------------------ -- Schema, tables, records, privileges, indexes, etc ------------------------------------------------------------ -- We don't need to create the `pgmq` schema because it is automatically -- created by postgres due to being declared in extension control file -- Table where queues and metadata about them is stored CREATE TABLE pgmq.meta ( queue_name VARCHAR UNIQUE NOT NULL, is_partitioned BOOLEAN NOT NULL, is_unlogged BOOLEAN NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL ); -- Grant permission to pg_monitor to all tables and sequences GRANT USAGE ON SCHEMA pgmq TO pg_monitor; GRANT SELECT ON ALL TABLES IN SCHEMA pgmq TO pg_monitor; GRANT SELECT ON ALL SEQUENCES IN SCHEMA pgmq TO pg_monitor; ALTER DEFAULT PRIVILEGES IN SCHEMA pgmq GRANT SELECT ON TABLES TO pg_monitor; ALTER DEFAULT PRIVILEGES IN SCHEMA pgmq GRANT SELECT ON SEQUENCES TO pg_monitor; -- This type has the shape of a message in a queue, and is often returned by -- pgmq functions that return messages CREATE TYPE pgmq.message_record AS ( msg_id BIGINT, read_ct INTEGER, enqueued_at TIMESTAMP WITH TIME ZONE, vt TIMESTAMP WITH TIME ZONE, message JSONB, headers JSONB ); CREATE TYPE pgmq.queue_record AS ( queue_name VARCHAR, is_partitioned BOOLEAN, is_unlogged BOOLEAN, created_at TIMESTAMP WITH TIME ZONE ); ------------------------------------------------------------ -- Functions ------------------------------------------------------------ -- a helper to format table names and check for invalid characters CREATE FUNCTION pgmq.format_table_name(queue_name text, prefix text) RETURNS TEXT AS $$ BEGIN IF queue_name ~ '\$|;|--|''' THEN RAISE EXCEPTION 'queue name contains invalid characters: $, ;, --, or \'''; END IF; RETURN lower(prefix || '_' || queue_name); END; $$ LANGUAGE plpgsql; -- read -- reads a number of messages from a queue, setting a visibility timeout on them CREATE FUNCTION pgmq.read( queue_name TEXT, vt INTEGER, qty INTEGER, conditional JSONB DEFAULT '{}' ) RETURNS SETOF pgmq.message_record AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.%I WHERE vt <= clock_timestamp() AND CASE WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer ELSE 1 END = 1 ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE pgmq.%I m SET vt = clock_timestamp() + %L, read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); RETURN QUERY EXECUTE sql USING qty; END; $$ LANGUAGE plpgsql; ---- read_with_poll ---- reads a number of messages from a queue, setting a visibility timeout on them CREATE FUNCTION pgmq.read_with_poll( queue_name TEXT, vt INTEGER, qty INTEGER, max_poll_seconds INTEGER DEFAULT 5, poll_interval_ms INTEGER DEFAULT 100, conditional JSONB DEFAULT '{}' ) RETURNS SETOF pgmq.message_record AS $$ DECLARE r pgmq.message_record; stop_at TIMESTAMP; sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds); LOOP IF (SELECT clock_timestamp() >= stop_at) THEN RETURN; END IF; sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.%I WHERE vt <= clock_timestamp() AND CASE WHEN %L != '{}'::jsonb THEN (message @> %2$L)::integer ELSE 1 END = 1 ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE pgmq.%I m SET vt = clock_timestamp() + %L, read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); FOR r IN EXECUTE sql USING qty LOOP RETURN NEXT r; END LOOP; IF FOUND THEN RETURN; ELSE PERFORM pg_sleep(poll_interval_ms::numeric / 1000); END IF; END LOOP; END; $$ LANGUAGE plpgsql; ---- archive ---- removes a message from the queue, and sends it to the archive, where its ---- saved permanently. CREATE FUNCTION pgmq.archive( queue_name TEXT, msg_id BIGINT ) RETURNS BOOLEAN AS $$ DECLARE sql TEXT; result BIGINT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN sql := FORMAT( $QUERY$ WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = $1 RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) SELECT msg_id, vt, read_ct, enqueued_at, message, headers FROM archived RETURNING msg_id; $QUERY$, qtable, atable ); EXECUTE sql USING msg_id INTO result; RETURN NOT (result IS NULL); END; $$ LANGUAGE plpgsql; ---- archive ---- removes an array of message ids from the queue, and sends it to the archive, ---- where these messages will be saved permanently. CREATE FUNCTION pgmq.archive( queue_name TEXT, msg_ids BIGINT[] ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN sql := FORMAT( $QUERY$ WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = ANY($1) RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) SELECT msg_id, vt, read_ct, enqueued_at, message, headers FROM archived RETURNING msg_id; $QUERY$, qtable, atable ); RETURN QUERY EXECUTE sql USING msg_ids; END; $$ LANGUAGE plpgsql; ---- delete ---- deletes a message id from the queue permanently CREATE FUNCTION pgmq.delete( queue_name TEXT, msg_id BIGINT ) RETURNS BOOLEAN AS $$ DECLARE sql TEXT; result BIGINT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ DELETE FROM pgmq.%I WHERE msg_id = $1 RETURNING msg_id $QUERY$, qtable ); EXECUTE sql USING msg_id INTO result; RETURN NOT (result IS NULL); END; $$ LANGUAGE plpgsql; ---- delete ---- deletes an array of message ids from the queue permanently CREATE FUNCTION pgmq.delete( queue_name TEXT, msg_ids BIGINT[] ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ DELETE FROM pgmq.%I WHERE msg_id = ANY($1) RETURNING msg_id $QUERY$, qtable ); RETURN QUERY EXECUTE sql USING msg_ids; END; $$ LANGUAGE plpgsql; -- send: 2 args, no delay or headers CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); $$ LANGUAGE sql; -- send: 3 args with headers CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, headers JSONB ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp()); $$ LANGUAGE sql; -- send: 3 args with integer delay CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, delay INTEGER ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; -- send: 3 args with timestamp CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send(queue_name, msg, NULL, delay); $$ LANGUAGE sql; -- send: 4 args with integer delay CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, headers JSONB, delay INTEGER ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; -- send: actual implementation CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, headers JSONB, delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message, headers) VALUES ($2, $1, $3) RETURNING msg_id; $QUERY$, qtable ); RETURN QUERY EXECUTE sql USING msg, delay, headers; END; $$ LANGUAGE plpgsql; -- send batch: 2 args CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[] ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); $$ LANGUAGE sql; -- send batch: 3 args with headers CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], headers JSONB[] ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()); $$ LANGUAGE sql; -- send batch: 3 args with integer delay CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], delay INTEGER ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; -- send batch: 3 args with timestamp CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay); $$ LANGUAGE sql; -- send_batch: 4 args with integer delay CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], headers JSONB[], delay INTEGER ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; -- send_batch: actual implementation CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], headers JSONB[], delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message, headers) SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) RETURNING msg_id; $QUERY$, qtable ); RETURN QUERY EXECUTE sql USING msgs, delay, headers; END; $$ LANGUAGE plpgsql; -- returned by pgmq.metrics() and pgmq.metrics_all CREATE TYPE pgmq.metrics_result AS ( queue_name text, queue_length bigint, newest_msg_age_sec int, oldest_msg_age_sec int, total_messages bigint, scrape_time timestamp with time zone, queue_visible_length bigint ); -- get metrics for a single queue CREATE FUNCTION pgmq.metrics(queue_name TEXT) RETURNS pgmq.metrics_result AS $$ DECLARE result_row pgmq.metrics_result; query TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN query := FORMAT( $QUERY$ WITH q_summary AS ( SELECT count(*) as queue_length, count(CASE WHEN vt <= NOW() THEN 1 END) as queue_visible_length, EXTRACT(epoch FROM (NOW() - max(enqueued_at)))::int as newest_msg_age_sec, EXTRACT(epoch FROM (NOW() - min(enqueued_at)))::int as oldest_msg_age_sec, NOW() as scrape_time FROM pgmq.%I ), all_metrics AS ( SELECT CASE WHEN is_called THEN last_value ELSE 0 END as total_messages FROM pgmq.%I ) SELECT %L as queue_name, q_summary.queue_length, q_summary.newest_msg_age_sec, q_summary.oldest_msg_age_sec, all_metrics.total_messages, q_summary.scrape_time, q_summary.queue_visible_length FROM q_summary, all_metrics $QUERY$, qtable, qtable || '_msg_id_seq', queue_name ); EXECUTE query INTO result_row; RETURN result_row; END; $$ LANGUAGE plpgsql; -- get metrics for all queues CREATE FUNCTION pgmq."metrics_all"() RETURNS SETOF pgmq.metrics_result AS $$ DECLARE row_name RECORD; result_row pgmq.metrics_result; BEGIN FOR row_name IN SELECT queue_name FROM pgmq.meta LOOP result_row := pgmq.metrics(row_name.queue_name); RETURN NEXT result_row; END LOOP; END; $$ LANGUAGE plpgsql; -- list queues CREATE FUNCTION pgmq."list_queues"() RETURNS SETOF pgmq.queue_record AS $$ BEGIN RETURN QUERY SELECT * FROM pgmq.meta; END $$ LANGUAGE plpgsql; -- purge queue, deleting all entries in it. CREATE OR REPLACE FUNCTION pgmq."purge_queue"(queue_name TEXT) RETURNS BIGINT AS $$ DECLARE deleted_count INTEGER; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN -- Get the row count before truncating EXECUTE format('SELECT count(*) FROM pgmq.%I', qtable) INTO deleted_count; -- Use TRUNCATE for better performance on large tables EXECUTE format('TRUNCATE TABLE pgmq.%I', qtable); -- Return the number of purged rows RETURN deleted_count; END $$ LANGUAGE plpgsql; -- unassign archive, so it can be kept when a queue is deleted CREATE FUNCTION pgmq."detach_archive"(queue_name TEXT) RETURNS VOID AS $$ DECLARE atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable); END $$ LANGUAGE plpgsql; -- pop a single message CREATE FUNCTION pgmq.pop(queue_name TEXT) RETURNS SETOF pgmq.message_record AS $$ DECLARE sql TEXT; result pgmq.message_record; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.%I WHERE vt <= now() ORDER BY msg_id ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) DELETE from pgmq.%I WHERE msg_id = (select msg_id from cte) RETURNING *; $QUERY$, qtable, qtable ); RETURN QUERY EXECUTE sql; END; $$ LANGUAGE plpgsql; -- Sets vt of a message, returns it CREATE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt INTEGER) RETURNS SETOF pgmq.message_record AS $$ DECLARE sql TEXT; result pgmq.message_record; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( $QUERY$ UPDATE pgmq.%I SET vt = (now() + %L) WHERE msg_id = %L RETURNING *; $QUERY$, qtable, make_interval(secs => vt), msg_id ); RETURN QUERY EXECUTE sql; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq._get_pg_partman_schema() RETURNS TEXT AS $$ SELECT extnamespace::regnamespace::text FROM pg_extension WHERE extname = 'pg_partman'; $$ LANGUAGE SQL; CREATE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN) RETURNS BOOLEAN AS $$ DECLARE qtable TEXT := pgmq.format_table_name(queue_name, 'q'); fq_qtable TEXT := 'pgmq.' || qtable; atable TEXT := pgmq.format_table_name(queue_name, 'a'); fq_atable TEXT := 'pgmq.' || atable; BEGIN RAISE WARNING 'drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead'; PERFORM pgmq.drop_queue(queue_name); RETURN TRUE; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.drop_queue(queue_name TEXT) RETURNS BOOLEAN AS $$ DECLARE qtable TEXT := pgmq.format_table_name(queue_name, 'q'); qtable_seq TEXT := qtable || '_msg_id_seq'; fq_qtable TEXT := 'pgmq.' || qtable; atable TEXT := pgmq.format_table_name(queue_name, 'a'); fq_atable TEXT := 'pgmq.' || atable; partitioned BOOLEAN; BEGIN EXECUTE FORMAT( $QUERY$ SELECT is_partitioned FROM pgmq.meta WHERE queue_name = %L $QUERY$, queue_name ) INTO partitioned; EXECUTE FORMAT( $QUERY$ ALTER EXTENSION pgmq DROP TABLE pgmq.%I $QUERY$, qtable ); EXECUTE FORMAT( $QUERY$ ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I $QUERY$, qtable_seq ); EXECUTE FORMAT( $QUERY$ ALTER EXTENSION pgmq DROP TABLE pgmq.%I $QUERY$, atable ); EXECUTE FORMAT( $QUERY$ DROP TABLE IF EXISTS pgmq.%I $QUERY$, qtable ); EXECUTE FORMAT( $QUERY$ DROP TABLE IF EXISTS pgmq.%I $QUERY$, atable ); IF EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_name = 'meta' and table_schema = 'pgmq' ) THEN EXECUTE FORMAT( $QUERY$ DELETE FROM pgmq.meta WHERE queue_name = %L $QUERY$, queue_name ); END IF; IF partitioned THEN EXECUTE FORMAT( $QUERY$ DELETE FROM %I.part_config where parent_table in (%L, %L) $QUERY$, pgmq._get_pg_partman_schema(), fq_qtable, fq_atable ); END IF; RETURN TRUE; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.validate_queue_name(queue_name TEXT) RETURNS void AS $$ BEGIN IF length(queue_name) >= 48 THEN RAISE EXCEPTION 'queue name is too long, maximum length is 48 characters'; END IF; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq._belongs_to_pgmq(table_name TEXT) RETURNS BOOLEAN AS $$ DECLARE sql TEXT; result BOOLEAN; BEGIN SELECT EXISTS ( SELECT 1 FROM pg_depend WHERE refobjid = (SELECT oid FROM pg_extension WHERE extname = 'pgmq') AND objid = ( SELECT oid FROM pg_class WHERE relname = table_name ) ) INTO result; RETURN result; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.create_non_partitioned(queue_name TEXT) RETURNS void AS $$ DECLARE qtable TEXT := pgmq.format_table_name(queue_name, 'q'); qtable_seq TEXT := qtable || '_msg_id_seq'; atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN PERFORM pgmq.validate_queue_name(queue_name); EXECUTE FORMAT( $QUERY$ CREATE TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ) $QUERY$, qtable ); EXECUTE FORMAT( $QUERY$ CREATE TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT PRIMARY KEY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ); $QUERY$, atable ); IF NOT pgmq._belongs_to_pgmq(qtable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); END IF; IF NOT pgmq._belongs_to_pgmq(atable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); END IF; EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (vt ASC); $QUERY$, qtable || '_vt_idx', qtable ); EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at); $QUERY$, 'archived_at_idx_' || queue_name, atable ); EXECUTE FORMAT( $QUERY$ INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged) VALUES (%L, false, false) ON CONFLICT DO NOTHING; $QUERY$, queue_name ); END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.create_unlogged(queue_name TEXT) RETURNS void AS $$ DECLARE qtable TEXT := pgmq.format_table_name(queue_name, 'q'); qtable_seq TEXT := qtable || '_msg_id_seq'; atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN PERFORM pgmq.validate_queue_name(queue_name); EXECUTE FORMAT( $QUERY$ CREATE UNLOGGED TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ) $QUERY$, qtable ); EXECUTE FORMAT( $QUERY$ CREATE TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT PRIMARY KEY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ); $QUERY$, atable ); IF NOT pgmq._belongs_to_pgmq(qtable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); END IF; IF NOT pgmq._belongs_to_pgmq(atable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); END IF; EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (vt ASC); $QUERY$, qtable || '_vt_idx', qtable ); EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at); $QUERY$, 'archived_at_idx_' || queue_name, atable ); EXECUTE FORMAT( $QUERY$ INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged) VALUES (%L, false, true) ON CONFLICT DO NOTHING; $QUERY$, queue_name ); END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq._get_partition_col(partition_interval TEXT) RETURNS TEXT AS $$ DECLARE num INTEGER; BEGIN BEGIN num := partition_interval::INTEGER; RETURN 'msg_id'; EXCEPTION WHEN others THEN RETURN 'enqueued_at'; END; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq._ensure_pg_partman_installed() RETURNS void AS $$ DECLARE extension_exists BOOLEAN; BEGIN SELECT EXISTS ( SELECT 1 FROM pg_extension WHERE extname = 'pg_partman' ) INTO extension_exists; IF NOT extension_exists THEN RAISE EXCEPTION 'pg_partman is required for partitioned queues'; END IF; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq._get_pg_partman_major_version() RETURNS INT LANGUAGE SQL AS $$ SELECT split_part(extversion, '.', 1)::INT FROM pg_extension WHERE extname = 'pg_partman' $$; CREATE FUNCTION pgmq.create_partitioned( queue_name TEXT, partition_interval TEXT DEFAULT '10000', retention_interval TEXT DEFAULT '100000' ) RETURNS void AS $$ DECLARE partition_col TEXT; a_partition_col TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); qtable_seq TEXT := qtable || '_msg_id_seq'; atable TEXT := pgmq.format_table_name(queue_name, 'a'); fq_qtable TEXT := 'pgmq.' || qtable; fq_atable TEXT := 'pgmq.' || atable; BEGIN PERFORM pgmq.validate_queue_name(queue_name); PERFORM pgmq._ensure_pg_partman_installed(); SELECT pgmq._get_partition_col(partition_interval) INTO partition_col; EXECUTE FORMAT( $QUERY$ CREATE TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT GENERATED ALWAYS AS IDENTITY, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ) PARTITION BY RANGE (%I) $QUERY$, qtable, partition_col ); IF NOT pgmq._belongs_to_pgmq(qtable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); END IF; -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema. EXECUTE FORMAT( $QUERY$ SELECT %I.create_parent( p_parent_table := %L, p_control := %L, p_interval := %L, p_type := case when pgmq._get_pg_partman_major_version() = 5 then 'range' else 'native' end ) $QUERY$, pgmq._get_pg_partman_schema(), fq_qtable, partition_col, partition_interval ); EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (%I); $QUERY$, qtable || '_part_idx', qtable, partition_col ); EXECUTE FORMAT( $QUERY$ UPDATE %I.part_config SET retention = %L, retention_keep_table = false, retention_keep_index = true, automatic_maintenance = 'on' WHERE parent_table = %L; $QUERY$, pgmq._get_pg_partman_schema(), retention_interval, 'pgmq.' || qtable ); EXECUTE FORMAT( $QUERY$ INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged) VALUES (%L, true, false) ON CONFLICT DO NOTHING; $QUERY$, queue_name ); IF partition_col = 'enqueued_at' THEN a_partition_col := 'archived_at'; ELSE a_partition_col := partition_col; END IF; EXECUTE FORMAT( $QUERY$ CREATE TABLE IF NOT EXISTS pgmq.%I ( msg_id BIGINT NOT NULL, read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, message JSONB, headers JSONB ) PARTITION BY RANGE (%I); $QUERY$, atable, a_partition_col ); IF NOT pgmq._belongs_to_pgmq(atable) THEN EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); END IF; -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema. EXECUTE FORMAT( $QUERY$ SELECT %I.create_parent( p_parent_table := %L, p_control := %L, p_interval := %L, p_type := case when pgmq._get_pg_partman_major_version() = 5 then 'range' else 'native' end ) $QUERY$, pgmq._get_pg_partman_schema(), fq_atable, a_partition_col, partition_interval ); EXECUTE FORMAT( $QUERY$ UPDATE %I.part_config SET retention = %L, retention_keep_table = false, retention_keep_index = true, automatic_maintenance = 'on' WHERE parent_table = %L; $QUERY$, pgmq._get_pg_partman_schema(), retention_interval, 'pgmq.' || atable ); EXECUTE FORMAT( $QUERY$ CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at); $QUERY$, 'archived_at_idx_' || queue_name, atable ); END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.create(queue_name TEXT) RETURNS void AS $$ BEGIN PERFORM pgmq.create_non_partitioned(queue_name); END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned( table_name TEXT, partition_interval TEXT DEFAULT '10000', retention_interval TEXT DEFAULT '100000', leading_partition INT DEFAULT 10 ) RETURNS void AS $$ DECLARE a_table_name TEXT := pgmq.format_table_name(table_name, 'a'); a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old'; qualified_a_table_name TEXT := format('pgmq.%I', a_table_name); BEGIN PERFORM c.relkind FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = a_table_name AND c.relkind = 'p'; IF FOUND THEN RAISE NOTICE 'Table %s is already partitioned', a_table_name; RETURN; END IF; PERFORM c.relkind FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = a_table_name AND c.relkind = 'r'; IF NOT FOUND THEN RAISE NOTICE 'Table %s does not exists', a_table_name; RETURN; END IF; EXECUTE 'ALTER TABLE ' || qualified_a_table_name || ' RENAME TO ' || a_table_name_old; EXECUTE format( 'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (msg_id)', a_table_name, a_table_name_old ); EXECUTE 'ALTER INDEX pgmq.archived_at_idx_' || table_name || ' RENAME TO archived_at_idx_' || table_name || '_old'; EXECUTE 'CREATE INDEX archived_at_idx_'|| table_name || ' ON ' || qualified_a_table_name ||'(archived_at)'; -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema. EXECUTE FORMAT( $QUERY$ SELECT %I.create_parent( p_parent_table := %L, p_control := 'msg_id', p_interval := %L, p_type := case when pgmq._get_pg_partman_major_version() = 5 then 'range' else 'native' end ) $QUERY$, pgmq._get_pg_partman_schema(), qualified_a_table_name, partition_interval ); EXECUTE FORMAT( $QUERY$ UPDATE %I.part_config SET retention = %L, retention_keep_table = false, retention_keep_index = false, infinite_time_partitions = true WHERE parent_table = %L; $QUERY$, pgmq._get_pg_partman_schema(), retention_interval, qualified_a_table_name ); END; $$ LANGUAGE plpgsql;