Skip to content

Commit 935118f

Browse files
committed
feat(redis): add support for useMessageBuffer option when using cluster
1 parent 00e8e7a commit 935118f

3 files changed

Lines changed: 68 additions & 2 deletions

File tree

src/transports/redis.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
Serializable,
1818
SubscribeHandler,
1919
RedisTransportConfig,
20+
RedisTransportOptions,
2021
} from '../types/main.js'
2122

2223
export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) {
@@ -33,10 +34,15 @@ export class RedisTransport implements Transport {
3334

3435
constructor(path: string, encoder?: TransportEncoder)
3536
constructor(options: RedisTransportConfig, encoder?: TransportEncoder)
36-
constructor(connection: Redis | Cluster, encoder?: TransportEncoder)
37+
constructor(
38+
connection: Redis | Cluster,
39+
encoder?: TransportEncoder,
40+
options?: RedisTransportOptions
41+
)
3742
constructor(
3843
options: RedisTransportConfig | string | Redis | Cluster,
39-
encoder?: TransportEncoder
44+
encoder?: TransportEncoder,
45+
transportOptions?: RedisTransportOptions
4046
) {
4147
this.#encoder = encoder ?? new JsonEncoder()
4248

@@ -47,6 +53,7 @@ export class RedisTransport implements Transport {
4753
if (options instanceof Redis || options instanceof Cluster) {
4854
this.#publisher = options.duplicate()
4955
this.#subscriber = options.duplicate()
56+
this.#useMessageBuffer = transportOptions?.useMessageBuffer ?? false
5057
return
5158
}
5259

src/types/main.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,15 @@ export interface RedisTransportConfig extends RedisOptions {
3939
useMessageBuffer?: boolean
4040
}
4141

42+
export interface RedisTransportOptions {
43+
/**
44+
* If true, we will use `messageBuffer` event instead of `message` event
45+
* that is emitted by ioredis. `messageBuffer` will returns a buffer instead
46+
* of a string and this is useful when you are dealing with binary data.
47+
*/
48+
useMessageBuffer?: boolean
49+
}
50+
4251
export enum MqttProtocol {
4352
MQTT = 'mqtt',
4453
MQTTS = 'mqtts',

tests/drivers/redis_transport.spec.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,4 +206,54 @@ test.group('Redis Transport', (group) => {
206206
})
207207
.waitForDone()
208208
.skip(!!process.env.CI, 'Skipping cluster test on CI')
209+
210+
test('send binary data using useMessageBuffer with existing redis instance', async ({
211+
assert,
212+
cleanup,
213+
}, done) => {
214+
assert.plan(1)
215+
216+
class BinaryEncoder implements TransportEncoder {
217+
encode(message: TransportMessage<any>) {
218+
return Buffer.from(JSON.stringify(message))
219+
}
220+
221+
decode(data: string | Buffer) {
222+
const buffer = Buffer.isBuffer(data) ? data : Buffer.from(data, 'binary')
223+
return JSON.parse(buffer.toString())
224+
}
225+
}
226+
227+
const redisInstance = new Redis({
228+
host: container.getHost(),
229+
port: container.getMappedPort(6379),
230+
})
231+
232+
cleanup(async () => {
233+
await redisInstance.quit()
234+
})
235+
236+
const transport1 = new RedisTransport(redisInstance, new BinaryEncoder(), {
237+
useMessageBuffer: true,
238+
}).setId('bus1')
239+
240+
const transport2 = new RedisTransport(redisInstance, new BinaryEncoder(), {
241+
useMessageBuffer: true,
242+
}).setId('bus2')
243+
244+
cleanup(async () => {
245+
await transport1.disconnect()
246+
await transport2.disconnect()
247+
})
248+
249+
const data = ['foo', '👍']
250+
251+
await transport1.subscribe('testing-channel', (payload) => {
252+
assert.deepEqual(payload, data)
253+
done()
254+
})
255+
256+
await setTimeout(200)
257+
await transport2.publish('testing-channel', data)
258+
}).waitForDone()
209259
})

0 commit comments

Comments
 (0)