Postgres Message Queue (PGMQ)

A lightweight distributed message queue. Like AWS SQS and RSMQ but on Postgres.

Features

  • Lightweight - Built with Rust and Postgres only
  • Guaranteed "exactly once" delivery of messages to a consumer within a visibility timeout
  • API parity with AWS SQS and RSMQ
  • Messages stay in the queue until deleted
  • Messages can be archived, instead of deleted, for long-term retention and replayability
  • Table (bloat) maintenance automated with pg_partman
  • High performance operations with index-only scans.

Installation

The fastest way to get started is by running the CoreDB docker image, where PGMQ comes pre-installed.

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/coredb/pgmq-pg:latest

Client Libraries

SQL Examples

# Connect to Postgres
psql postgres://postgres:postgres@localhost:5432/postgres
-- create the extension, pg_partman is also required
CREATE EXTENSION pgmq CASCADE;

Creating a queue

Every queue is its own table in Postgres. The table name is the queue name prefixed with pgmq_. For example, pgmq_my_queue is the table for the queue my_queue.
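
The naming rule can be written down as a pair of small helpers. This is a sketch in Python, not part of pgmq: the function names queue_table and archive_table are hypothetical, and the _archive suffix is inferred from the pgmq_my_queue_archive table that appears in the archive example in this README.

```python
PREFIX = "pgmq_"  # prefix described in this README


def queue_table(queue_name: str) -> str:
    """Return the table backing a queue, e.g. my_queue -> pgmq_my_queue."""
    return f"{PREFIX}{queue_name}"


def archive_table(queue_name: str) -> str:
    """Return the archive table for a queue (suffix inferred from the README example)."""
    return f"{queue_table(queue_name)}_archive"


print(queue_table("my_queue"))    # pgmq_my_queue
print(archive_table("my_queue"))  # pgmq_my_queue_archive
```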

Optionally, the partition_interval and retention_interval can be configured. See Configuration.

-- creates the queue

-- params
-- queue_name: text
-- partition_interval: text DEFAULT 'daily'::text
-- retention_interval: text DEFAULT '5 days'::text
SELECT pgmq_create('my_queue');

 pgmq_create
-------------

Send two messages

-- messages are sent as JSON
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
-- the message id is returned from the send function
 pgmq_send 
-----------
         1
(1 row)

 pgmq_send 
-----------
         2
(1 row)

Read messages

Read 2 messages from the queue and make them invisible for 30 seconds. If the messages are not deleted or archived within 30 seconds, they will become visible again and can be read by another consumer.

pgmq=# SELECT * from pgmq_read('my_queue', 30, 2);

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar1"}
      2 |       1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar2"}

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

pgmq=# SELECT * from pgmq_read('my_queue', 30, 1);
 msg_id | read_ct | vt | enqueued_at | message
--------+---------+----+-------------+---------

Pop a message

-- Read a message and immediately delete it from the queue. Returns no rows if the queue is empty.
pgmq=# SELECT * from pgmq_pop('my_queue');

 msg_id | read_ct |              vt               |          enqueued_at          |    message
--------+---------+-------------------------------+-------------------------------+---------------
      1 |       2 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar1"}

Archive a message

-- Archiving a message removes it from the queue, and inserts it to the archive table.
-- archive message with msg_id=2
pgmq=# SELECT * from pgmq_archive('my_queue', 2);
pgmq=#  SELECT * from pgmq_my_queue_archive;
 msg_id | read_ct |         enqueued_at          |          deleted_at           |              vt               |     message     
--------+---------+------------------------------+-------------------------------+-------------------------------+-----------------
      2 |       1 | 2023-04-25 00:55:40.68417-05 | 2023-04-25 00:56:35.937594-05 | 2023-04-25 00:56:20.532012-05 | {"foo": "bar2"}

Delete a message

-- Delete the message with id `3` from the queue named `my_queue`.
pgmq=# SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
pgmq=# SELECT pgmq_delete('my_queue', 3);
 pgmq_delete
-------------
 t

Configuration

Partitioned Queues

pgmq queue tables are partitioned by default. pg_partman handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions.

Partition behavior is configured at the time queues are created, via pgmq_create(). This function has three parameters:

queue_name: text - The name of the queue. Queues are Postgres tables whose names are prefixed with pgmq_. For example, pgmq_my_queue.

partition_interval: text - The interval at which partitions are created. This can be either any valid Postgres duration supported by pg_partman or an integer value. When it is a duration, queues are partitioned by the time at which messages are sent to the table (enqueued_at). A value of 'daily' creates a new partition each day. When it is an integer value, queues are partitioned by msg_id. A value of '100' creates a new partition every 100 messages. The value must agree with retention_interval (both time based or both numeric). The default value is 'daily'.

retention_interval: text - The interval for retaining partitions. This can be either any valid Postgres duration supported by pg_partman or an integer value. When it is a duration, partitions containing data older than the duration will be dropped. When it is an integer value, any messages that have a msg_id less than max(msg_id) - retention_interval will be dropped. For example, if the max msg_id is 100 and the retention_interval is 60, any partitions with msg_id values less than 40 will be dropped. The value must agree with partition_interval (both time based or both numeric). The default is '5 days'. Note: retention_interval does not apply to messages that have been deleted via pgmq_delete() or archived with pgmq_archive(). pgmq_delete() removes messages forever and pgmq_archive() moves messages to the corresponding archive table forever (for example, pgmq_my_queue_archive).
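
The numeric retention rule can be checked with a few lines of arithmetic. The sketch below is plain Python, not pgmq or pg_partman code: retention_cutoff and is_retained are hypothetical helpers that only restate the rule max(msg_id) - retention_interval. Because the text describes partitions being dropped, the rows actually removed at a given moment will also depend on partition boundaries, which this sketch does not model.

```python
def retention_cutoff(max_msg_id: int, retention_interval: int) -> int:
    """msg_id values below this cutoff fall outside the retention window."""
    return max_msg_id - retention_interval


def is_retained(msg_id: int, max_msg_id: int, retention_interval: int) -> bool:
    """True if msg_id is still inside the retention window."""
    return msg_id >= retention_cutoff(max_msg_id, retention_interval)


# The example from the text: max msg_id is 100, retention_interval is 60.
print(retention_cutoff(100, 60))  # 40
print(is_retained(39, 100, 60))   # False
print(is_retained(40, 100, 60))   # True
```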

For automatic partition maintenance to take place, several settings must be added to the postgresql.conf file, which is typically located in the Postgres DATADIR. Below are the default configuration values set in CoreDB Docker images.

pg_partman_bgw.interval sets the interval at which pg_partman conducts maintenance. Maintenance creates new partitions and drops partitions that fall outside the retention_interval. By default, pg_partman will keep 4 partitions "ahead" of the currently active partition.

Add the following to postgresql.conf. Note that changing shared_preload_libraries requires a restart of Postgres.

shared_preload_libraries = 'pg_partman_bgw' # requires restart of Postgres
pg_partman_bgw.interval = 60
pg_partman_bgw.role = 'postgres'
pg_partman_bgw.dbname = 'postgres'
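
Before restarting Postgres, it can help to confirm that all four settings are present. The following is a rough sketch rather than a pgmq tool: parse_conf and missing_settings are hypothetical helpers, and the parser only handles simple key = value lines with optional trailing comments, which is a simplification of the real postgresql.conf syntax.

```python
REQUIRED = (
    "shared_preload_libraries",
    "pg_partman_bgw.interval",
    "pg_partman_bgw.role",
    "pg_partman_bgw.dbname",
)


def parse_conf(text: str) -> dict:
    """Parse simple `key = value` lines, ignoring comments and blank lines."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop trailing comments
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip().strip("'")
    return settings


def missing_settings(text: str) -> list:
    """Return the required settings that are absent or incomplete."""
    settings = parse_conf(text)
    missing = [key for key in REQUIRED if key not in settings]
    libs = [lib.strip() for lib in settings.get("shared_preload_libraries", "").split(",")]
    if "shared_preload_libraries" in settings and "pg_partman_bgw" not in libs:
        missing.append("shared_preload_libraries (pg_partman_bgw not listed)")
    return missing


SAMPLE = """
shared_preload_libraries = 'pg_partman_bgw' # requires restart of Postgres
pg_partman_bgw.interval = 60
pg_partman_bgw.role = 'postgres'
pg_partman_bgw.dbname = 'postgres'
"""

print(missing_settings(SAMPLE))  # []
```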

Visibility Timeout (vt)

pgmq guarantees exactly once delivery of a message within a visibility timeout. The visibility timeout is the amount of time a message is invisible to other consumers after it has been read by a consumer. If the message is NOT deleted or archived within the visibility timeout, it will become visible again and can be read by another consumer. The visibility timeout is set when a message is read from the queue, via pgmq_read(). It is recommended to set a vt value that is greater than the expected time it takes to process a message. After the application successfully processes the message, it should call pgmq_delete() to completely remove the message from the queue or pgmq_archive() to move it to the archive table for the queue.
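
The read, timeout, and delete or archive cycle described above can be illustrated with a small in-memory model. This is a toy sketch in Python and not how pgmq is implemented: ToyQueue and its methods are hypothetical names, time is passed in explicitly so the behavior is deterministic, and reading the lowest msg_id first is an assumption of the model.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Message:
    msg_id: int
    body: dict
    vt: float = 0.0   # time at which the message becomes visible again
    read_ct: int = 0  # number of times the message has been read


@dataclass
class ToyQueue:
    """In-memory model of the visibility timeout (illustration only)."""
    messages: Dict[int, Message] = field(default_factory=dict)
    archive: List[Message] = field(default_factory=list)
    next_id: int = 1

    def send(self, body: dict) -> int:
        msg_id = self.next_id
        self.messages[msg_id] = Message(msg_id, body)
        self.next_id += 1
        return msg_id

    def read(self, vt_seconds: float, now: float) -> Optional[Message]:
        """Return the first visible message and hide it until now + vt_seconds."""
        for msg in sorted(self.messages.values(), key=lambda m: m.msg_id):
            if msg.vt <= now:
                msg.vt = now + vt_seconds
                msg.read_ct += 1
                return msg
        return None  # queue empty, or every message is currently invisible

    def delete(self, msg_id: int) -> bool:
        """Remove the message for good; False if it is not in the queue."""
        return self.messages.pop(msg_id, None) is not None

    def archive_msg(self, msg_id: int) -> bool:
        """Move the message from the queue to the archive."""
        msg = self.messages.pop(msg_id, None)
        if msg is None:
            return False
        self.archive.append(msg)
        return True


q = ToyQueue()
q.send({"foo": "bar1"})
print(q.read(30, now=0).read_ct)   # 1: first consumer reads, message hidden until t=30
print(q.read(30, now=10))          # None: still invisible to other consumers
print(q.read(30, now=31).read_ct)  # 2: timeout expired without a delete, so it is redelivered
print(q.delete(1))                 # True: processing finished, message removed
```

In this model a consumer that finishes its work within the timeout calls delete or archive_msg before any other reader can see the message, which is why the text recommends a vt longer than the expected processing time.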
