@@ -35,6 +35,9 @@ public class Module : MTConnectAgentModule
3535 private readonly MqttFactory _mqttFactory ;
3636 private readonly IMqttClient _mqttClient ;
3737 private CancellationTokenSource _stop ;
38+ private static readonly object _lastSentSequenceLock = new object ( ) ;
39+ private long _totalIncomingObservations = 0 ;
40+ private long _lastSentSequence = 0 ;
3841
3942
4043 public Module ( IMTConnectAgentBroker mtconnectAgent , object configuration ) : base ( mtconnectAgent )
@@ -204,6 +207,10 @@ private async Task Worker()
204207 // Initialize Entity Server (if configured)
205208 if ( _entityServer != null )
206209 {
210+ if ( _configuration . DurableRelay )
211+ {
212+ await RelayBufferedObservations ( ) ;
213+ }
207214 await PublishDevices ( ) ;
208215 await PublishCurrentObservations ( ) ;
209216 await PublishAssets ( ) ;
@@ -243,7 +250,121 @@ private async Task Worker()
243250 } while ( ! _stop . Token . IsCancellationRequested ) ;
244251 }
245252
253+ private static IEnumerable < IObservationOutput > GetAllObservations ( IStreamsResponseOutputDocument doc )
254+ {
255+ if ( doc == null || doc . Streams == null ) yield break ;
256+
257+ foreach ( var deviceStream in doc . Streams )
258+ {
259+ if ( deviceStream . ComponentStreams == null ) continue ;
260+ foreach ( var componentStream in deviceStream . ComponentStreams )
261+ {
262+ if ( componentStream . Observations == null ) continue ;
263+ foreach ( var obs in componentStream . Observations )
264+ {
265+ yield return obs ;
266+ }
267+ }
268+ }
269+ }
270+
271+ private async Task RelayBufferedObservations ( )
272+ {
273+ if ( Agent is IMTConnectAgentBroker broker )
274+ {
275+ ulong lastSent = GetLastSentSequence ( ) ;
276+ ulong from = Math . Max ( lastSent + 1 , broker . FirstSequence ) ;
277+ ulong to = broker . LastSequence ;
278+
279+ Log ( MTConnectLogLevel . Information , $ "[MQTT Relay] RelayBufferedObservations: lastSent={ lastSent } , broker.FirstSequence={ broker . FirstSequence } , broker.LastSequence={ broker . LastSequence } ") ;
280+
281+ long missed = ( long ) ( to - lastSent ) ;
282+
283+ if ( lastSent + 1 < broker . FirstSequence )
284+ {
285+ Log ( MTConnectLogLevel . Warning , $ "[MQTT Relay] Some observations could not be relayed because they were overwritten in the buffer (lastSent={ lastSent } , firstAvailable={ broker . FirstSequence } ).") ;
286+ }
287+ if ( missed > 0 && from <= to )
288+ {
289+ Log ( MTConnectLogLevel . Warning , $ "[MQTT Relay] Network reconnected. { to - from + 1 } observations will now be sent (from Sequence { from } to { to } ).") ;
290+ }
291+
292+ if ( from <= to )
293+ {
294+ var doc = broker . GetDeviceStreamsResponseDocument ( from , to ) ;
295+ var observations = GetAllObservations ( doc )
296+ . OrderBy ( o => o . Sequence )
297+ . ToList ( ) ;
298+
299+ Log ( MTConnectLogLevel . Information , $ "[MQTT Relay] Relaying buffered observations: { observations . Count } from Sequence { from } to { to } ") ;
300+
301+ foreach ( var obs in observations )
302+ {
303+ var x = new Observation ( ) ;
304+ x . DeviceUuid = obs . DeviceUuid ;
305+ x . DataItemId = obs . DataItemId ;
306+ x . DataItem = obs . DataItem ;
307+ x . Name = obs . Name ;
308+ x . Category = obs . Category ;
309+ x . Type = obs . Type ;
310+ x . SubType = obs . SubType ;
311+ x . Representation = obs . Representation ;
312+ x . CompositionId = obs . CompositionId ;
313+ x . InstanceId = obs . InstanceId ;
314+ x . Sequence = obs . Sequence ;
315+ x . Timestamp = obs . Timestamp ;
316+ x . AddValues ( obs . Values ) ;
317+
318+ var result = await _entityServer . PublishObservation ( _mqttClient , x ) ;
319+ if ( result != null && result . IsSuccess )
320+ {
321+ SetLastSentSequence ( x . Sequence ) ;
322+ }
323+ }
246324
325+ Log ( MTConnectLogLevel . Information , $ "[MQTT Relay] Buffered observation relay complete. { observations . Count } missed observations sent.") ;
326+ var unsent = broker . LastSequence > ( ulong ) _lastSentSequence ? broker . LastSequence - ( ulong ) _lastSentSequence : 0 ;
327+ if ( unsent > 0 )
328+ Log ( MTConnectLogLevel . Information , $ "[MQTT Relay] { unsent } new observations arrived during relay and will be sent next.") ;
329+ }
330+ else
331+ {
332+ Log ( MTConnectLogLevel . Information , "[MQTT Relay] No buffered observations to relay." ) ;
333+ }
334+ }
335+ }
336+
337+ private static string GetLastSentSequenceFilePath ( )
338+ {
339+ var bufferDir = Path . Combine ( AppContext . BaseDirectory , "buffer" ) ;
340+ Directory . CreateDirectory ( bufferDir ) ;
341+ return Path . Combine ( bufferDir , "mqttrelay_last_sent.seq" ) ;
342+ }
343+
344+ private ulong GetLastSentSequence ( )
345+ {
346+ if ( ! _configuration . DurableRelay ) return 0 ; // Default
347+ var path = GetLastSentSequenceFilePath ( ) ;
348+ lock ( _lastSentSequenceLock )
349+ {
350+ if ( File . Exists ( path ) )
351+ {
352+ var text = File . ReadAllText ( path ) ;
353+ if ( ulong . TryParse ( text , out var seq ) ) return seq ;
354+ }
355+ }
356+ return 0 ;
357+ }
358+
359+ private void SetLastSentSequence ( ulong seq )
360+ {
361+ if ( ! _configuration . DurableRelay ) return ; // Default
362+ var path = GetLastSentSequenceFilePath ( ) ;
363+ lock ( _lastSentSequenceLock )
364+ {
365+ File . WriteAllText ( path , seq . ToString ( ) ) ;
366+ }
367+ }
247368
248369 private async void ProbeReceived ( IDevice device , IDevicesResponseDocument responseDocument )
249370 {
@@ -553,16 +674,47 @@ private async void AgentDeviceAdded(object sender, IDevice device)
553674
554675 private async void AgentObservationAdded ( object sender , IObservation observation )
555676 {
556- if ( _entityServer != null )
677+ if ( _configuration . DurableRelay )
557678 {
558- if ( observation . Category == DataItemCategory . CONDITION )
679+ Interlocked . Increment ( ref _totalIncomingObservations ) ;
680+
681+ var lastSent = GetLastSentSequence ( ) ;
682+ _lastSentSequence = ( long ) lastSent ;
683+
684+ if ( _entityServer != null )
559685 {
560- var conditionObservations = Agent . GetCurrentObservations ( observation . DeviceUuid , observation . DataItemId ) ;
561- await PublishObservations ( conditionObservations ) ;
686+ if ( observation . Category == DataItemCategory . CONDITION )
687+ {
688+ var conditionObservations = Agent . GetCurrentObservations ( observation . DeviceUuid , observation . DataItemId ) ;
689+ var result = await _entityServer . PublishObservations ( _mqttClient , conditionObservations . OfType < IObservation > ( ) ) ;
690+ if ( result != null && result . IsSuccess && conditionObservations != null && conditionObservations . Any ( ) )
691+ {
692+ SetLastSentSequence ( conditionObservations . Max ( o => o . Sequence ) ) ;
693+ }
694+ }
695+ else
696+ {
697+ var result = await _entityServer . PublishObservation ( _mqttClient , observation ) ;
698+ if ( result != null && result . IsSuccess )
699+ {
700+ SetLastSentSequence ( observation . Sequence ) ;
701+ }
702+ }
562703 }
563- else
704+ }
705+ else
706+ {
707+ if ( _entityServer != null )
564708 {
565- await _entityServer . PublishObservation ( _mqttClient , observation ) ;
709+ if ( observation . Category == DataItemCategory . CONDITION )
710+ {
711+ var conditionObservations = Agent . GetCurrentObservations ( observation . DeviceUuid , observation . DataItemId ) ;
712+ await PublishObservations ( conditionObservations ) ;
713+ }
714+ else
715+ {
716+ await _entityServer . PublishObservation ( _mqttClient , observation ) ;
717+ }
566718 }
567719 }
568720 }
0 commit comments