1- // Copyright (c) 2024 TrakHound Inc., All Rights Reserved.
1+ // Copyright (c) 2025 TrakHound Inc., All Rights Reserved.
22// TrakHound Inc. licenses this file to you under the MIT license.
33
44using MQTTnet ;
99using MTConnect . Devices ;
1010using MTConnect . Formatters ;
1111using MTConnect . Observations ;
12+ using System ;
1213using System . Collections . Generic ;
1314using System . IO ;
1415using System . Linq ;
@@ -22,6 +23,11 @@ public class MTConnectMqttEntityServer
2223
2324 public string TopicPrefix => _configuration . TopicPrefix ;
2425
26+ public event EventHandler < string > MessageSent ;
27+ public event EventHandler < string > SendError ;
28+ public event EventHandler < Exception > ClientError ;
29+ public event EventHandler < Exception > ServerError ;
30+
2531
2632 public MTConnectMqttEntityServer ( string topicPrefix = null , string documentFormat = DocumentFormat . JSON , int qos = 0 )
2733 {
@@ -43,15 +49,22 @@ public async Task PublishDevice(IMqttClient mqttClient, IDevice device)
4349 {
4450 if ( mqttClient != null && mqttClient . IsConnected && device != null )
4551 {
46- var message = CreateMessage ( device ) ;
47- var result = await mqttClient . PublishAsync ( message ) ;
48- if ( result . IsSuccess )
52+ try
4953 {
50-
54+ var message = CreateMessage ( device ) ;
55+ var result = await mqttClient . PublishAsync ( message ) ;
56+ if ( result . IsSuccess )
57+ {
58+ if ( MessageSent != null ) MessageSent . Invoke ( this , message . Topic ) ;
59+ }
60+ else
61+ {
62+ if ( SendError != null ) SendError . Invoke ( this , message . Topic ) ;
63+ }
5164 }
52- else
65+ catch ( Exception ex )
5366 {
54-
67+ if ( ClientError != null ) ClientError . Invoke ( this , ex ) ;
5568 }
5669 }
5770 }
@@ -60,10 +73,17 @@ public async Task PublishDevice(MqttServer mqttServer, IDevice device)
6073 {
6174 if ( mqttServer != null && device != null )
6275 {
63- var message = CreateMessage ( device ) ;
64- var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
76+ try
77+ {
78+ var message = CreateMessage ( device ) ;
79+ var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
6580
66- await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
81+ await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
82+ }
83+ catch ( Exception ex )
84+ {
85+ if ( ServerError != null ) ServerError . Invoke ( this , ex ) ;
86+ }
6787 }
6888 }
6989
@@ -95,15 +115,22 @@ public async Task PublishObservation(IMqttClient mqttClient, IObservation observ
95115 {
96116 if ( mqttClient != null && mqttClient . IsConnected && observation != null )
97117 {
98- var message = CreateMessage ( observation ) ;
99- var result = await mqttClient . PublishAsync ( message ) ;
100- if ( result . IsSuccess )
118+ try
101119 {
102-
120+ var message = CreateMessage ( observation ) ;
121+ var result = await mqttClient . PublishAsync ( message ) ;
122+ if ( result . IsSuccess )
123+ {
124+ if ( MessageSent != null ) MessageSent . Invoke ( this , message . Topic ) ;
125+ }
126+ else
127+ {
128+ if ( SendError != null ) SendError . Invoke ( this , message . Topic ) ;
129+ }
103130 }
104- else
131+ catch ( Exception ex )
105132 {
106-
133+ if ( ClientError != null ) ClientError . Invoke ( this , ex ) ;
107134 }
108135 }
109136 }
@@ -112,15 +139,22 @@ public async Task PublishObservations(IMqttClient mqttClient, IEnumerable<IObser
112139 {
113140 if ( mqttClient != null && mqttClient . IsConnected && ! observations . IsNullOrEmpty ( ) )
114141 {
115- var message = CreateMessage ( observations ) ;
116- var result = await mqttClient . PublishAsync ( message ) ;
117- if ( result . IsSuccess )
142+ try
118143 {
119-
144+ var message = CreateMessage ( observations ) ;
145+ var result = await mqttClient . PublishAsync ( message ) ;
146+ if ( result . IsSuccess )
147+ {
148+ if ( MessageSent != null ) MessageSent . Invoke ( this , message . Topic ) ;
149+ }
150+ else
151+ {
152+ if ( SendError != null ) SendError . Invoke ( this , message . Topic ) ;
153+ }
120154 }
121- else
155+ catch ( Exception ex )
122156 {
123-
157+ if ( ClientError != null ) ClientError . Invoke ( this , ex ) ;
124158 }
125159 }
126160 }
@@ -129,21 +163,35 @@ public async Task PublishObservation(MqttServer mqttServer, IObservation observa
129163 {
130164 if ( mqttServer != null && observation != null )
131165 {
132- var message = CreateMessage ( observation ) ;
133- var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
166+ try
167+ {
168+ var message = CreateMessage ( observation ) ;
169+ var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
134170
135- await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
171+ await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
172+ }
173+ catch ( Exception ex )
174+ {
175+ if ( ServerError != null ) ServerError . Invoke ( this , ex ) ;
176+ }
136177 }
137178 }
138179
139180 public async Task PublishObservations ( MqttServer mqttServer , IEnumerable < IObservation > observations )
140181 {
141182 if ( mqttServer != null && ! observations . IsNullOrEmpty ( ) )
142183 {
143- var message = CreateMessage ( observations ) ;
144- var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
184+ try
185+ {
186+ var message = CreateMessage ( observations ) ;
187+ var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
145188
146- await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
189+ await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
190+ }
191+ catch ( Exception ex )
192+ {
193+ if ( ServerError != null ) ServerError . Invoke ( this , ex ) ;
194+ }
147195 }
148196 }
149197
@@ -215,15 +263,22 @@ public async Task PublishAsset(IMqttClient mqttClient, IAsset asset)
215263 {
216264 if ( mqttClient != null && mqttClient . IsConnected && asset != null )
217265 {
218- var message = CreateMessage ( asset ) ;
219- var result = await mqttClient . PublishAsync ( message ) ;
220- if ( result . IsSuccess )
266+ try
221267 {
222-
268+ var message = CreateMessage ( asset ) ;
269+ var result = await mqttClient . PublishAsync ( message ) ;
270+ if ( result . IsSuccess )
271+ {
272+ if ( MessageSent != null ) MessageSent . Invoke ( this , message . Topic ) ;
273+ }
274+ else
275+ {
276+ if ( SendError != null ) SendError . Invoke ( this , message . Topic ) ;
277+ }
223278 }
224- else
279+ catch ( Exception ex )
225280 {
226-
281+ if ( ClientError != null ) ClientError . Invoke ( this , ex ) ;
227282 }
228283 }
229284 }
@@ -232,10 +287,17 @@ public async Task PublishAsset(MqttServer mqttServer, IAsset asset)
232287 {
233288 if ( mqttServer != null && asset != null )
234289 {
235- var message = CreateMessage ( asset ) ;
236- var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
290+ try
291+ {
292+ var message = CreateMessage ( asset ) ;
293+ var injectedMessage = new InjectedMqttApplicationMessage ( message ) ;
237294
238- await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
295+ await mqttServer . InjectApplicationMessage ( injectedMessage ) ;
296+ }
297+ catch ( Exception ex )
298+ {
299+ if ( ServerError != null ) ServerError . Invoke ( this , ex ) ;
300+ }
239301 }
240302 }
241303
0 commit comments