-- Pop a single message.
--
-- Deletes, and returns, the row with the lowest msg_id whose vt is not later
-- than clock_timestamp() in the queue's table.
--
-- Parameters:
--   queue_name - queue name; turned into the table name (inside the pgmq
--                schema) by pgmq.format_table_name(queue_name, 'q').
-- Returns:
--   The deleted row as a set of pgmq.message_record. When no row qualifies the
--   CTE is empty, the scalar subquery yields NULL, and no rows are returned.
--
-- NOTE(review): RETURNING * relies on the queue table's columns lining up with
-- pgmq.message_record, which is defined outside this chunk -- confirm.
CREATE OR REPLACE FUNCTION pgmq.pop(queue_name TEXT)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- The table name is only known at run time, so the statement is built with
    -- format(); %I quotes the name as an identifier.
    -- FOR UPDATE SKIP LOCKED makes the row selection skip rows that another
    -- transaction currently holds locked instead of waiting on them.
    RETURN QUERY EXECUTE FORMAT(
        $QUERY$
        WITH cte AS (
            SELECT msg_id
            FROM pgmq.%I
            WHERE vt <= clock_timestamp()
            ORDER BY msg_id ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        DELETE FROM pgmq.%I
        WHERE msg_id = (SELECT msg_id FROM cte)
        RETURNING *;
        $QUERY$,
        qtable, qtable
    );
END;
$$ LANGUAGE plpgsql;

-- Set the vt of a message and return the updated row.
--
-- Parameters:
--   queue_name - queue name; turned into the table name (inside the pgmq
--                schema) by pgmq.format_table_name(queue_name, 'q').
--   msg_id     - msg_id of the row to update.
--   vt         - number of seconds added to clock_timestamp() to produce the
--                row's new vt value.
-- Returns:
--   The updated row as a set of pgmq.message_record; no rows when no row has
--   the given msg_id.
--
-- NOTE(review): RETURNING * relies on the queue table's columns lining up with
-- pgmq.message_record, which is defined outside this chunk -- confirm.
CREATE OR REPLACE FUNCTION pgmq.set_vt(queue_name TEXT, msg_id BIGINT, vt INTEGER)
RETURNS SETOF pgmq.message_record AS $$
DECLARE
    qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
    -- Only the table identifier is interpolated into the SQL text (%I). The
    -- interval and the message id are passed as typed bind parameters ($1, $2)
    -- through USING rather than being rendered as string literals.
    RETURN QUERY EXECUTE FORMAT(
        $QUERY$
        UPDATE pgmq.%I
        SET vt = (clock_timestamp() + $1)
        WHERE msg_id = $2
        RETURNING *;
        $QUERY$,
        qtable
    )
    USING make_interval(secs => vt), msg_id;
END;
$$ LANGUAGE plpgsql;