Skip to content

Commit cc9e08b

Browse files
committed
Message: Add port(), for knowing which inport port was sent on
1 parent 449342c commit cc9e08b

2 files changed

Lines changed: 22 additions & 8 deletions

File tree

include/msgflo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class Message {
7777
virtual void ack() = 0;
7878

7979
virtual void nack() = 0;
80+
81+
virtual std::string port() = 0;
8082
};
8183

8284
class Engine;

src/msgflo.cpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,28 @@ struct ParticipantRegistrationT : public Participant {
8585

8686
class AbstractMessage : public Message {
8787
protected:
88-
AbstractMessage(const char *data, const uint64_t len) : _data(data), _len(len) {}
88+
AbstractMessage(const char *data, const uint64_t len, const std::string &port)
89+
: _data(data)
90+
, _len(len)
91+
, _port(port)
92+
{}
8993

9094
virtual ~AbstractMessage() {};
9195

9296
const char *_data;
9397
const uint64_t _len;
98+
const std::string _port;
9499

95100
public:
96101
virtual void data(const char **data, uint64_t *len) override {
97102
*data = this->_data;
98103
*len = this->_len;
99104
}
100105

106+
virtual std::string port() override {
107+
return _port;
108+
}
109+
101110
virtual std::string asString() override {
102111
string str(_data, _len);
103112
return str;
@@ -145,8 +154,11 @@ class AbstractEngine {
145154
class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
146155

147156
struct AmqpMessage final : public AbstractMessage {
148-
AmqpMessage(AMQP::Channel &channel, uint64_t deliveryTag, const AMQP::Message &m)
149-
: AbstractMessage(m.body(), m.bodySize()), _deliveryTag(deliveryTag), channel(channel) {
157+
AmqpMessage(AMQP::Channel &channel, uint64_t deliveryTag, const AMQP::Message &m, const std::string &p)
158+
: AbstractMessage(m.body(), m.bodySize(), p)
159+
, _deliveryTag(deliveryTag)
160+
,channel(channel)
161+
{
150162
}
151163

152164
uint64_t _deliveryTag;
@@ -209,10 +221,10 @@ class AmqpEngine final : public Engine, protected AbstractEngine<AmqpEngine> {
209221
void setupInPort(const ParticipantRegistration &r, const Definition::Port &port) {
210222
channel.declareQueue(port.queue, AMQP::durable);
211223
channel.consume(port.queue).onReceived(
212-
[r, this](const AMQP::Message &message,
224+
[r, this, port](const AMQP::Message &message,
213225
uint64_t deliveryTag,
214226
bool redelivered) {
215-
AmqpMessage msg(channel, deliveryTag, message);
227+
AmqpMessage msg(channel, deliveryTag, message, port.id);
216228
r.handler(&msg);
217229
});
218230
}
@@ -243,8 +255,8 @@ using msg_flo_mqtt_client = mqtt_client<trygvis::mqtt_support::mqtt_client_perso
243255
class MosquittoEngine final : public Engine, protected mqtt_event_listener, protected AbstractEngine<MosquittoEngine> {
244256

245257
struct MosquittoMessage final : public AbstractMessage {
246-
MosquittoMessage(const struct mosquitto_message *m, bool d)
247-
: AbstractMessage(static_cast<char *>(m->payload), static_cast<uint64_t>(m->payloadlen))
258+
MosquittoMessage(const struct mosquitto_message *m, bool d, const std::string &p)
259+
: AbstractMessage(static_cast<char *>(m->payload), static_cast<uint64_t>(m->payloadlen), p)
248260
, _mid(m->mid)
249261
, _debugOutput(d)
250262
{
@@ -319,7 +331,7 @@ class MosquittoEngine final : public Engine, protected mqtt_event_listener, prot
319331
for (auto &r : registrations) {
320332
for (auto &p : r.inports) {
321333
if (p.queue == topic) {
322-
MosquittoMessage m(message, _debugOutput);
334+
MosquittoMessage m(message, _debugOutput, p.id);
323335

324336
r.handler(&m);
325337
}

0 commit comments

Comments
 (0)