Async iterables over Redis streams. Requires Redis 5+ (6.2+ for claimIdleTime).
This package includes two higher-level modules built on RedisStream:
- Queue -- Job queue with workers, concurrency, retries, backoff, dead-letter queues, and output streams.
import { Queue, Worker } from 'redis-x-stream/queue' - Cache -- Distributed single-flight cache with two-tier storage (in-process + Redis), cross-instance broadcast via Redis Streams, and automatic thundering herd prevention.
import { SingleFlightCache } from 'redis-x-stream/cache'
npm install redis-x-stream ioredisimport { RedisStream } from 'redis-x-stream'
for await (const [stream, [id, keyvals]] of new RedisStream('my-stream')) {
console.log(stream, id, keyvals) // 'my-stream', '1234-0', ['key', 'value']
}const stream = new RedisStream({
streams: ['my-stream'],
group: 'my-group',
consumer: 'worker-1',
block: Infinity,
count: 10,
ackOnIterate: true,
deleteOnAck: true,
})
for await (const [name, [id, keyvals]] of stream) {
await process(keyvals)
}Groups and consumers are created automatically. On startup, pending entries (PEL) are re-delivered before new entries.
Pass a parse callback to transform the raw key-value array. The return type flows
through the generic to the iterator.
interface Order {
product: string
qty: number
}
const stream = new RedisStream<Order>({
stream: 'orders',
group: 'workers',
ackOnIterate: true,
parse: (id, kv) => ({ product: kv[1], qty: Number(kv[3]) }),
})
for await (const [name, [id, order]] of stream) {
order.product // string
order.qty // number
}claimIdleTime uses XAUTOCLAIM (Redis 6.2+) to automatically claim entries
from consumers that have been idle too long. Claimed entries are yielded alongside
new entries.
const stream = new RedisStream({
streams: ['tasks'],
group: 'workers',
consumer: 'worker-1',
block: Infinity,
claimIdleTime: 30_000, // claim entries idle > 30s
ackOnIterate: true,
})Add streams at runtime, even while blocked:
const stream = new RedisStream({
streams: ['stream-a'],
block: Infinity,
})
// later, from another context:
stream.addStream('stream-b')// Graceful shutdown: finish PEL, flush acks, stop
await stream.drain()
// Immediate stop: flush acks, close connections
await stream.quit()
// break also cleans up (via try/finally)
for await (const entry of stream) {
if (done) break // connections are closed automatically
}When the consumer is slow or the reader is blocked, acks can pile up.
flushPendingAckInterval adds a watchdog timer that flushes pending acks
if no new acks are queued within the interval.
const stream = new RedisStream({
streams: ['tasks'],
group: 'workers',
ackOnIterate: true,
flushPendingAckInterval: 5000, // flush every 5s of inactivity
})Query stream and consumer group state without dropping to raw ioredis:
const info = await stream.info() // XINFO STREAM (per stream)
const groups = await stream.groups() // XINFO GROUPS
const consumers = await stream.consumers() // XINFO CONSUMERS
const pending = await stream.pending() // XPENDING summaryAll return typed objects (StreamInfo, GroupInfo, ConsumerInfo, PendingSummary).
Connection lifecycle events are forwarded from ioredis:
stream.on('error', (err) => console.error(err))
stream.on('ready', () => console.log('connected'))
stream.on('close', () => console.log('disconnected'))
stream.on('reconnecting', () => console.log('reconnecting'))| Option | Type | Default | Description |
|---|---|---|---|
streams / stream |
string | string[] | Record<string, string> |
Stream keys to read | |
group |
string |
Consumer group name | |
consumer |
string |
auto | Consumer name |
redis |
Redis | string | RedisOptions |
new Redis() |
Reader connection |
redisControl |
Redis | string | RedisOptions |
auto | Control connection (blocking mode) |
block |
number |
Block timeout in ms (Infinity for indefinite) |
|
count |
number |
100 |
Max entries per read |
ackOnIterate |
boolean |
false |
Auto-ack previous entry on each iteration |
deleteOnAck |
boolean |
false |
XDEL after XACK |
noack |
boolean |
false |
Bypass PEL (NOACK flag) |
claimIdleTime |
number |
Min idle ms for XAUTOCLAIM (Redis 6.2+) | |
flushPendingAckInterval |
number | null |
null |
Ack flush watchdog timer in ms |
parse |
(id, kv, stream) => T |
Entry transform callback | |
buffers |
boolean |
false |
Return Buffers instead of strings |
MIT