Skip to content

Commit f869548

Browse files
authored
Merge pull request #7 from msgflo/on-message
Participant: Add dedicated onMessage function
2 parents 5114656 + 61c0daf commit f869548

2 files changed

Lines changed: 33 additions & 11 deletions

File tree

include/msgflo.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class Message {
8181
virtual std::string port() = 0;
8282
};
8383

84+
using MessageHandler = std::function<void(Message *)>;
85+
8486
class Engine;
8587

8688
class Participant {
@@ -93,14 +95,21 @@ class Participant {
9395

9496
virtual void send(std::string port, const char *data, uint64_t len) = 0;
9597

98+
virtual void onMessage(const MessageHandler &handler) = 0;
99+
96100
private:
97101
};
98102

99-
using MessageHandler = std::function<void(Message *)>;
100-
101103
class Engine {
102104
public:
103-
virtual Participant *registerParticipant(const Definition &definition, MessageHandler handler) = 0;
105+
virtual Participant *registerParticipant(const Definition &definition) = 0;
106+
107+
// API compatibility with existing signature
108+
Participant *registerParticipant(const Definition &definition, MessageHandler handler) {
109+
auto p = registerParticipant(definition);
110+
p->onMessage(handler);
111+
return p;
112+
}
104113

105114
virtual void launch() = 0;
106115
protected:

src/msgflo.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,31 @@ class DiscoveryMessage {
4444
Definition definition;
4545
};
4646

47+
void defaultMessageHandler(msgflo::Message *msg) {
48+
cout << "Warning: No message handler defined for msgflo::Participant" << endl;
49+
}
50+
4751
template<typename Engine_t>
4852
struct ParticipantRegistrationT : public Participant {
4953
Engine_t *engine;
5054
const std::vector<Definition::Port> inports;
5155
const std::vector<Definition::Port> outports;
5256
const string id;
53-
const MessageHandler handler;
57+
MessageHandler handler;
5458
const DiscoveryMessage discoveryMessage;
5559

56-
ParticipantRegistrationT(Engine_t *engine, const Definition &definition, const MessageHandler &handler)
57-
: engine(engine), inports(definition.inports), outports(definition.outports), id(generateId(definition)),
58-
handler(handler), discoveryMessage(definition) {}
60+
ParticipantRegistrationT(Engine_t *engine, const Definition &definition)
61+
: engine(engine)
62+
, inports(definition.inports)
63+
, outports(definition.outports)
64+
, id(generateId(definition))
65+
, handler(defaultMessageHandler)
66+
, discoveryMessage(definition)
67+
{}
68+
69+
void onMessage(const MessageHandler &h) {
70+
handler = h;
71+
}
5972

6073
virtual void send(std::string port, const json11::Json &json) override {
6174
send(port, json.dump());
@@ -192,9 +205,9 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
192205
});
193206
}
194207

195-
virtual Participant *registerParticipant(const Definition &definition, MessageHandler handler) override {
208+
virtual Participant *registerParticipant(const Definition &definition) override {
196209
Definition d = validateDefinitionFromUser(definition);
197-
registrations.emplace_back(this, d, handler);
210+
registrations.emplace_back(this, d);
198211
return &registrations[registrations.size() - 1];
199212
}
200213

@@ -289,9 +302,9 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
289302
virtual ~MosquittoEngine() {
290303
}
291304

292-
virtual Participant *registerParticipant(const Definition &definition, MessageHandler handler) override {
305+
virtual Participant *registerParticipant(const Definition &definition) override {
293306
Definition d = validateDefinitionFromUser(definition);
294-
registrations.emplace_back(this, d, handler);
307+
registrations.emplace_back(this, d);
295308
return &registrations[registrations.size() - 1];
296309
}
297310

0 commit comments

Comments
 (0)