pgmq: Queues
pgmq is a lightweight message queue built on Postgres.
Features
- Lightweight - No background worker or external dependencies, just Postgres functions packaged in an extension
- "exactly once" delivery of messages to a consumer within a visibility timeout
- API parity with AWS SQS and RSMQ
- Messages stay in the queue until explicitly removed
- Messages can be archived, instead of deleted, for long-term retention and replayability
Enable the extension
    create extension pgmq;
Usage
Sending Messages
send
Send a single message to a queue.
    pgmq.send(
        queue_name text,
        msg jsonb,
        delay integer default 0
    )
    returns setof bigint
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg | jsonb | The message to send to the queue |
delay | integer | Time in seconds before the message becomes visible. Defaults to 0. |
Example:
    select * from pgmq.send('my_queue', '{"hello": "world"}');
     send
    ------
        4
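The delay parameter is easiest to see when passed as a third positional argument. The following is a minimal sketch, assuming the queue my_queue already exists and is otherwise empty; the 10-second delay and the payload are arbitrary illustrative values.

```sql
-- Send a message that stays hidden from consumers for 10 seconds.
-- Assumes 'my_queue' was created beforehand with pgmq.create().
select * from pgmq.send('my_queue', '{"hello": "delayed"}', 10);

-- On an otherwise empty queue, a read issued before the delay has
-- elapsed is expected to return no rows; the same read issued after
-- the delay can return the message.
select * from pgmq.read('my_queue', 30, 1);
```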
send_batch
Send 1 or more messages to a queue.
    pgmq.send_batch(
        queue_name text,
        msgs jsonb[],
        delay integer default 0
    )
    returns setof bigint
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msgs | jsonb[] | Array of messages to send to the queue |
delay | integer | Time in seconds before the messages become visible. Defaults to 0. |
Example:
    select * from pgmq.send_batch(
        'my_queue',
        array[
            '{"hello": "world_0"}'::jsonb,
            '{"hello": "world_1"}'::jsonb
        ]
    );
     send_batch
    ------------
              1
              2
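Because msgs is an ordinary jsonb[] value, the batch can also be built from a query instead of a literal array. The sketch below assumes my_queue exists; the generate_series source and the payload shape are purely illustrative stand-ins for a real table.

```sql
-- Build a jsonb[] with array_agg and send it as one batch.
-- generate_series(1, 3) stands in for rows of a real table.
select *
from pgmq.send_batch(
    'my_queue',
    (
        select array_agg(jsonb_build_object('n', n))
        from generate_series(1, 3) as n
    )
);
```

The call returns one message ID per element of the array, as in the example above.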
Reading Messages
read
Read 1 or more messages from a queue. The VT (visibility timeout) specifies how long, in seconds, the messages stay invisible to other consumers after they are read.
    pgmq.read(
        queue_name text,
        vt integer,
        qty integer
    )
    returns setof pgmq.message_record
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
vt | integer | Time in seconds that the message remains invisible after it is read |
qty | integer | The number of messages to read from the queue. Defaults to 1 |
Example:
    select * from pgmq.read('my_queue', 10, 2);
     msg_id | read_ct |          enqueued_at          |              vt               |       message
    --------+---------+-------------------------------+-------------------------------+----------------------
          1 |       1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"}
          2 |       1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
    (2 rows)
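Since messages stay in the queue until explicitly removed, a consumer typically pairs read() with delete(). The following is a minimal sketch of that loop in PL/pgSQL, assuming my_queue exists; the batch size of 5, the 30-second timeout, and the raise notice standing in for real processing are all illustrative choices.

```sql
do $$
declare
    msg record;
begin
    -- Read up to 5 messages and hide them from other consumers for 30 seconds.
    for msg in select * from pgmq.read('my_queue', 30, 5) loop
        -- Placeholder for real processing of the payload.
        raise notice 'processing message %: %', msg.msg_id, msg.message;

        -- Remove the message once it has been handled.
        perform pgmq.delete('my_queue', msg.msg_id);
    end loop;
end
$$;
```

A DO block runs as a single transaction, so an error during processing rolls back the whole batch, including the reads. An external worker would more commonly issue read() and delete() as separate statements.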
read_with_poll
Same as read(), with added long-poll functionality.
When there are no messages in the queue, the call waits up to max_poll_seconds before returning.
If messages reach the queue during that time, they are read and returned immediately.
    pgmq.read_with_poll(
        queue_name text,
        vt integer,
        qty integer,
        max_poll_seconds integer default 5,
        poll_interval_ms integer default 100
    )
    returns setof pgmq.message_record
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
vt | integer | Time in seconds that the message remains invisible after it is read. |
qty | integer | The number of messages to read from the queue. Defaults to 1. |
max_poll_seconds | integer | Time in seconds to wait for new messages to reach the queue. Defaults to 5. |
poll_interval_ms | integer | Milliseconds between the internal poll operations. Defaults to 100. |
Example:
    select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100);
     msg_id | read_ct |          enqueued_at          |              vt               |      message
    --------+---------+-------------------------------+-------------------------------+--------------------
          1 |       1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
pop
Reads a single message from a queue and deletes it upon read.
Note: using pop() results in at-most-once delivery semantics. Because the message is deleted as it is read, it is lost if the consuming application fails before it finishes processing.
    pgmq.pop(queue_name text)
    returns setof pgmq.message_record
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
    select * from pgmq.pop('my_queue');
     msg_id | read_ct |          enqueued_at          |              vt               |      message
    --------+---------+-------------------------------+-------------------------------+--------------------
          1 |       2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
Deleting/Archiving Messages
delete (single)
Deletes a single message from a queue.
    pgmq.delete(queue_name text, msg_id bigint)
    returns boolean
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg_id | bigint | Message ID of the message to delete |
Example:
    select pgmq.delete('my_queue', 5);
     delete
    --------
     t
delete (batch)
Delete one or many messages from a queue.
    pgmq.delete(queue_name text, msg_ids bigint[])
    returns setof bigint
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg_ids | bigint[] | Array of message IDs to delete |
Examples:
Delete two messages that exist.
    select * from pgmq.delete('my_queue', array[2, 3]);
     delete
    --------
          2
          3
Delete two messages, one that exists and one that does not. Message 999 does not exist.
    select * from pgmq.delete('my_queue', array[6, 999]);
     delete
    --------
          6
purge_queue
Permanently deletes all messages in a queue. Returns the number of messages that were deleted.
    pgmq.purge_queue(queue_name text)
    returns bigint
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
Purge the queue when it contains 8 messages:
    select * from pgmq.purge_queue('my_queue');
     purge_queue
    -------------
               8
archive (single)
Removes a single requested message from the specified queue and inserts it into the queue's archive.
    pgmq.archive(queue_name text, msg_id bigint)
    returns boolean
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg_id | bigint | Message ID of the message to archive |
Returns a Boolean value indicating success or failure of the operation.
Example:
Remove the message with ID 1 from queue my_queue and archive it.
    select * from pgmq.archive('my_queue', 1);
     archive
    ---------
     t
archive (batch)
Removes a batch of requested messages from the specified queue and inserts them into the queue's archive. Returns the IDs of the messages that were successfully archived.
    pgmq.archive(queue_name text, msg_ids bigint[])
    returns setof bigint
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg_ids | bigint[] | Array of message IDs to archive |
Examples:
Remove messages with IDs 1 and 2 from queue my_queue and move them to the archive.
    select * from pgmq.archive('my_queue', array[1, 2]);
     archive
    ---------
           1
           2
Archive message 4, which exists, and message 999, which does not exist.
    select * from pgmq.archive('my_queue', array[4, 999]);
     archive
    ---------
           4
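Archived messages are kept for retention and replay, so it can be useful to inspect them with plain SQL. The sketch below rests on an assumption not stated on this page: that the archive for my_queue is stored in a table named pgmq.a_my_queue. Check the table name in your installation before relying on it.

```sql
-- Assumption: the archive table for 'my_queue' is pgmq.a_my_queue.
-- The selected columns mirror the message_record attributes.
select msg_id, enqueued_at, message
from pgmq.a_my_queue
order by msg_id;
```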
Queue Management
create
Create a new queue.
    pgmq.create(queue_name text)
    returns void
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
    select * from pgmq.create('my_queue');
     create
    --------
create_partitioned
Create a partitioned queue.
    pgmq.create_partitioned(
        queue_name text,
        partition_interval text default '10000'::text,
        retention_interval text default '100000'::text
    )
    returns void
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
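partition_interval | text | Number of messages per partition. Defaults to '10000'. |
retention_interval | text | Number of messages retained on old partitions; partitions beyond this are deleted. Defaults to '100000'. |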
Example:
Create a queue with 100,000 messages per partition that retains 10,000,000 messages on old partitions. Partitions beyond this limit are deleted.
    select * from pgmq.create_partitioned(
        'my_partitioned_queue',
        '100000',
        '10000000'
    );
     create_partitioned
    --------------------
create_unlogged
Creates a queue backed by an unlogged table. This is useful when write throughput is more important than durability. See the Postgres documentation for unlogged tables for more information.
    pgmq.create_unlogged(queue_name text)
    returns void
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
    select pgmq.create_unlogged('my_unlogged');
     create_unlogged
    -----------------
detach_archive
Removes the queue's archive table as a member of the pgmq extension. Useful for preventing the archive table from being dropped when drop extension pgmq is executed.
This does not prevent further archive() calls from appending to the archive table.
    pgmq.detach_archive(queue_name text)
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
    select * from pgmq.detach_archive('my_queue');
     detach_archive
    ----------------
drop_queue
Deletes a queue and its archive table.
    pgmq.drop_queue(queue_name text)
    returns boolean
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Example:
    select * from pgmq.drop_queue('my_unlogged');
     drop_queue
    ------------
     t
Utilities
set_vt
Sets the visibility timeout of a message to a specified time duration in the future. Returns the record of the message that was updated.
    pgmq.set_vt(
        queue_name text,
        msg_id bigint,
        vt_offset integer
    )
    returns pgmq.message_record
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
msg_id | bigint | ID of the message to set visibility time |
vt_offset | integer | Duration from now, in seconds, that the message's VT should be set to |
Example:
Set the visibility timeout of message 1 to 30 seconds from now.
    select * from pgmq.set_vt('my_queue', 1, 30);
     msg_id | read_ct |          enqueued_at          |              vt               |       message
    --------+---------+-------------------------------+-------------------------------+----------------------
          1 |       0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}
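One common use of set_vt is to extend the timeout on a message whose processing is running long, so that no other consumer picks it up in the meantime. The sketch below assumes my_queue exists and that the read returns the message with msg_id 1; the 30- and 120-second values are arbitrary.

```sql
-- Read one message and hide it from other consumers for 30 seconds.
select msg_id, message from pgmq.read('my_queue', 30, 1);

-- Processing is taking longer than expected: move the visibility timeout
-- of that message (assumed here to be msg_id 1) to 120 seconds from now.
select msg_id, vt from pgmq.set_vt('my_queue', 1, 120);

-- Once processing succeeds, remove the message from the queue.
select pgmq.delete('my_queue', 1);
```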
list_queues
List all the queues that currently exist.
    pgmq.list_queues()
    returns table(
        queue_name text,
        created_at timestamp with time zone,
        is_partitioned boolean,
        is_unlogged boolean
    )
Example:
    select * from pgmq.list_queues();
          queue_name      |          created_at           | is_partitioned | is_unlogged
    ----------------------+-------------------------------+----------------+-------------
     my_queue             | 2023-10-28 14:13:17.092576-05 | f              | f
     my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t              | f
     my_unlogged          | 2023-10-28 20:02:30.976109-05 | f              | t
metrics
Get metrics for a specific queue.
    pgmq.metrics(queue_name text)
    returns table(
        queue_name text,
        queue_length bigint,
        newest_msg_age_sec integer,
        oldest_msg_age_sec integer,
        total_messages bigint,
        scrape_time timestamp with time zone
    )
Parameters:
Parameter | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
Returns:
Attribute | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
queue_length | bigint | Number of messages currently in the queue |
newest_msg_age_sec | integer or null | Age of the newest message in the queue, in seconds |
oldest_msg_age_sec | integer or null | Age of the oldest message in the queue, in seconds |
total_messages | bigint | Total number of messages that have passed through the queue over all time |
scrape_time | timestamp with time zone | The current timestamp |
Example:
    select * from pgmq.metrics('my_queue');
     queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages |          scrape_time
    ------------+--------------+--------------------+--------------------+----------------+-------------------------------
     my_queue   |           16 |               2445 |               2447 |             35 | 2023-10-28 20:23:08.406259-05
metrics_all
Get metrics for all existing queues.
    pgmq.metrics_all()
    returns table(
        queue_name text,
        queue_length bigint,
        newest_msg_age_sec integer,
        oldest_msg_age_sec integer,
        total_messages bigint,
        scrape_time timestamp with time zone
    )
Returns:
Attribute | Type | Description |
---|---|---|
queue_name | text | The name of the queue |
queue_length | bigint | Number of messages currently in the queue |
newest_msg_age_sec | integer or null | Age of the newest message in the queue, in seconds |
oldest_msg_age_sec | integer or null | Age of the oldest message in the queue, in seconds |
total_messages | bigint | Total number of messages that have passed through the queue over all time |
scrape_time | timestamp with time zone | The current timestamp |
Example:
    select * from pgmq.metrics_all();
          queue_name      | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages |          scrape_time
    ----------------------+--------------+--------------------+--------------------+----------------+-------------------------------
     my_queue             |           16 |               2563 |               2565 |             35 | 2023-10-28 20:25:07.016413-05
     my_partitioned_queue |            1 |                 11 |                 11 |              1 | 2023-10-28 20:25:07.016413-05
     my_unlogged          |            1 |                  3 |                  3 |              1 | 2023-10-28 20:25:07.016413-05
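Because metrics_all() returns a table, its output can be filtered and sorted like any other relation, which is handy for spotting backlogs. The thresholds below (more than 10 messages, or an oldest message older than 300 seconds) are arbitrary illustrative values, not recommendations.

```sql
-- List queues that look backed up, oldest backlog first.
-- Columns come from the metrics_all() return type shown above.
select queue_name, queue_length, oldest_msg_age_sec
from pgmq.metrics_all()
where queue_length > 10
   or oldest_msg_age_sec > 300
order by oldest_msg_age_sec desc nulls last;
```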
Types
message_record
The complete representation of a message in a queue.
Attribute Name | Type | Description |
---|---|---|
msg_id | bigint | Unique ID of the message |
read_ct | bigint | Number of times the message has been read. Increments on read(). |
enqueued_at | timestamp with time zone | Time that the message was inserted into the queue |
vt | timestamp with time zone | Timestamp when the message will become available for consumers to read |
message | jsonb | The message payload |
Example:
     msg_id | read_ct |          enqueued_at          |              vt               |      message
    --------+---------+-------------------------------+-------------------------------+--------------------
          1 |       1 | 2023-10-28 19:06:19.941509-05 | 2023-10-28 19:06:27.419392-05 | {"hello": "world"}
Resources
- Official Docs: pgmq/api