# Functions ## Sending Messages ### send Send a single message to a queue. ```text pgmq.send( queue_name text, msg jsonb, delay integer DEFAULT 0 ) RETURNS SETOF bigint ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg | jsonb | The message to send to the queue | | delay | integer | Time in seconds before the message becomes visible. Defaults to 0. | Example: ```sql select * from pgmq.send('my_queue', '{"hello": "world"}'); send ------ 4 ``` --- ### send_batch Send 1 or more messages to a queue. ```text pgmq.send_batch( queue_name text, msgs jsonb[], delay integer DEFAULT 0 ) RETURNS SETOF bigint ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msgs | jsonb[] | Array of messages to send to the queue | | delay | integer | Time in seconds before the messages becomes visible. Defaults to 0. | ```sql select * from pgmq.send_batch('my_queue', ARRAY[ '{"hello": "world_0"}'::jsonb, '{"hello": "world_1"}'::jsonb] ); send_batch ------------ 1 2 ``` --- ## Reading Messages ### read Read 1 or more messages from a queue. The VT specifies the amount of time in seconds that the message will be invisible to other consumers after reading.
 
pgmq.read(
    queue_name text,
    vt integer,
    qty integer)

RETURNS SETOF pgmq.message_record
 
**Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | vt | integer | Time in seconds that the message become invisible after reading | | qty | integer | The number of messages to read from the queue. Defaults to 1 | Example: ```sql select * from pgmq.read('my_queue', 10, 2); msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} (2 rows) ``` --- ### read_with_poll Same as read(). Also provides convenient long-poll functionality. When there are no messages in the queue, the function call will wait for `max_poll_seconds` in duration before returning. If messages reach the queue during that duration, they will be read and returned immediately.
 
 pgmq.read_with_poll(
    queue_name text,
    vt integer,
    qty integer,
    max_poll_seconds integer DEFAULT 5,
    poll_interval_ms integer DEFAULT 100
)
RETURNS SETOF pgmq.message_record
 
**Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | vt | integer | Time in seconds that the message become invisible after reading. | | qty | integer | The number of messages to read from the queue. Defaults to 1. | | max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. | | poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. | Example: ```sql select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` --- ### pop Reads a single message from a queue and deletes it upon read. Note: utilization of pop() results in at-most-once delivery semantics if the consuming application does not guarantee processing of the message.
 
pgmq.pop(queue_name text)
RETURNS SETOF pgmq.message_record
 
**Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: ```sql pgmq=# select * from pgmq.pop('my_queue'); msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` --- ## Deleting/Archiving Messages ### delete (single) Deletes a single message from a queue. ```text pgmq.delete (queue_name text, msg_id: bigint) RETURNS boolean ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | Message ID of the message to delete | Example: ```sql select pgmq.delete('my_queue', 5); delete -------- t ``` --- ### delete (batch) Delete one or many messages from a queue. ```text pgmq.delete (queue_name text, msg_ids: bigint[]) RETURNS SETOF bigint ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_ids | bigint[] | Array of message IDs to delete | Examples: Delete two messages that exist. ```sql select * from pgmq.delete('my_queue', ARRAY[2, 3]); delete -------- 2 3 ``` Delete two messages, one that exists and one that does not. Message `999` does not exist. ```sql select * from pgmq.delete('my_queue', ARRAY[6, 999]); delete -------- 6 ``` --- ### purge_queue Permanently deletes all messages in a queue. Returns the number of messages that were deleted. ```text purge_queue(queue_name text) RETURN bigint ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: Purge the queue when it contains 8 messages; ```sql select * from pgmq.purge_queue('my_queue'); purge_queue ------------- 8 ``` --- ### archive (single) Removes a single requested message from the specified queue and inserts it into the queue's archive. ```text pgmq.archive(queue_name text, msg_id bigint) RETURNS boolean ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | Message ID of the message to archive | Returns Boolean value indicating success or failure of the operation. Example; remove message with ID 1 from queue `my_queue` and archive it: ```sql SELECT * FROM pgmq.archive('my_queue', 1); archive --------- t ``` --- ### archive (batch) Deletes a batch of requested messages from the specified queue and inserts them into the queue's archive. Returns an ARRAY of message ids that were successfully archived. ```text pgmq.archive(queue_name text, msg_ids bigint[]) RETURNS SETOF bigint ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_ids | bigint[] | Array of message IDs to archive | Examples: Delete messages with ID 1 and 2 from queue `my_queue` and move to the archive. ```sql SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]); archive --------- 1 2 ``` Delete messages 4, which exists and 999, which does not exist. ```sql select * from pgmq.archive('my_queue', ARRAY[4, 999]); archive --------- 4 ``` --- ## Queue Management ### create Create a new queue. ```text pgmq.create(queue_name text) RETURNS VOID ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: ```sql select from pgmq.create('my_queue'); create -------- ``` --- ### create_partitioned Create a partitioned queue. ```text pgmq.create_partitioned ( queue-ue_name text, partition_interval text DEFAULT '10000'::text, retention_interval text DEFAULT '100000'::text ) RETURNS void ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | partition_interval | text | The name of the queue | | retention_interval | text | The name of the queue | Example: Create a queue with 100,000 messages per partition, and will retain 10,000,000 messages on old partitions. Partitions greater than this will be deleted. ```sql select from pgmq.create_partitioned( 'my_partitioned_queue', '100000', '10000000' ); create_partitioned -------------------- ``` --- ### create_unlogged Creates an unlogged table. This is useful when write throughput is more important that durability. See Postgres documentation for [unlogged tables](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED) for more information. ```text pgmq.create_unlogged(queue_name text) RETURNS void ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: ```sql select pgmq.create_unlogged('my_unlogged'); create_unlogged ----------------- ``` --- ### detach_archive Drop the queue's archive table as a member of the PGMQ extension. Useful for preventing the queue's archive table from being drop when `DROP EXTENSION pgmq` is executed. This does not prevent the further archives() from appending to the archive table. ```text pgmq.detach_archive(queue_name text) ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: ```sql select * from pgmq.detach_archive('my_queue'); detach_archive ---------------- ``` --- ### drop_queue Deletes a queue and its archive table. ```text pgmq.drop_queue(queue_name text) RETURNS boolean ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | Example: ```sql select * from pgmq.drop_queue('my_unlogged'); drop_queue ------------ t ``` ## Utilities ### set_vt Sets the visibility timeout of a message to a specified time duration in the future. Returns the record of the message that was updated. ```text pgmq.set_vt( queue_name text, msg_id bigint, vt_offset integer ) RETURNS pgmq.message_record ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg_id | bigint | ID of the message to set visibility time | | vt_offset | integer | Duration from now, in seconds, that the message's VT should be set to | Example: Set the visibility timeout of message 1 to 30 seconds from now. ```sql select * from pgmq.set_vt('my_queue', 11, 30); msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"} ``` --- ### list_queues List all the queues that currently exist. ```sql list_queues() RETURNS TABLE( queue_name text, created_at timestamp with time zone, is_partitioned boolean, is_unlogged boolean ) ``` Example: ```sql select * from pgmq.list_queues(); queue_name | created_at | is_partitioned | is_unlogged ----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t ``` --- ### metrics Get metrics for a specific queue. ```text pgmq.metrics(queue_name: text) RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone ) ``` **Parameters:** | Parameter | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | **Returns:** | Attribute | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | queue_length | bigint | Number of messages currently in the queue | | newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds | | oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds | | total_messages | bigint | Total number of messages that have passed through the queue over all time | | scrape_time | timestamp with time zone | The current timestamp | Example: ```sql select * from pgmq.metrics('my_queue'); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05 ``` --- ### metrics_all Get metrics for all existing queues. ```text pgmq.metrics_all() RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone ) ``` **Returns:** | Attribute | Type | Description | | :--- | :---- | :--- | | queue_name | text | The name of the queue | | queue_length | bigint | Number of messages currently in the queue | | newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds | | oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds | | total_messages | bigint | Total number of messages that have passed through the queue over all time | | scrape_time | timestamp with time zone | The current timestamp | ```sql select * from pgmq.metrics_all(); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05 ```