Skip to content

Commit 3277778

Browse files
committed
added persistent relay feature
1 parent c186312 commit 3277778

6 files changed

Lines changed: 174 additions & 10 deletions

File tree

agent/MTConnect.NET-Agent/agent.config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ modules:
9393
# - mqtt-relay: # - Add MQTT Relay module (Entity Structure)
9494
# server: localhost
9595
# port: 1883
96+
# durableRelay: true
9697
# currentInterval: 10000
9798
# sampleInterval: 500
9899
# documentFormat: JSON
@@ -102,6 +103,7 @@ modules:
102103
# - mqtt-relay: # - Add MQTT Relay module (TLS)
103104
# server: localhost
104105
# port: 8883
106+
# durableRelay: true
105107
# currentInterval: 10000
106108
# sampleInterval: 500
107109
# tls:

agent/MTConnect.NET-Applications-Agents/agent.config.default.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ modules:
2424
# - mqtt-relay: # - Add MQTT Relay module
2525
# server: localhost
2626
# port: 1883
27+
# durableRelay: true
2728
# currentInterval: 5000
2829
# sampleInterval: 500
2930

agent/Modules/MTConnect.NET-AgentModule-MqttRelay/Module.cs

Lines changed: 158 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ public class Module : MTConnectAgentModule
3535
private readonly MqttFactory _mqttFactory;
3636
private readonly IMqttClient _mqttClient;
3737
private CancellationTokenSource _stop;
38+
private static readonly object _lastSentSequenceLock = new object();
39+
private long _totalIncomingObservations = 0;
40+
private long _lastSentSequence = 0;
3841

3942

4043
public Module(IMTConnectAgentBroker mtconnectAgent, object configuration) : base(mtconnectAgent)
@@ -204,6 +207,10 @@ private async Task Worker()
204207
// Initialize Entity Server (if configured)
205208
if (_entityServer != null)
206209
{
210+
if (_configuration.DurableRelay)
211+
{
212+
await RelayBufferedObservations();
213+
}
207214
await PublishDevices();
208215
await PublishCurrentObservations();
209216
await PublishAssets();
@@ -243,7 +250,121 @@ private async Task Worker()
243250
} while (!_stop.Token.IsCancellationRequested);
244251
}
245252

253+
private static IEnumerable<IObservationOutput> GetAllObservations(IStreamsResponseOutputDocument doc)
254+
{
255+
if (doc == null || doc.Streams == null) yield break;
256+
257+
foreach (var deviceStream in doc.Streams)
258+
{
259+
if (deviceStream.ComponentStreams == null) continue;
260+
foreach (var componentStream in deviceStream.ComponentStreams)
261+
{
262+
if (componentStream.Observations == null) continue;
263+
foreach (var obs in componentStream.Observations)
264+
{
265+
yield return obs;
266+
}
267+
}
268+
}
269+
}
270+
271+
private async Task RelayBufferedObservations()
272+
{
273+
if (Agent is IMTConnectAgentBroker broker)
274+
{
275+
ulong lastSent = GetLastSentSequence();
276+
ulong from = Math.Max(lastSent + 1, broker.FirstSequence);
277+
ulong to = broker.LastSequence;
278+
279+
Log(MTConnectLogLevel.Information, $"[MQTT Relay] RelayBufferedObservations: lastSent={lastSent}, broker.FirstSequence={broker.FirstSequence}, broker.LastSequence={broker.LastSequence}");
280+
281+
long missed = (long)(to - lastSent);
282+
283+
if (lastSent + 1 < broker.FirstSequence)
284+
{
285+
Log(MTConnectLogLevel.Warning, $"[MQTT Relay] Some observations could not be relayed because they were overwritten in the buffer (lastSent={lastSent}, firstAvailable={broker.FirstSequence}).");
286+
}
287+
if (missed > 0 && from <= to)
288+
{
289+
Log(MTConnectLogLevel.Warning, $"[MQTT Relay] Network reconnected. {to - from + 1} observations will now be sent (from Sequence {from} to {to}).");
290+
}
291+
292+
if (from <= to)
293+
{
294+
var doc = broker.GetDeviceStreamsResponseDocument(from, to);
295+
var observations = GetAllObservations(doc)
296+
.OrderBy(o => o.Sequence)
297+
.ToList();
298+
299+
Log(MTConnectLogLevel.Information, $"[MQTT Relay] Relaying buffered observations: {observations.Count} from Sequence {from} to {to}");
300+
301+
foreach (var obs in observations)
302+
{
303+
var x = new Observation();
304+
x.DeviceUuid = obs.DeviceUuid;
305+
x.DataItemId = obs.DataItemId;
306+
x.DataItem = obs.DataItem;
307+
x.Name = obs.Name;
308+
x.Category = obs.Category;
309+
x.Type = obs.Type;
310+
x.SubType = obs.SubType;
311+
x.Representation = obs.Representation;
312+
x.CompositionId = obs.CompositionId;
313+
x.InstanceId = obs.InstanceId;
314+
x.Sequence = obs.Sequence;
315+
x.Timestamp = obs.Timestamp;
316+
x.AddValues(obs.Values);
317+
318+
var result = await _entityServer.PublishObservation(_mqttClient, x);
319+
if (result != null && result.IsSuccess)
320+
{
321+
SetLastSentSequence(x.Sequence);
322+
}
323+
}
246324

325+
Log(MTConnectLogLevel.Information, $"[MQTT Relay] Buffered observation relay complete. {observations.Count} missed observations sent.");
326+
var unsent = broker.LastSequence > (ulong)_lastSentSequence ? broker.LastSequence - (ulong)_lastSentSequence : 0;
327+
if (unsent > 0)
328+
Log(MTConnectLogLevel.Information, $"[MQTT Relay] {unsent} new observations arrived during relay and will be sent next.");
329+
}
330+
else
331+
{
332+
Log(MTConnectLogLevel.Information, "[MQTT Relay] No buffered observations to relay.");
333+
}
334+
}
335+
}
336+
337+
private static string GetLastSentSequenceFilePath()
338+
{
339+
var bufferDir = Path.Combine(AppContext.BaseDirectory, "buffer");
340+
Directory.CreateDirectory(bufferDir);
341+
return Path.Combine(bufferDir, "mqttrelay_last_sent.seq");
342+
}
343+
344+
private ulong GetLastSentSequence()
345+
{
346+
if (!_configuration.DurableRelay) return 0; // Default
347+
var path = GetLastSentSequenceFilePath();
348+
lock (_lastSentSequenceLock)
349+
{
350+
if (File.Exists(path))
351+
{
352+
var text = File.ReadAllText(path);
353+
if (ulong.TryParse(text, out var seq)) return seq;
354+
}
355+
}
356+
return 0;
357+
}
358+
359+
private void SetLastSentSequence(ulong seq)
360+
{
361+
if (!_configuration.DurableRelay) return; // Default
362+
var path = GetLastSentSequenceFilePath();
363+
lock (_lastSentSequenceLock)
364+
{
365+
File.WriteAllText(path, seq.ToString());
366+
}
367+
}
247368

248369
private async void ProbeReceived(IDevice device, IDevicesResponseDocument responseDocument)
249370
{
@@ -553,16 +674,47 @@ private async void AgentDeviceAdded(object sender, IDevice device)
553674

554675
private async void AgentObservationAdded(object sender, IObservation observation)
555676
{
556-
if (_entityServer != null)
677+
if (_configuration.DurableRelay)
557678
{
558-
if (observation.Category == DataItemCategory.CONDITION)
679+
Interlocked.Increment(ref _totalIncomingObservations);
680+
681+
var lastSent = GetLastSentSequence();
682+
_lastSentSequence = (long)lastSent;
683+
684+
if (_entityServer != null)
559685
{
560-
var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
561-
await PublishObservations(conditionObservations);
686+
if (observation.Category == DataItemCategory.CONDITION)
687+
{
688+
var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
689+
var result = await _entityServer.PublishObservations(_mqttClient, conditionObservations.OfType<IObservation>());
690+
if (result != null && result.IsSuccess && conditionObservations != null && conditionObservations.Any())
691+
{
692+
SetLastSentSequence(conditionObservations.Max(o => o.Sequence));
693+
}
694+
}
695+
else
696+
{
697+
var result = await _entityServer.PublishObservation(_mqttClient, observation);
698+
if (result != null && result.IsSuccess)
699+
{
700+
SetLastSentSequence(observation.Sequence);
701+
}
702+
}
562703
}
563-
else
704+
}
705+
else
706+
{
707+
if (_entityServer != null)
564708
{
565-
await _entityServer.PublishObservation(_mqttClient, observation);
709+
if (observation.Category == DataItemCategory.CONDITION)
710+
{
711+
var conditionObservations = Agent.GetCurrentObservations(observation.DeviceUuid, observation.DataItemId);
712+
await PublishObservations(conditionObservations);
713+
}
714+
else
715+
{
716+
await _entityServer.PublishObservation(_mqttClient, observation);
717+
}
566718
}
567719
}
568720
}

agent/Modules/MTConnect.NET-AgentModule-MqttRelay/MqttRelayModuleConfiguration.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public class MqttRelayModuleConfiguration : IMTConnectMqttDocumentServerConfigur
9595
/// </summary>
9696
public int SampleInterval { get; set; }
9797

98+
/// <summary>
99+
/// Sets whether to send buffered observation on successful reconnect
100+
/// </summary>
101+
public bool DurableRelay { get; set; } = false;
102+
98103

99104
public MqttRelayModuleConfiguration()
100105
{

libraries/MTConnect.NET-MQTT/MTConnectMqttEntityServer.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private MqttApplicationMessage CreateMessage(IDevice device)
111111
}
112112

113113

114-
public async Task PublishObservation(IMqttClient mqttClient, IObservation observation)
114+
public async Task<MqttClientPublishResult> PublishObservation(IMqttClient mqttClient, IObservation observation)
115115
{
116116
if (mqttClient != null && mqttClient.IsConnected && observation != null)
117117
{
@@ -127,15 +127,17 @@ public async Task PublishObservation(IMqttClient mqttClient, IObservation observ
127127
{
128128
if (SendError != null) SendError.Invoke(this, message.Topic);
129129
}
130+
return result;
130131
}
131132
catch (Exception ex)
132133
{
133134
if (ClientError != null) ClientError.Invoke(this, ex);
134135
}
135136
}
137+
return null;
136138
}
137139

138-
public async Task PublishObservations(IMqttClient mqttClient, IEnumerable<IObservation> observations)
140+
public async Task<MqttClientPublishResult> PublishObservations(IMqttClient mqttClient, IEnumerable<IObservation> observations)
139141
{
140142
if (mqttClient != null && mqttClient.IsConnected && !observations.IsNullOrEmpty())
141143
{
@@ -151,12 +153,14 @@ public async Task PublishObservations(IMqttClient mqttClient, IEnumerable<IObser
151153
{
152154
if (SendError != null) SendError.Invoke(this, message.Topic);
153155
}
156+
return result;
154157
}
155158
catch (Exception ex)
156159
{
157160
if (ClientError != null) ClientError.Invoke(this, ex);
158161
}
159162
}
163+
return null;
160164
}
161165

162166
public async Task PublishObservation(MqttServer mqttServer, IObservation observation)

tests/IntegrationTests/ClientAgentCommunicationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ private void AddCuttingTools()
162162
var measurements = new List<IMeasurement>();
163163
measurements.Add(new MTConnect.Assets.CuttingTools.Measurements.FunctionalLengthMeasurement(7.6543));
164164
measurements.Add(new MTConnect.Assets.CuttingTools.Measurements.CuttingDiameterMaxMeasurement(0.375));
165-
cuttingToolLifeCycle.Measurements = measurements;
165+
cuttingToolLifeCycle.Measurements = measurements.OfType<IToolingMeasurement>();
166166

167167
var cuttingItems = new List<ICuttingItem>();
168168
cuttingItems.Add(new CuttingItem
@@ -239,7 +239,7 @@ private void AddCuttingTools()
239239
return tcs.Task;
240240
}
241241

242-
private static void GenerateDevicesXml(
242+
internal static void GenerateDevicesXml(
243243
string machineId,
244244
string machineName,
245245
string fileName,

0 commit comments

Comments
 (0)