-- Builds the table name for a queue as lower(prefix || '_' || queue_name).
--
-- Parameters:
--   queue_name - queue name supplied by the caller
--   prefix     - table-kind prefix; callers in this file pass 'q' or 'a'
-- Raises an exception when queue_name contains "$", ";", "--" or a single quote.
CREATE OR REPLACE FUNCTION pgmq.format_table_name(queue_name text, prefix text)
RETURNS TEXT AS $$
BEGIN
    -- A character class is used instead of a backslash escape so the pattern
    -- (and the message literal below) parse the same way regardless of the
    -- standard_conforming_strings setting.
    IF queue_name ~ '[$;'']|--' THEN
        RAISE EXCEPTION 'queue name contains invalid characters: $, ;, --, or ''';
    END IF;
    RETURN lower(prefix || '_' || queue_name);
END;
$$ LANGUAGE plpgsql;

-- Creates the queue table and archive table for a non-partitioned queue,
-- registers both with the pgmq extension when they are not yet members,
-- creates their indexes, and records the queue in pgmq.meta.
--
-- Parameters:
--   queue_name - name of the queue to create
CREATE OR REPLACE FUNCTION pgmq.create_non_partitioned(queue_name TEXT)
RETURNS void AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
    atable TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
    PERFORM pgmq.validate_queue_name(queue_name);

    -- Queue table.
    EXECUTE FORMAT(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB
        )
        $QUERY$,
        qtable
    );

    -- Archive table.
    EXECUTE FORMAT(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT PRIMARY KEY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB
        );
        $QUERY$,
        atable
    );

    IF NOT pgmq._belongs_to_pgmq(qtable) THEN
        EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
    END IF;

    IF NOT pgmq._belongs_to_pgmq(atable) THEN
        EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
    END IF;

    EXECUTE FORMAT(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (vt ASC);
        $QUERY$,
        qtable || '_vt_idx', qtable
    );

    -- The archive index name is built from the raw queue_name (not the
    -- lower-cased table name); convert_archive_partitioned relies on this name.
    EXECUTE FORMAT(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at);
        $QUERY$,
        'archived_at_idx_' || queue_name, atable
    );

    EXECUTE FORMAT(
        $QUERY$
        INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
        VALUES (%L, false, false)
        ON CONFLICT
        DO NOTHING;
        $QUERY$,
        queue_name
    );
END;
$$ LANGUAGE plpgsql;

-- Creates a partitioned queue: a range-partitioned queue table and archive
-- table, each registered with pg_partman via public.create_parent, with
-- retention configured in public.part_config, and the queue recorded in
-- pgmq.meta.
--
-- Parameters:
--   queue_name         - name of the queue to create
--   partition_interval - passed to pgmq._get_partition_col to choose the
--                        partition column, and to create_parent as the interval
--   retention_interval - written to part_config.retention for both tables
CREATE OR REPLACE FUNCTION pgmq.create_partitioned(
    queue_name TEXT,
    partition_interval TEXT DEFAULT '10000',
    retention_interval TEXT DEFAULT '100000'
)
RETURNS void AS $$
DECLARE
    partition_col TEXT;
    a_partition_col TEXT;
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
    atable TEXT := pgmq.format_table_name(queue_name, 'a');
    fq_qtable TEXT := 'pgmq.' || qtable;
    fq_atable TEXT := 'pgmq.' || atable;
BEGIN
    PERFORM pgmq.validate_queue_name(queue_name);
    PERFORM pgmq._ensure_pg_partman_installed();
    SELECT pgmq._get_partition_col(partition_interval) INTO partition_col;

    -- Queue table, partitioned on the column chosen above.
    EXECUTE FORMAT(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT GENERATED ALWAYS AS IDENTITY,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB
        ) PARTITION BY RANGE (%I)
        $QUERY$,
        qtable, partition_col
    );

    IF NOT pgmq._belongs_to_pgmq(qtable) THEN
        EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
    END IF;

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    PERFORM public.create_parent(
        fq_qtable,
        partition_col,
        'native',
        partition_interval
    );

    EXECUTE FORMAT(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (%I);
        $QUERY$,
        qtable || '_part_idx', qtable, partition_col
    );

    EXECUTE FORMAT(
        $QUERY$
        UPDATE public.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = true,
            automatic_maintenance = 'on'
        WHERE parent_table = %L;
        $QUERY$,
        retention_interval, fq_qtable
    );

    EXECUTE FORMAT(
        $QUERY$
        INSERT INTO pgmq.meta (queue_name, is_partitioned, is_unlogged)
        VALUES (%L, true, false)
        ON CONFLICT
        DO NOTHING;
        $QUERY$,
        queue_name
    );

    -- The archive is partitioned on archived_at when the queue is partitioned
    -- on enqueued_at; otherwise it uses the same column as the queue table.
    IF partition_col = 'enqueued_at' THEN
        a_partition_col := 'archived_at';
    ELSE
        a_partition_col := partition_col;
    END IF;

    -- Archive table.
    EXECUTE FORMAT(
        $QUERY$
        CREATE TABLE IF NOT EXISTS pgmq.%I (
            msg_id BIGINT,
            read_ct INT DEFAULT 0 NOT NULL,
            enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
            vt TIMESTAMP WITH TIME ZONE NOT NULL,
            message JSONB
        ) PARTITION BY RANGE (%I);
        $QUERY$,
        atable, a_partition_col
    );

    IF NOT pgmq._belongs_to_pgmq(atable) THEN
        EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
    END IF;

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    PERFORM public.create_parent(
        fq_atable,
        a_partition_col,
        'native',
        partition_interval
    );

    EXECUTE FORMAT(
        $QUERY$
        UPDATE public.part_config
        SET
            retention = %L,
            retention_keep_table = false,
            retention_keep_index = true,
            automatic_maintenance = 'on'
        WHERE parent_table = %L;
        $QUERY$,
        retention_interval, fq_atable
    );

    EXECUTE FORMAT(
        $QUERY$
        CREATE INDEX IF NOT EXISTS %I ON pgmq.%I (archived_at);
        $QUERY$,
        'archived_at_idx_' || queue_name, atable
    );
END;
$$ LANGUAGE plpgsql;

-- Drops a queue: removes the queue and archive tables from the pgmq extension,
-- drops them, deletes the queue's row from pgmq.meta (when that table exists),
-- and, when partitioned is true, deletes both tables' rows from
-- public.part_config.
--
-- Parameters:
--   queue_name  - name of the queue to drop
--   partitioned - whether to also clean up public.part_config
-- Returns TRUE when all statements complete.
CREATE OR REPLACE FUNCTION pgmq.drop_queue(queue_name TEXT, partitioned BOOLEAN DEFAULT FALSE)
RETURNS BOOLEAN AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
    fq_qtable TEXT := 'pgmq.' || qtable;
    atable TEXT := pgmq.format_table_name(queue_name, 'a');
    fq_atable TEXT := 'pgmq.' || atable;
BEGIN
    -- NOTE(review): ALTER EXTENSION ... DROP TABLE has no IF EXISTS form, so a
    -- missing table raises here before the DROP TABLE IF EXISTS below runs.
    EXECUTE FORMAT(
        $QUERY$
        ALTER EXTENSION pgmq DROP TABLE pgmq.%I
        $QUERY$,
        qtable
    );

    EXECUTE FORMAT(
        $QUERY$
        ALTER EXTENSION pgmq DROP TABLE pgmq.%I
        $QUERY$,
        atable
    );

    EXECUTE FORMAT(
        $QUERY$
        DROP TABLE IF EXISTS pgmq.%I
        $QUERY$,
        qtable
    );

    EXECUTE FORMAT(
        $QUERY$
        DROP TABLE IF EXISTS pgmq.%I
        $QUERY$,
        atable
    );

    -- Only touch pgmq.meta when the table is present.
    IF EXISTS (
        SELECT 1
        FROM information_schema.tables
        WHERE table_name = 'meta' and table_schema = 'pgmq'
    ) THEN
        EXECUTE FORMAT(
            $QUERY$
            DELETE FROM pgmq.meta WHERE queue_name = %L
            $QUERY$,
            queue_name
        );
    END IF;

    IF partitioned THEN
        EXECUTE FORMAT(
            $QUERY$
            DELETE FROM public.part_config where parent_table in (%L, %L)
            $QUERY$,
            fq_qtable, fq_atable
        );
    END IF;

    RETURN TRUE;
END;
$$ LANGUAGE plpgsql;

-- Converts an existing plain archive table into a table range-partitioned on
-- msg_id. The current archive table and its archived_at index are renamed with
-- an "_old" suffix, a new partitioned table of the same shape is created under
-- the original name, and the new table is registered with pg_partman.
-- Emits a NOTICE and returns without changes when the archive table is already
-- partitioned or does not exist.
--
-- Parameters:
--   table_name         - queue name (the archive table name is derived from it)
--   partition_interval - interval passed to create_parent
--   retention_interval - written to part_config.retention
--   leading_partition  - passed to create_parent as p_premake
CREATE OR REPLACE FUNCTION pgmq.convert_archive_partitioned(
    table_name TEXT,
    partition_interval TEXT DEFAULT '10000',
    retention_interval TEXT DEFAULT '100000',
    leading_partition INT DEFAULT 10
)
RETURNS void AS $$
DECLARE
    a_table_name TEXT := pgmq.format_table_name(table_name, 'a');
    a_table_name_old TEXT := pgmq.format_table_name(table_name, 'a') || '_old';
    qualified_a_table_name TEXT := format('pgmq.%I', a_table_name);
    -- Same name the create_* functions give the archive index.
    archive_idx_name TEXT := 'archived_at_idx_' || table_name;
BEGIN
    -- Restrict the catalog lookups to the pgmq schema so a same-named relation
    -- in another schema cannot be mistaken for the archive table.
    PERFORM c.relkind
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE c.relname = a_table_name
      AND n.nspname = 'pgmq'
      AND c.relkind = 'p';

    IF FOUND THEN
        -- RAISE uses a bare % as its placeholder.
        RAISE NOTICE 'Table % is already partitioned', a_table_name;
        RETURN;
    END IF;

    PERFORM c.relkind
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE c.relname = a_table_name
      AND n.nspname = 'pgmq'
      AND c.relkind = 'r';

    IF NOT FOUND THEN
        RAISE NOTICE 'Table % does not exist', a_table_name;
        RETURN;
    END IF;

    -- Identifiers go through %I so names are quoted exactly as they were when
    -- the objects were created (the index name keeps the queue name's case).
    EXECUTE format(
        'ALTER TABLE pgmq.%I RENAME TO %I',
        a_table_name, a_table_name_old
    );

    EXECUTE format(
        'CREATE TABLE pgmq.%I (LIKE pgmq.%I including all) PARTITION BY RANGE (msg_id)',
        a_table_name, a_table_name_old
    );

    EXECUTE format(
        'ALTER INDEX pgmq.%I RENAME TO %I',
        archive_idx_name, archive_idx_name || '_old'
    );

    EXECUTE format(
        'CREATE INDEX %I ON pgmq.%I (archived_at)',
        archive_idx_name, a_table_name
    );

    -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
    -- p_parent_table - the existing parent table. MUST be schema qualified, even if in public schema.
    -- NOTE(review): create_parent and part_config are resolved via search_path
    -- here, whereas create_partitioned qualifies them with "public." - confirm
    -- which is intended.
    PERFORM create_parent(
        qualified_a_table_name,
        'msg_id',
        'native',
        partition_interval,
        p_premake := leading_partition
    );

    UPDATE part_config
    SET
        retention = retention_interval,
        retention_keep_table = false,
        retention_keep_index = false,
        infinite_time_partitions = true
    WHERE parent_table = qualified_a_table_name;
END;
$$ LANGUAGE plpgsql;

-- Returns one pgmq.metrics_result row for a queue: the number of rows in the
-- queue table, the age in seconds of the newest and oldest message (from
-- enqueued_at), the total number of messages taken from the msg_id sequence
-- (0 when the sequence has not been called yet), and the scrape time.
--
-- Parameters:
--   queue_name - name of the queue to report on
CREATE OR REPLACE FUNCTION pgmq.metrics(queue_name TEXT)
RETURNS pgmq.metrics_result AS $$
DECLARE
    result_row pgmq.metrics_result;
    query TEXT;
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- The second %I is the sequence relation "<qtable>_msg_id_seq"; its
    -- last_value / is_called columns are read directly.
    query := FORMAT(
        $QUERY$
        WITH q_summary AS (
            SELECT
                count(*) as queue_length,
                EXTRACT(epoch FROM (NOW() - max(enqueued_at)))::int as newest_msg_age_sec,
                EXTRACT(epoch FROM (NOW() - min(enqueued_at)))::int as oldest_msg_age_sec,
                NOW() as scrape_time
            FROM pgmq.%I
        ),
        all_metrics AS (
            SELECT CASE
                WHEN is_called THEN last_value ELSE 0
                END as total_messages
            FROM pgmq.%I
        )
        SELECT
            %L as queue_name,
            q_summary.queue_length,
            q_summary.newest_msg_age_sec,
            q_summary.oldest_msg_age_sec,
            all_metrics.total_messages,
            q_summary.scrape_time
        FROM q_summary, all_metrics
        $QUERY$,
        qtable, qtable || '_msg_id_seq', queue_name
    );
    EXECUTE query INTO result_row;
    RETURN result_row;
END;
$$ LANGUAGE plpgsql;