-- Dropping rust impls DROP FUNCTION pgmq.read(text, integer, integer); DROP FUNCTION pgmq.read_with_poll(text, integer, integer, integer, integer); DROP FUNCTION pgmq.archive(text, bigint); DROP FUNCTION pgmq.archive(text, bigint[]); DROP FUNCTION pgmq.delete(text, bigint); DROP FUNCTION pgmq.delete(text, bigint[]); -- Adding message record type, required for the functions CREATE TYPE pgmq.message_record AS ( msg_id BIGINT, read_ct INTEGER, enqueued_at TIMESTAMP WITH TIME ZONE, vt TIMESTAMP WITH TIME ZONE, message JSONB ); -- Adding PL/pgSQL impls -- Copy-pasted from pgrx schema CREATE FUNCTION pgmq.read( queue_name TEXT, vt INTEGER, qty INTEGER ) RETURNS SETOF pgmq.message_record AS $$ DECLARE sql TEXT; BEGIN sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.q_%s WHERE vt <= clock_timestamp() ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE pgmq.q_%s m SET vt = clock_timestamp() + interval '%s seconds', read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; $QUERY$, queue_name, queue_name, vt ); RETURN QUERY EXECUTE sql USING qty; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.read_with_poll( queue_name TEXT, vt INTEGER, qty INTEGER, max_poll_seconds INTEGER DEFAULT 5, poll_interval_ms INTEGER DEFAULT 100 ) RETURNS SETOF pgmq.message_record AS $$ DECLARE r pgmq.message_record; stop_at TIMESTAMP; sql TEXT; BEGIN stop_at := clock_timestamp() + FORMAT('%s seconds', max_poll_seconds)::interval; LOOP IF (SELECT clock_timestamp() >= stop_at) THEN RETURN; END IF; sql := FORMAT( $QUERY$ WITH cte AS ( SELECT msg_id FROM pgmq.q_%s WHERE vt <= clock_timestamp() ORDER BY msg_id ASC LIMIT $1 FOR UPDATE SKIP LOCKED ) UPDATE pgmq.q_%s t SET vt = clock_timestamp() + interval '%s seconds', read_ct = read_ct + 1 FROM cte WHERE t.msg_id=cte.msg_id RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; $QUERY$, queue_name, queue_name, vt ); FOR r IN EXECUTE sql USING qty LOOP RETURN NEXT r; END LOOP; IF FOUND THEN RETURN; ELSE PERFORM pg_sleep(poll_interval_ms / 1000); END IF; END LOOP; END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.archive( queue_name TEXT, msg_id BIGINT ) RETURNS BOOLEAN AS $$ DECLARE sql TEXT; result BIGINT; BEGIN sql := FORMAT( $QUERY$ WITH archived AS ( DELETE FROM pgmq.q_%s WHERE msg_id = $1 RETURNING msg_id, vt, read_ct, enqueued_at, message ) INSERT INTO pgmq.a_%s (msg_id, vt, read_ct, enqueued_at, message) SELECT msg_id, vt, read_ct, enqueued_at, message FROM archived RETURNING msg_id; $QUERY$, queue_name, queue_name ); EXECUTE sql USING msg_id INTO result; RETURN NOT (result IS NULL); END; $$ LANGUAGE plpgsql; CREATE FUNCTION pgmq.archive( queue_name TEXT, msg_id BIGINT[] ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; BEGIN sql := FORMAT( $QUERY$ WITH archived AS ( DELETE FROM pgmq.q_%s WHERE msg_id = ANY($1) RETURNING msg_id, vt, read_ct, enqueued_at, message ) INSERT INTO pgmq.a_%s (msg_id, vt, read_ct, enqueued_at, message) SELECT msg_id, vt, read_ct, enqueued_at, message FROM archived RETURNING msg_id; $QUERY$, queue_name, queue_name ); RETURN QUERY EXECUTE sql USING msg_id; END; $$ LANGUAGE plpgsql; ---- delete ---- deletes a message id from the queue permanently CREATE FUNCTION pgmq.delete( queue_name TEXT, msg_id BIGINT ) RETURNS BOOLEAN AS $$ DECLARE sql TEXT; result BIGINT; BEGIN sql := FORMAT( $QUERY$ DELETE FROM pgmq.q_%s WHERE msg_id = $1 RETURNING msg_id $QUERY$, queue_name ); EXECUTE sql USING msg_id INTO result; RETURN NOT (result IS NULL); END; $$ LANGUAGE plpgsql; ---- delete ---- deletes an array of message ids from the queue permanently CREATE FUNCTION pgmq.delete( queue_name TEXT, msg_id BIGINT[] ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; BEGIN sql := FORMAT( $QUERY$ DELETE FROM pgmq.q_%s WHERE msg_id = ANY($1) RETURNING msg_id $QUERY$, queue_name ); RETURN QUERY EXECUTE sql USING msg_id; END; $$ LANGUAGE plpgsql;