Skip to content

Commit f1f2e58

Browse files
committed
routing/MQTT: Implement data subscription
1 parent 0bd2612 commit f1f2e58

1 file changed

Lines changed: 20 additions & 2 deletions

File tree

src/routing.coffee

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ bindingId = (f, t) ->
3030
class Binder
3131
constructor: (@transport) ->
3232
@bindings = {}
33+
@subscriptions = {}
3334

3435
addBinding: (binding, callback) ->
3536
from = binding.src
@@ -43,8 +44,15 @@ class Binder
4344
binding = @bindings[id]
4445
return if not binding?.enabled
4546
debug 'edge message', from, to, msg
47+
48+
subscription = @subscriptions[id]
49+
if subscription
50+
for subCallback in subscription.handlers
51+
subCallback(subscription.binding, msg.data)
52+
4653
@transport.sendTo 'outqueue', to, msg.data, (err) ->
4754
throw err if err
55+
4856
@transport.subscribeToQueue from, handler, (err) =>
4957
return callback err if err
5058
@bindings[id] =
@@ -69,9 +77,17 @@ class Binder
6977
return callback null, []
7078

7179
subscribeData: (binding, datahandler, callback) ->
72-
throw new Error 'Not Implemented'
80+
id = bindingId binding.src, binding.tgt
81+
@subscriptions[id] = { handlers: [], binding: binding } if not @subscriptions[id]
82+
@subscriptions[id].handlers.push datahandler
83+
return callback null
7384
unsubscribeData: (binding, datahandler, callback) ->
74-
throw new Error 'Not Implemented'
85+
id = bindingId binding.src, binding.tgt
86+
subscription = @subscriptions[id]
87+
handlerIndex = subscription.handlers.indexOf datahandler
88+
return callback new Error "Subscription was not found" if handlerIndex == -1
89+
subscription.handlers = subscription.handlers.splice(handlerIndex, 1)
90+
return callback null
7591

7692
exports.Binder = Binder
7793
exports.binderMixin = (transport) ->
@@ -80,4 +96,6 @@ exports.binderMixin = (transport) ->
8096
transport.addBinding = b.addBinding.bind b
8197
transport.removeBinding = b.removeBinding.bind b
8298
transport.listBindings = b.listBindings.bind b
99+
transport.subscribeData = b.subscribeData.bind b
100+
transport.unsubscribeData = b.unsubscribeData.bind b
83101

0 commit comments

Comments
 (0)