Skip to content

DavidBM/rsmq-async-rs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

103 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

rsmq-async

Crates.io Docs.rs Crates.io dependency status

An async Rust port of RSMQ — a lightweight message queue built entirely on Redis. No extra infrastructure, no brokers, just Redis.

Wire-compatible with the original JavaScript implementation: messages enqueued by a JS server can be consumed by a Rust worker and vice versa.


How it works

RSMQ stores queues as Redis sorted sets. Each message gets a score based on its delivery time, so delayed and hidden messages are naturally ordered. All queue operations are executed as atomic Lua scripts, making them safe under concurrent access.

  • At-least-once delivery — receive a message, process it, then delete it. If your worker crashes mid-flight, the message becomes visible again after its visibility timeout.
  • At-most-once delivery — use pop_message to receive and delete in one atomic step.
  • Delayed messages — messages can be held back for a configurable duration before becoming visible.
  • Per-message visibility timeouts — received messages stay hidden for a configurable window, giving your worker time to process them.

Quick start

[dependencies]
rsmq_async = "16"
use rsmq_async::{Rsmq, RsmqConnection, RsmqError};

#[tokio::main]
async fn main() -> Result<(), RsmqError> {
    let mut rsmq = Rsmq::new(Default::default()).await?;

    rsmq.create_queue("jobs", None, None, None).await?;

    // Send a message
    rsmq.send_message("jobs", "hello from Rust", None).await?;

    // Receive and process it
    if let Some(msg) = rsmq.receive_message::<String>("jobs", None).await? {
        println!("Got: {}", msg.message);
        rsmq.delete_message("jobs", &msg.id).await?;
    }

    Ok(())
}

Always delete_message after successfully processing — this is what confirms delivery.


Implementations

Three implementations are provided, all behind the same RsmqConnection trait.

Type Use when
Rsmq Start here. A single multiplexed connection handles concurrent operations efficiently — no pool overhead, no contention. Right for the vast majority of workloads.
PooledRsmq You're sending large payloads (images, documents, big blobs) and one slow operation blocking the shared connection becomes a problem.
RsmqSync You're in a sync context. Wraps Rsmq in a Tokio runtime. Requires the sync feature.

Write code against the trait to stay implementation-agnostic:

use rsmq_async::RsmqConnection;

async fn process(rsmq: &mut impl RsmqConnection) {
    // works with Rsmq, PooledRsmq, or RsmqSync
}

Connection pool

use rsmq_async::{PooledRsmq, PoolOptions, RsmqConnection};

let pool_opts = PoolOptions { max_size: Some(20), min_idle: Some(5) };
let mut rsmq = PooledRsmq::new(Default::default(), pool_opts).await?;

Sync wrapper

use rsmq_async::{RsmqSync, RsmqConnectionSync};

let mut rsmq = RsmqSync::new(Default::default()).await?;
rsmq.send_message("myqueue", "hello", None)?;

Message types

send_message, receive_message, and pop_message are generic over your message type.

Built-in implementations cover the common cases:

// Send and receive strings
rsmq.send_message("q", "hello", None).await?;
let msg = rsmq.receive_message::<String>("q", None).await?;

// Or raw bytes
rsmq.send_message("q", vec![0u8, 1, 2], None).await?;
let msg = rsmq.receive_message::<Vec<u8>>("q", None).await?;

For custom types, implement TryFrom<RedisBytes> to receive and Into<RedisBytes> to send:

use rsmq_async::RedisBytes;

struct MyPayload { /* ... */ }

impl TryFrom<RedisBytes> for MyPayload {
    type Error = Vec<u8>; // must be Vec<u8>

    fn try_from(b: RedisBytes) -> Result<Self, Vec<u8>> {
        // deserialize from b.0
    }
}

impl From<MyPayload> for RedisBytes {
    fn from(p: MyPayload) -> RedisBytes {
        RedisBytes(/* serialize to bytes */)
    }
}

Queue configuration

use std::time::Duration;

rsmq.create_queue(
    "jobs",
    Some(Duration::from_secs(30)),  // visibility timeout (default: 30s)
    Some(Duration::from_secs(0)),   // delivery delay    (default: 0)
    Some(65536),                    // max message size in bytes, -1 for unlimited (default: 65536)
).await?;

Queue attributes can be updated after creation:

rsmq.set_queue_attributes("jobs", Some(Duration::from_secs(60)), None, None).await?;

You can inspect queue stats at any time:

let attrs = rsmq.get_queue_attributes("jobs").await?;
println!("messages: {}, hidden: {}", attrs.msgs, attrs.hiddenmsgs);

Realtime notifications

Set realtime: true in RsmqOptions to have RSMQ publish to {ns}:rt:{qname} on every send_message. Subscribe with redis-rs to wake workers immediately instead of polling.

use rsmq_async::RsmqOptions;

let mut rsmq = Rsmq::new(RsmqOptions { realtime: true, ..Default::default() }).await?;

Use a single subscriber per queue — multiple workers listening on the same SUBSCRIBE channel and racing to call receive_message is a common mistake.


Features

Feature Default Description
tokio-comp yes Tokio async runtime support
smol-comp no smol async runtime support
sync yes Enables RsmqSync and RsmqConnectionSync
break-js-comp no Microsecond-precision scores and IDs (see below)

To use the smol runtime instead of Tokio:

rsmq_async = { version = "16", default-features = false, features = ["smol-comp"] }

break-js-comp

By default, rsmq-async is wire-compatible with the JS library: message IDs encode microseconds in base36 (matching JS) and queue scores are stored in milliseconds.

Enabling break-js-comp switches scores to full microsecond precision. This gives finer ordering for high-throughput queues but means messages written by a JS server and a Rust server with break-js-comp will have mismatched score units — don't mix the two on the same queue.

rsmq_async = { version = "16", features = ["break-js-comp"] }

Delivery guarantees

Pattern How
At least once receive_message + delete_message after success. Failures re-deliver after visibility timeout.
At most once pop_message atomically dequeues and deletes.

Development

Start a local Redis with Docker:

docker run -d --name redis-test -p 6379:6379 redis:latest

Run the tests (sequential is required — tests share the same Redis DB):

cargo test -- --test-threads=1

To target a different Redis instance:

REDIS_URL=127.0.0.1:6380 cargo test -- --test-threads=1

About

RSMQ port to async rust. RSMQ is a simple redis queue system that works in any redis v2.6+

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Sponsor this project

 

Packages

 
 
 

Contributors