An async Rust port of RSMQ — a lightweight message queue built entirely on Redis. No extra infrastructure, no brokers, just Redis.
Wire-compatible with the original JavaScript implementation: messages enqueued by a JS server can be consumed by a Rust worker and vice versa.
RSMQ stores queues as Redis sorted sets. Each message gets a score based on its delivery time, so delayed and hidden messages are naturally ordered. All queue operations are executed as atomic Lua scripts, making them safe under concurrent access.
- At-least-once delivery — receive a message, process it, then delete it. If your worker crashes mid-flight, the message becomes visible again after its visibility timeout.
- At-most-once delivery — use `pop_message` to receive and delete in one atomic step.
- Delayed messages — messages can be held back for a configurable duration before becoming visible.
- Per-message visibility timeouts — received messages stay hidden for a configurable window, giving your worker time to process them.
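Delays and visibility timeouts both fall out of the score-based storage described earlier. As a rough sketch (illustrative function names, not the crate's internals; scores are milliseconds by default):

```rust
// Rough sketch of the score mechanics: a message's sorted-set score is
// the time at which it becomes visible to consumers.
fn score_on_send(now_ms: u64, delay_ms: u64) -> u64 {
    now_ms + delay_ms // delayed messages simply sort into the future
}

fn score_on_receive(now_ms: u64, visibility_timeout_ms: u64) -> u64 {
    // Receiving bumps the score forward, hiding the message until the
    // worker either deletes it or the timeout expires.
    now_ms + visibility_timeout_ms
}
```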
```toml
[dependencies]
rsmq_async = "16"
```

```rust
use rsmq_async::{Rsmq, RsmqConnection, RsmqError};

#[tokio::main]
async fn main() -> Result<(), RsmqError> {
    let mut rsmq = Rsmq::new(Default::default()).await?;
    rsmq.create_queue("jobs", None, None, None).await?;

    // Send a message
    rsmq.send_message("jobs", "hello from Rust", None).await?;

    // Receive and process it
    if let Some(msg) = rsmq.receive_message::<String>("jobs", None).await? {
        println!("Got: {}", msg.message);
        rsmq.delete_message("jobs", &msg.id).await?;
    }

    Ok(())
}
```

Always `delete_message` after successfully processing — this is what confirms delivery.
Three implementations are provided, all behind the same `RsmqConnection` trait.
| Type | Use when |
|---|---|
| `Rsmq` | Start here. A single multiplexed connection handles concurrent operations efficiently — no pool overhead, no contention. Right for the vast majority of workloads. |
| `PooledRsmq` | You're sending large payloads (images, documents, big blobs) and one slow operation blocking the shared connection becomes a problem. |
| `RsmqSync` | You're in a sync context. Wraps `Rsmq` in a Tokio runtime. Requires the `sync` feature. |
Write code against the trait to stay implementation-agnostic:
```rust
use rsmq_async::RsmqConnection;

async fn process(rsmq: &mut impl RsmqConnection) {
    // works with Rsmq, PooledRsmq, or RsmqSync
}
```

To use the pooled client:

```rust
use rsmq_async::{PooledRsmq, PoolOptions, RsmqConnection};

let pool_opts = PoolOptions { max_size: Some(20), min_idle: Some(5) };
let mut rsmq = PooledRsmq::new(Default::default(), pool_opts).await?;
```

Or, from a sync context:

```rust
use rsmq_async::{RsmqSync, RsmqConnectionSync};

let mut rsmq = RsmqSync::new(Default::default()).await?;
rsmq.send_message("myqueue", "hello", None)?;
```

`send_message`, `receive_message`, and `pop_message` are generic over your message type.
Built-in implementations cover the common cases:
```rust
// Send and receive strings
rsmq.send_message("q", "hello", None).await?;
let msg = rsmq.receive_message::<String>("q", None).await?;

// Or raw bytes
rsmq.send_message("q", vec![0u8, 1, 2], None).await?;
let msg = rsmq.receive_message::<Vec<u8>>("q", None).await?;
```

For custom types, implement `TryFrom<RedisBytes>` to receive and `Into<RedisBytes>` to send:
```rust
use rsmq_async::RedisBytes;

// Illustrative payload: a single UTF-8 string field.
struct MyPayload {
    text: String,
}

impl TryFrom<RedisBytes> for MyPayload {
    type Error = Vec<u8>; // must be Vec<u8>

    fn try_from(b: RedisBytes) -> Result<Self, Vec<u8>> {
        // Deserialize from the raw bytes; hand them back on failure.
        String::from_utf8(b.0)
            .map(|text| MyPayload { text })
            .map_err(|e| e.into_bytes())
    }
}

impl From<MyPayload> for RedisBytes {
    fn from(p: MyPayload) -> RedisBytes {
        RedisBytes(p.text.into_bytes())
    }
}
```

Queue attributes are set at creation time:

```rust
use std::time::Duration;

rsmq.create_queue(
    "jobs",
    Some(Duration::from_secs(30)), // visibility timeout (default: 30s)
    Some(Duration::from_secs(0)),  // delivery delay (default: 0)
    Some(65536),                   // max message size in bytes, -1 for unlimited (default: 65536)
).await?;
```

Queue attributes can be updated after creation:
```rust
rsmq.set_queue_attributes("jobs", Some(Duration::from_secs(60)), None, None).await?;
```

You can inspect queue stats at any time:
```rust
let attrs = rsmq.get_queue_attributes("jobs").await?;
println!("messages: {}, hidden: {}", attrs.msgs, attrs.hiddenmsgs);
```

Set `realtime: true` in `RsmqOptions` to have RSMQ publish to `{ns}:rt:{qname}` on every `send_message`. Subscribe with redis-rs to wake workers immediately instead of polling.
```rust
use rsmq_async::RsmqOptions;

let mut rsmq = Rsmq::new(RsmqOptions { realtime: true, ..Default::default() }).await?;
```

Use a single subscriber per queue — multiple workers listening on the same SUBSCRIBE channel and racing to call `receive_message` is a common mistake.
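As a sketch of the subscriber side, the channel name follows the `{ns}:rt:{qname}` pattern above (`rsmq` is the default namespace; the redis-rs calls are indicative and shown as comments):

```rust
// Build the realtime notification channel for a queue, following the
// `{ns}:rt:{qname}` pattern.
fn rt_channel(ns: &str, qname: &str) -> String {
    format!("{}:rt:{}", ns, qname)
}

// With the redis-rs crate, a worker could then subscribe roughly like this
// (sketch, not compiled here):
//
//     let client = redis::Client::open("redis://127.0.0.1/")?;
//     let mut con = client.get_connection()?;
//     let mut pubsub = con.as_pubsub();
//     pubsub.subscribe(rt_channel("rsmq", "jobs"))?;
//     loop {
//         let _notification = pubsub.get_message()?; // fires on every send_message
//         // now call receive_message until the queue is drained
//     }
```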
| Feature | Default | Description |
|---|---|---|
| `tokio-comp` | yes | Tokio async runtime support |
| `smol-comp` | no | smol async runtime support |
| `sync` | yes | Enables `RsmqSync` and `RsmqConnectionSync` |
| `break-js-comp` | no | Microsecond-precision scores and IDs (see below) |
To use the smol runtime instead of Tokio:
```toml
rsmq_async = { version = "16", default-features = false, features = ["smol-comp"] }
```

By default, rsmq-async is wire-compatible with the JS library: message IDs encode microseconds in base36 (matching JS) and queue scores are stored in milliseconds.
Enabling `break-js-comp` switches scores to full microsecond precision. This gives finer ordering for high-throughput queues but means messages written by a JS server and a Rust server with `break-js-comp` will have mismatched score units — don't mix the two on the same queue.
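The base36 encoding mentioned above can be illustrated with a small encoder (a sketch only; the crate's actual ID layout and padding may differ):

```rust
// Encode an integer in base36 (digits 0-9 then a-z), the radix used for
// timestamp-derived message IDs.
fn to_base36(mut n: u64) -> String {
    const DIGITS: &[u8] = b"0123456789abcdefghijklmnopqrstuvwxyz";
    if n == 0 {
        return "0".to_string();
    }
    let mut out = Vec::new();
    while n > 0 {
        out.push(DIGITS[(n % 36) as usize]);
        n /= 36;
    }
    out.reverse();
    String::from_utf8(out).unwrap()
}
```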
```toml
rsmq_async = { version = "16", features = ["break-js-comp"] }
```

| Pattern | How |
|---|---|
| At least once | `receive_message` + `delete_message` after success. Failures re-deliver after the visibility timeout. |
| At most once | `pop_message` atomically dequeues and deletes. |
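The at-least-once pattern can be illustrated with a hypothetical in-memory queue (`MockQueue` is invented for illustration; real RSMQ implements the same behavior with sorted-set scores in Redis):

```rust
use std::collections::HashMap;

struct MockQueue {
    visible: Vec<(u32, String)>,
    hidden: HashMap<u32, String>,
}

impl MockQueue {
    fn receive(&mut self) -> Option<(u32, String)> {
        let (id, body) = self.visible.pop()?;
        // Received messages are hidden, not deleted: a crash before delete
        // means redelivery once the visibility timeout expires.
        self.hidden.insert(id, body.clone());
        Some((id, body))
    }

    fn delete(&mut self, id: u32) {
        // Deleting is what confirms delivery.
        self.hidden.remove(&id);
    }

    fn visibility_timeout_expired(&mut self) {
        // Hidden messages become visible again and will be redelivered.
        for (id, body) in self.hidden.drain() {
            self.visible.push((id, body));
        }
    }
}
```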
Start a local Redis with Docker:
```sh
docker run -d --name redis-test -p 6379:6379 redis:latest
```

Run the tests (sequential is required — tests share the same Redis DB):

```sh
cargo test -- --test-threads=1
```

To target a different Redis instance:

```sh
REDIS_URL=127.0.0.1:6380 cargo test -- --test-threads=1
```