Skip to content

Commit 4a26937

Browse files
committed
feat(redis): add redis cluster support in
1 parent c4c8232 commit 4a26937

4 files changed

Lines changed: 101 additions & 5 deletions

File tree

compose.yml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
services:
2+
redis:
3+
image: redis:7.2-alpine
4+
ports:
5+
- "6379:6379"
6+
7+
redis-cluster:
8+
profiles:
9+
- cluster
10+
image: grokzen/redis-cluster:7.0.8
11+
environment:
12+
- IP=0.0.0.0
13+
- INITIAL_PORT=7000
14+
- MASTERS=3
15+
- SLAVES_PER_MASTER=0
16+
ports:
17+
- "7000:7000"
18+
- "7001:7001"
19+
- "7002:7002"

src/transports/redis.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* @copyright BoringNode
66
*/
77

8-
import { Redis } from 'ioredis'
8+
import { Redis, Cluster } from 'ioredis'
99
import { assert } from '@poppinss/utils/assert'
1010

1111
import debug from '../debug.js'
@@ -24,21 +24,36 @@ export function redis(config: RedisTransportConfig, encoder?: TransportEncoder)
2424
}
2525

2626
export class RedisTransport implements Transport {
27-
readonly #publisher: Redis
28-
readonly #subscriber: Redis
27+
readonly #publisher: Redis | Cluster
28+
readonly #subscriber: Redis | Cluster
2929
readonly #encoder: TransportEncoder
3030
readonly #useMessageBuffer: boolean = false
3131

3232
#id: string | undefined
3333

3434
constructor(path: string, encoder?: TransportEncoder)
3535
constructor(options: RedisTransportConfig, encoder?: TransportEncoder)
36-
constructor(options: RedisTransportConfig | string, encoder?: TransportEncoder) {
36+
constructor(connection: Redis | Cluster, encoder?: TransportEncoder)
37+
constructor(
38+
options: RedisTransportConfig | string | Redis | Cluster,
39+
encoder?: TransportEncoder
40+
) {
41+
this.#encoder = encoder ?? new JsonEncoder()
42+
43+
/**
44+
* If an existing Redis or Cluster instance is passed, we duplicate it
45+
* to have separate connections for publisher and subscriber
46+
*/
47+
if (options instanceof Redis || options instanceof Cluster) {
48+
this.#publisher = options.duplicate()
49+
this.#subscriber = options.duplicate()
50+
return
51+
}
52+
3753
// @ts-expect-error - merged definitions of overloaded constructor is not public
3854
this.#publisher = new Redis(options)
3955
// @ts-expect-error - merged definitions of overloaded constructor is not public
4056
this.#subscriber = new Redis(options)
41-
this.#encoder = encoder ?? new JsonEncoder()
4257

4358
if (typeof options === 'object') {
4459
this.#useMessageBuffer = options.useMessageBuffer ?? false

src/types/main.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import type { RedisOptions } from 'ioredis'
99
import type { IClientOptions } from 'mqtt'
10+
11+
export type { Redis, Cluster } from 'ioredis'
1012
export type TransportFactory = () => Transport
1113

1214
/**

tests/drivers/redis_transport.spec.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import { setTimeout } from 'node:timers/promises'
99
import { test } from '@japa/runner'
10+
import { Redis, Cluster } from 'ioredis'
1011
import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis'
1112
import { RedisTransport } from '../../src/transports/redis.js'
1213
import { JsonEncoder } from '../../src/encoders/json_encoder.js'
@@ -146,4 +147,63 @@ test.group('Redis Transport', (group) => {
146147
await setTimeout(200)
147148
await transport2.publish('testing-channel', data)
148149
}).waitForDone()
150+
151+
test('should work with an existing redis instance', async ({ assert, cleanup }, done) => {
152+
assert.plan(1)
153+
154+
const redisInstance = new Redis({
155+
host: container.getHost(),
156+
port: container.getMappedPort(6379),
157+
})
158+
159+
cleanup(async () => {
160+
await redisInstance.quit()
161+
})
162+
163+
const transport1 = new RedisTransport(redisInstance).setId('bus1')
164+
const transport2 = new RedisTransport(redisInstance).setId('bus2')
165+
166+
cleanup(async () => {
167+
await transport1.disconnect()
168+
await transport2.disconnect()
169+
})
170+
171+
await transport1.subscribe('testing-channel', (payload) => {
172+
assert.equal(payload, 'test')
173+
done()
174+
})
175+
176+
await setTimeout(200)
177+
178+
await transport2.publish('testing-channel', 'test')
179+
}).waitForDone()
180+
181+
test('should work with an existing cluster instance', async ({ assert, cleanup }, done) => {
182+
assert.plan(1)
183+
184+
const cluster = new Cluster([{ host: '127.0.0.1', port: 7000 }])
185+
186+
cleanup(async () => {
187+
await cluster.quit()
188+
})
189+
190+
const transport1 = new RedisTransport(cluster).setId('bus1')
191+
const transport2 = new RedisTransport(cluster).setId('bus2')
192+
193+
cleanup(async () => {
194+
await transport1.disconnect()
195+
await transport2.disconnect()
196+
})
197+
198+
await transport1.subscribe('testing-channel', (payload) => {
199+
assert.equal(payload, 'test')
200+
done()
201+
})
202+
203+
await setTimeout(200)
204+
205+
await transport2.publish('testing-channel', 'test')
206+
})
207+
.waitForDone()
208+
.skip(!!process.env.CI, 'Skipping cluster test on CI')
149209
})

0 commit comments

Comments
 (0)