# Functions

## Sending Messages

### send

Send a single message to a queue.

```text
pgmq.send(
    queue_name text,
    msg jsonb,
    delay integer DEFAULT 0
)
RETURNS SETOF bigint
```

**Parameters:**

| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg | jsonb | The message to send to the queue |
| delay | integer | Time in seconds before the message becomes visible. Defaults to 0. |

Example:

```sql
select * from pgmq.send('my_queue', '{"hello": "world"}');
 send
------
    4
```

---

### send_batch

Send 1 or more messages to a queue.

```text
pgmq.send_batch(
    queue_name text,
    msgs jsonb[],
    delay integer DEFAULT 0
)
RETURNS SETOF bigint
```

**Parameters:**

| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msgs | jsonb[] | Array of messages to send to the queue |
| delay | integer | Time in seconds before the messages become visible. Defaults to 0. |

Example:

```sql
select * from pgmq.send_batch('my_queue',
    ARRAY[
        '{"hello": "world_0"}'::jsonb,
        '{"hello": "world_1"}'::jsonb
    ]
);
 send_batch
------------
          1
          2
```

---

## Reading Messages

### read

Read 1 or more messages from a queue. The visibility timeout (VT) specifies how long, in seconds, a message stays invisible to other consumers after it is read.
```text
pgmq.read(
    queue_name text,
    vt integer,
    qty integer)
RETURNS SETOF pgmq.message_record
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| vt | integer | Time in seconds that the message remains invisible after it is read |
| qty | integer | The number of messages to read from the queue. Defaults to 1 |
Example:
```sql
select * from pgmq.read('my_queue', 10, 2);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"}
2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
(2 rows)
```
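
The effect of `vt` can be illustrated with a small in-memory model. This is only a sketch of the visibility rule described above, not how pgmq is implemented: the `TinyQueue` and `Message` classes are hypothetical, and the current time is passed in explicitly as `now` so the behaviour is deterministic.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    msg_id: int
    body: dict
    vt: float = 0.0   # time at which the message becomes visible again
    read_ct: int = 0  # how many times the message has been read


@dataclass
class TinyQueue:
    """Hypothetical in-memory stand-in for a queue; not pgmq's implementation."""
    messages: list = field(default_factory=list)
    next_id: int = 1

    def send(self, body, now, delay=0):
        msg = Message(self.next_id, body, vt=now + delay)
        self.messages.append(msg)
        self.next_id += 1
        return msg.msg_id

    def read(self, vt, qty, now):
        # Only messages whose visibility time has passed can be read.
        visible = [m for m in self.messages if m.vt <= now][:qty]
        for m in visible:
            m.vt = now + vt  # hide from other consumers for `vt` seconds
            m.read_ct += 1
        return visible


q = TinyQueue()
q.send({"hello": "world_0"}, now=0)
q.send({"hello": "world_1"}, now=0)

first = q.read(vt=10, qty=2, now=0)    # both messages are returned
second = q.read(vt=10, qty=2, now=5)   # still invisible: nothing returned
third = q.read(vt=10, qty=2, now=10)   # timeout elapsed: both returned again
print(len(first), len(second), len(third))  # prints "2 0 2"
```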
---
### read_with_poll
Same as read(), with long-poll support.
When the queue is empty, the call waits up to `max_poll_seconds` seconds for messages to arrive before returning.
Messages that reach the queue during that time are read and returned immediately.
```text
pgmq.read_with_poll(
    queue_name text,
    vt integer,
    qty integer,
    max_poll_seconds integer DEFAULT 5,
    poll_interval_ms integer DEFAULT 100
)
RETURNS SETOF pgmq.message_record
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| vt | integer | Time in seconds that the message remains invisible after it is read. |
| qty | integer | The number of messages to read from the queue. Defaults to 1. |
| max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. |
| poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. |
Example:
```sql
select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+--------------------
1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
```
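
The long-poll behaviour described above amounts to a retry loop with a deadline. The sketch below shows that loop in plain Python. It is an illustration of the idea, not pgmq's internal code: `read_once` is a hypothetical callable standing in for a single read, and `fake_read` simulates a queue that stays empty for the first two polls.

```python
import time


def read_with_poll(read_once, max_poll_seconds=5, poll_interval_ms=100,
                   clock=time.monotonic, sleep=time.sleep):
    """Call `read_once()` until it returns messages or the deadline passes."""
    deadline = clock() + max_poll_seconds
    while True:
        messages = read_once()
        if messages:
            return messages  # return as soon as anything arrives
        if clock() >= deadline:
            return []        # waited the full duration, still empty
        sleep(poll_interval_ms / 1000)


attempts = []


def fake_read():
    # Hypothetical reader: empty on the first two polls, one message afterwards.
    attempts.append(1)
    return [{"hello": "world"}] if len(attempts) >= 3 else []


result = read_with_poll(fake_read, max_poll_seconds=1, poll_interval_ms=10)
print(result, len(attempts))  # prints "[{'hello': 'world'}] 3"
```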
---
### pop
Reads a single message from a queue and deletes it upon read.
Note: using pop() results in at-most-once delivery, because a message that is popped but never fully processed by the consuming application cannot be read again.
```text
pgmq.pop(queue_name text)
RETURNS SETOF pgmq.message_record
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
```sql
select * from pgmq.pop('my_queue');
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+--------------------
1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
```
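
To see why pop() gives at-most-once delivery, compare what happens when a consumer fails midway through processing. In this simplified sketch a plain Python list stands in for the queue and visibility timeouts are ignored. The function names and `crashing_handler` are hypothetical and exist only to simulate a failure.

```python
def consume_with_pop(queue, handler):
    """pop-style: the message is removed before the handler runs."""
    msg = queue.pop(0)
    handler(msg)


def consume_with_read_then_delete(queue, handler):
    """read-style: the message is removed only after the handler succeeds."""
    msg = queue[0]
    handler(msg)
    queue.pop(0)


def crashing_handler(msg):
    raise RuntimeError("consumer crashed while processing")


popped = [{"hello": "world"}]
try:
    consume_with_pop(popped, crashing_handler)
except RuntimeError:
    pass  # the message is already gone and cannot be retried

kept = [{"hello": "world"}]
try:
    consume_with_read_then_delete(kept, crashing_handler)
except RuntimeError:
    pass  # the message is still queued and can be read again

print(len(popped), len(kept))  # prints "0 1"
```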
---
## Deleting/Archiving Messages
### delete (single)
Deletes a single message from a queue.
```text
pgmq.delete(queue_name text, msg_id bigint)
RETURNS boolean
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg_id | bigint | Message ID of the message to delete |
Example:
```sql
select pgmq.delete('my_queue', 5);
delete
--------
t
```
---
### delete (batch)
Delete one or many messages from a queue.
```text
pgmq.delete(queue_name text, msg_ids bigint[])
RETURNS SETOF bigint
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg_ids | bigint[] | Array of message IDs to delete |
Examples:
Delete two messages that exist.
```sql
select * from pgmq.delete('my_queue', ARRAY[2, 3]);
delete
--------
2
3
```
Delete two messages, one that exists and one that does not. Message `999` does not exist.
```sql
select * from pgmq.delete('my_queue', ARRAY[6, 999]);
delete
--------
6
```
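
Because only the IDs that were actually deleted come back, a caller can compare the returned IDs with the requested ones to find messages that no longer existed. The sketch below models that with a dict keyed by message ID. `delete_batch` is a hypothetical stand-in for the batch delete call, not pgmq code.

```python
def delete_batch(queue, msg_ids):
    """Delete the given IDs from `queue` and return the IDs that existed."""
    deleted = []
    for msg_id in msg_ids:
        if msg_id in queue:
            del queue[msg_id]
            deleted.append(msg_id)
    return deleted


queue = {6: {"hello": "world_6"}, 7: {"hello": "world_7"}}
requested = [6, 999]

deleted = delete_batch(queue, requested)
missing = sorted(set(requested) - set(deleted))  # requested but not found
print(deleted, missing)  # prints "[6] [999]"
```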
---
### purge_queue
Permanently deletes all messages in a queue. Returns the number of messages that were deleted.
```text
pgmq.purge_queue(queue_name text)
RETURNS bigint
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
Purge the queue when it contains 8 messages:
```sql
select * from pgmq.purge_queue('my_queue');
purge_queue
-------------
8
```
---
### archive (single)
Removes a single requested message from the specified queue and inserts it into the queue's archive.
```text
pgmq.archive(queue_name text, msg_id bigint)
RETURNS boolean
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg_id | bigint | Message ID of the message to archive |
**Returns:**

Boolean value indicating success or failure of the operation.

Example: remove the message with ID 1 from queue `my_queue` and archive it:
```sql
SELECT * FROM pgmq.archive('my_queue', 1);
archive
---------
t
```
---
### archive (batch)
Removes a batch of requested messages from the specified queue and inserts them into the queue's archive.
Returns the set of message IDs that were successfully archived.
```text
pgmq.archive(queue_name text, msg_ids bigint[])
RETURNS SETOF bigint
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg_ids | bigint[] | Array of message IDs to archive |
Examples:
Remove the messages with IDs 1 and 2 from queue `my_queue` and move them to the archive.
```sql
SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]);
archive
---------
1
2
```
Archive message 4, which exists, and message 999, which does not. Only message 4 is returned.
```sql
select * from pgmq.archive('my_queue', ARRAY[4, 999]);
archive
---------
4
```
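
Unlike delete, archiving moves each message rather than discarding it. The following sketch models the queue and its archive as two dicts keyed by message ID. `archive_batch` is a hypothetical illustration of the behaviour described above, not pgmq's implementation.

```python
def archive_batch(queue, archive, msg_ids):
    """Move the given IDs from `queue` to `archive`.

    Returns only the IDs that were actually found and moved.
    """
    archived = []
    for msg_id in msg_ids:
        if msg_id in queue:
            archive[msg_id] = queue.pop(msg_id)
            archived.append(msg_id)
    return archived


queue = {4: {"hello": "world_4"}, 5: {"hello": "world_5"}}
archive = {}

print(archive_batch(queue, archive, [4, 999]))  # prints "[4]"
print(sorted(queue), sorted(archive))           # prints "[5] [4]"
```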
---
## Queue Management
### create
Create a new queue.
```text
pgmq.create(queue_name text)
RETURNS VOID
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
```sql
select * from pgmq.create('my_queue');
create
--------
```
---
### create_partitioned
Create a partitioned queue.
```text
pgmq.create_partitioned (
queue_name text,
partition_interval text DEFAULT '10000'::text,
retention_interval text DEFAULT '100000'::text
)
RETURNS void
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
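| partition_interval | text | Size of each partition. A numeric value such as '10000' is the number of messages per partition. Defaults to '10000'. |
| retention_interval | text | How much to retain on old partitions. A numeric value such as '100000' is a number of messages; partitions beyond it are deleted. Defaults to '100000'. |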
Example:
Create a queue with 100,000 messages per partition that retains 10,000,000 messages on old partitions. Partitions beyond that retention are deleted.
```sql
select * from pgmq.create_partitioned(
'my_partitioned_queue',
'100000',
'10000000'
);
create_partitioned
--------------------
```
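
The arithmetic behind the example values can be checked with a few lines of Python. This sketch assumes partitions are contiguous, equally sized ranges of message IDs starting at 0. That layout is an assumption made for illustration, and the partitions actually created for a queue may be laid out differently. `partition_range` is a hypothetical helper.

```python
def partition_range(msg_id, partition_interval):
    """Return the [start, end) msg_id range of the partition holding `msg_id`.

    Assumes contiguous, equally sized partitions that start at msg_id 0.
    """
    start = (msg_id // partition_interval) * partition_interval
    return start, start + partition_interval


partition_interval = 100_000
retention_interval = 10_000_000

# Under the assumption above, message 250,000 lands in the third partition.
print(partition_range(250_000, partition_interval))  # prints "(200000, 300000)"

# Retaining 10,000,000 messages corresponds to this many full partitions.
print(retention_interval // partition_interval)      # prints "100"
```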
---
### create_unlogged
Creates a queue backed by an unlogged table. This is useful when write throughput is more important than durability.
See Postgres documentation for [unlogged tables](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED) for more information.
```text
pgmq.create_unlogged(queue_name text)
RETURNS void
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
```sql
select pgmq.create_unlogged('my_unlogged');
create_unlogged
-----------------
```
---
### detach_archive
Removes the queue's archive table from membership in the PGMQ extension. Useful for preventing the queue's archive table from being dropped when `DROP EXTENSION pgmq` is executed.
This does not prevent further archive() calls from appending to the archive table.
```text
pgmq.detach_archive(queue_name text)
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
```sql
select * from pgmq.detach_archive('my_queue');
detach_archive
----------------
```
---
### drop_queue
Deletes a queue and its archive table.
```text
pgmq.drop_queue(queue_name text)
RETURNS boolean
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
Example:
```sql
select * from pgmq.drop_queue('my_unlogged');
drop_queue
------------
t
```
## Utilities
### set_vt
Sets the visibility timeout of a message to a specified time duration in the future. Returns the record of the message that was updated.
```text
pgmq.set_vt(
queue_name text,
msg_id bigint,
vt_offset integer
)
RETURNS pgmq.message_record
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg_id | bigint | ID of the message to set visibility time |
| vt_offset | integer | Duration from now, in seconds, that the message's VT should be set to |
Example:
Set the visibility timeout of message 1 to 30 seconds from now.
```sql
select * from pgmq.set_vt('my_queue', 1, 30);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}
```
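
Since `vt_offset` is relative to the current time, the resulting visibility time is the current time plus the offset. The sketch below works through that calculation with a fixed, made-up timestamp. `new_vt` is a hypothetical helper, not part of pgmq.

```python
from datetime import datetime, timedelta, timezone


def new_vt(now, vt_offset):
    """Visibility time after applying an offset of `vt_offset` seconds at `now`."""
    return now + timedelta(seconds=vt_offset)


# A fixed timestamp chosen for illustration only.
now = datetime(2023, 10, 28, 19, 59, 4, tzinfo=timezone.utc)
print(new_vt(now, 30).isoformat())  # prints "2023-10-28T19:59:34+00:00"
```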
---
### list_queues
List all the queues that currently exist.
```text
pgmq.list_queues()
RETURNS TABLE(
queue_name text,
created_at timestamp with time zone,
is_partitioned boolean,
is_unlogged boolean
)
```
Example:
```sql
select * from pgmq.list_queues();
queue_name | created_at | is_partitioned | is_unlogged
----------------------+-------------------------------+----------------+-------------
my_queue | 2023-10-28 14:13:17.092576-05 | f | f
my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f
my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t
```
---
### metrics
Get metrics for a specific queue.
```text
pgmq.metrics(queue_name text)
RETURNS TABLE(
queue_name text,
queue_length bigint,
newest_msg_age_sec integer,
oldest_msg_age_sec integer,
total_messages bigint,
scrape_time timestamp with time zone
)
```
**Parameters:**
| Parameter | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
**Returns:**
| Attribute | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| queue_length | bigint | Number of messages currently in the queue |
| newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds |
| oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds |
| total_messages | bigint | Total number of messages that have passed through the queue over all time |
| scrape_time | timestamp with time zone | The current timestamp |
Example:
```sql
select * from pgmq.metrics('my_queue');
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
------------+--------------+--------------------+--------------------+----------------+-------------------------------
my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05
```
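
How the returned attributes relate to one another can be shown with a small calculation over the enqueue times of the messages still in a queue. This is a sketch of the definitions in the table above, not the query pgmq runs. The `metrics` function, its arguments, and the timestamps are hypothetical. An empty queue yields `None` for both ages, matching the `null` in the table.

```python
def metrics(enqueued_at, total_sent, now):
    """Summarise a queue from the enqueue times (in seconds) of its messages."""
    ages = [int(now - t) for t in enqueued_at]
    return {
        "queue_length": len(enqueued_at),
        "newest_msg_age_sec": min(ages) if ages else None,
        "oldest_msg_age_sec": max(ages) if ages else None,
        "total_messages": total_sent,
    }


busy = metrics([100, 102], total_sent=5, now=2547)
empty = metrics([], total_sent=5, now=2547)

print(busy["newest_msg_age_sec"], busy["oldest_msg_age_sec"])  # prints "2445 2447"
print(empty["queue_length"], empty["oldest_msg_age_sec"])      # prints "0 None"
```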
---
### metrics_all
Get metrics for all existing queues.
```text
pgmq.metrics_all()
RETURNS TABLE(
queue_name text,
queue_length bigint,
newest_msg_age_sec integer,
oldest_msg_age_sec integer,
total_messages bigint,
scrape_time timestamp with time zone
)
```
**Returns:**
| Attribute | Type | Description |
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| queue_length | bigint | Number of messages currently in the queue |
| newest_msg_age_sec | integer \| null | Age of the newest message in the queue, in seconds |
| oldest_msg_age_sec | integer \| null | Age of the oldest message in the queue, in seconds |
| total_messages | bigint | Total number of messages that have passed through the queue over all time |
| scrape_time | timestamp with time zone | The current timestamp |

Example:
```sql
select * from pgmq.metrics_all();
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
----------------------+--------------+--------------------+--------------------+----------------+-------------------------------
my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05
my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05
my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05
```