-- Trigger function: emits a NOTIFY on the channel
-- 'pgmq.<table name>.<trigger operation>' (e.g. 'pgmq.<table>.INSERT')
-- with a NULL payload, then returns NEW.
CREATE OR REPLACE FUNCTION pgmq.notify_queue_listeners()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM PG_NOTIFY('pgmq.' || TG_TABLE_NAME || '.' || TG_OP, NULL);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Installs the insert-notification trigger on the table backing `queue_name`.
--
-- queue_name: name of the queue; the table name is resolved through
--             pgmq.format_table_name(queue_name, 'q').
--
-- Any existing trigger of the same name is dropped first (via
-- pgmq.disable_notify_insert), so calling this repeatedly does not fail on a
-- duplicate trigger.
CREATE OR REPLACE FUNCTION pgmq.enable_notify_insert(queue_name TEXT)
RETURNS void AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- Drop first: the CREATE below has no "OR REPLACE" / "IF NOT EXISTS".
    PERFORM pgmq.disable_notify_insert(queue_name);

    -- %I quotes the table name as an identifier.
    EXECUTE FORMAT(
        $QUERY$
        CREATE CONSTRAINT TRIGGER trigger_notify_queue_insert_listeners
        AFTER INSERT ON pgmq.%I
        DEFERRABLE FOR EACH ROW
        EXECUTE PROCEDURE pgmq.notify_queue_listeners()
        $QUERY$,
        qtable
    );
END;
$$ LANGUAGE plpgsql;

-- Removes the insert-notification trigger from the table backing `queue_name`.
-- Uses DROP TRIGGER IF EXISTS, so a missing trigger is not an error.
--
-- queue_name: name of the queue; the table name is resolved through
--             pgmq.format_table_name(queue_name, 'q').
CREATE OR REPLACE FUNCTION pgmq.disable_notify_insert(queue_name TEXT)
RETURNS void AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    EXECUTE FORMAT(
        $QUERY$
        DROP TRIGGER IF EXISTS trigger_notify_queue_insert_listeners ON pgmq.%I;
        $QUERY$,
        qtable
    );
END;
$$ LANGUAGE plpgsql;

-- replace pop function with new version that adds optional multi-message pop
DROP FUNCTION pgmq.pop(queue_name TEXT);

-- Deletes and returns up to `qty` messages from the queue.
--
-- queue_name: name of the queue; the table name is resolved through
--             pgmq.format_table_name(queue_name, 'q').
-- qty:        maximum number of messages to pop (default 1); bound to the
--             LIMIT of the selecting query as parameter $1.
--
-- Returns the deleted rows as SETOF pgmq.message_record. Candidate rows are
-- those with vt <= clock_timestamp(), taken in ascending msg_id order.
CREATE FUNCTION pgmq.pop(queue_name TEXT, qty INTEGER DEFAULT 1)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    sql TEXT;
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- The table name is injected with %I (identifier quoting); qty is passed
    -- as a bind parameter through USING rather than interpolated.
    sql := FORMAT(
        $QUERY$
        WITH cte AS
            (
                SELECT msg_id
                FROM pgmq.%I
                WHERE vt <= clock_timestamp()
                ORDER BY msg_id ASC
                LIMIT $1
                FOR UPDATE SKIP LOCKED
            )
        DELETE from pgmq.%I
        WHERE msg_id IN (select msg_id from cte)
        RETURNING *;
        $QUERY$,
        qtable, qtable
    );
    -- FOR UPDATE SKIP LOCKED above skips rows locked by other transactions
    -- instead of waiting on them.
    RETURN QUERY EXECUTE sql USING qty;
END;
$$ LANGUAGE plpgsql;