CREATE OR REPLACE FUNCTION pgmq.read_with_poll( queue_name TEXT, vt INTEGER, qty INTEGER, max_poll_seconds INTEGER DEFAULT 5, poll_interval_ms INTEGER DEFAULT 100 ) RETURNS SETOF pgmq.message_record AS $$ DECLARE r pgmq.message_record; stop_at TIMESTAMP; sql TEXT; qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN stop_at := clock_timestamp() + make_interval(secs => max_poll_seconds); LOOP IF (SELECT clock_timestamp() >= stop_at) THEN RETURN; END IF; sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.%I WHERE vt <= clock_timestamp() ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE pgmq.%I m SET vt = clock_timestamp() + %L, read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; $QUERY$, qtable, qtable, make_interval(secs => vt) ); FOR r IN EXECUTE sql USING qty LOOP RETURN NEXT r; END LOOP; IF FOUND THEN RETURN; ELSE PERFORM pg_sleep(poll_interval_ms::numeric / 1000); END IF; END LOOP; END; $$ LANGUAGE plpgsql;