Skip to content

Commit bf755bf

Browse files
Ticket #103 : support persisted session + direct subscription
1 parent ad5f0aa commit bf755bf

26 files changed

Lines changed: 279 additions & 250 deletions

src/EventMesh/FaasNet.EventMesh.Client/EventMeshClient.cs

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ public interface IEventMeshClientPubSession
144144

145145
public interface IEventMeshClientSubSession
146146
{
147-
Task<SubscriptionResult> Subscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken);
147+
Task<SubscriptionResult> PersistedSubscribe(string topicFilter, string groupId, Action<CloudEvent> callback, CancellationToken cancellationToken);
148+
SubscriptionResult DirectSubscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken);
148149
}
149150

150151
public class SubscriptionResult
@@ -212,6 +213,7 @@ internal class EventMeshClientSubSession : IEventMeshClientSubSession
212213
private readonly HelloResponse _session;
213214
private readonly IPAddress _ipAddr;
214215
private readonly int _port;
216+
private int _offsetEvt;
215217

216218
public EventMeshClientSubSession(HelloResponse session, IPAddress ipAddr, int port)
217219
{
@@ -220,23 +222,34 @@ public EventMeshClientSubSession(HelloResponse session, IPAddress ipAddr, int po
220222
_port = port;
221223
}
222224

223-
public async Task<SubscriptionResult> Subscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken)
225+
public async Task<SubscriptionResult> PersistedSubscribe(string topicFilter, string groupId, Action<CloudEvent> callback, CancellationToken cancellationToken)
224226
{
225227
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
226228
var result = new SubscriptionResult(cancellationTokenSource);
227-
await Subscribe(topicFilter, cancellationToken);
229+
await PersistedSubscribe(topicFilter, groupId, cancellationToken);
228230
#pragma warning disable 4014
229-
Task.Run(async () => await Handle(callback, cancellationTokenSource.Token));
231+
Task.Run(async () => await HandlePersistedSubscribe(callback, groupId, cancellationTokenSource.Token));
230232
#pragma warning restore 4014
231233
return result;
232234
}
233235

234-
private async Task Subscribe(string topicFilter, CancellationToken cancellationToken)
236+
public SubscriptionResult DirectSubscribe(string topicName, Action<CloudEvent> callback, CancellationToken cancellationToken)
237+
{
238+
_offsetEvt = 1;
239+
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
240+
var result = new SubscriptionResult(cancellationTokenSource);
241+
#pragma warning disable 4014
242+
Task.Run(async () => await HandleDirectSubscribe(callback, topicName, cancellationTokenSource.Token));
243+
#pragma warning restore 4014
244+
return result;
245+
}
246+
247+
private async Task PersistedSubscribe(string topicFilter, string groupId, CancellationToken cancellationToken)
235248
{
236249
using (var udpClient = new UdpClient())
237250
{
238251
var writeCtx = new WriteBufferContext();
239-
var package = PackageRequestBuilder.Subscribe(new List<SubscriptionItem>
252+
var package = PackageRequestBuilder.Subscribe(groupId, new List<SubscriptionItem>
240253
{
241254
new SubscriptionItem
242255
{
@@ -253,7 +266,7 @@ private async Task Subscribe(string topicFilter, CancellationToken cancellationT
253266
}
254267
}
255268

256-
private async Task Handle(Action<CloudEvent> callback, CancellationToken cancellationToken)
269+
private async Task HandlePersistedSubscribe(Action<CloudEvent> callback, string groupId, CancellationToken cancellationToken)
257270
{
258271
try
259272
{
@@ -263,7 +276,7 @@ private async Task Handle(Action<CloudEvent> callback, CancellationToken cancell
263276
using (var udpClient = new UdpClient())
264277
{
265278
var writeCtx = new WriteBufferContext();
266-
var package = PackageRequestBuilder.ReadNextMessage(_session.SessionId);
279+
var package = PackageRequestBuilder.ReadNextMessage(_session.SessionId, groupId);
267280
package.Serialize(writeCtx);
268281
var payload = writeCtx.Buffer.ToArray();
269282
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
@@ -283,5 +296,37 @@ private async Task Handle(Action<CloudEvent> callback, CancellationToken cancell
283296
}
284297
catch { }
285298
}
299+
300+
private async Task HandleDirectSubscribe(Action<CloudEvent> callback, string topicName, CancellationToken cancellationToken)
301+
{
302+
try
303+
{
304+
while (true)
305+
{
306+
cancellationToken.ThrowIfCancellationRequested();
307+
using (var udpClient = new UdpClient())
308+
{
309+
var writeCtx = new WriteBufferContext();
310+
var package = PackageRequestBuilder.ReadTopicMessage(topicName, _offsetEvt);
311+
package.Serialize(writeCtx);
312+
var payload = writeCtx.Buffer.ToArray();
313+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
314+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
315+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
316+
var packageResult = Package.Deserialize(readCtx);
317+
EventMeshClient.EnsureSuccessStatus(package, packageResult);
318+
var result = packageResult as ReadMessageTopicResponse;
319+
if (result.ContainsMessage)
320+
{
321+
callback(result.Value);
322+
_offsetEvt++;
323+
}
324+
}
325+
326+
Thread.Sleep(200);
327+
}
328+
}
329+
catch { }
330+
}
286331
}
287332
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/Commands.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ public class Commands : IEquatable<Commands>
8787
/// Result returned when message is sent.
8888
/// </summary>
8989
public static Commands READ_NEXT_MESSAGE_RESPONSE = new Commands(19, "READ_MESSAGE_RESPONSE");
90+
/// <summary>
91+
/// Request sent to read topic message.
92+
/// </summary>
93+
public static Commands READ_TOPIC_MESSAGE_REQUEST = new Commands(20, "READ_TOPIC_MESSAGE_REQUEST");
94+
/// <summary>
95+
/// Result returned when message is sent.
96+
/// </summary>
97+
public static Commands READ_TOPIC_MESSAGE_RESPONSE = new Commands(21, "READ_TOPIC_MESSAGE_RESPONSE");
9098

9199
private Commands(int code)
92100
{

src/EventMesh/FaasNet.EventMesh.Client/Messages/Package.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,20 @@ public static Package Deserialize(ReadBufferContext context)
137137
return result;
138138
}
139139

140+
if (Commands.READ_TOPIC_MESSAGE_REQUEST == header.Command)
141+
{
142+
var result = new ReadMessageTopicRequest { Header = header };
143+
result.Extract(context);
144+
return result;
145+
}
146+
147+
if (Commands.READ_TOPIC_MESSAGE_RESPONSE == header.Command)
148+
{
149+
var result = new ReadMessageTopicResponse { Header = header };
150+
result.Extract(context);
151+
return result;
152+
}
153+
140154
return new Package
141155
{
142156
Header = header

src/EventMesh/FaasNet.EventMesh.Client/Messages/PackageRequestBuilder.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,12 @@ public static DisconnectRequest Disconnect(string clientId, string sessionId)
7070
return result;
7171
}
7272

73-
public static SubscriptionRequest Subscribe(ICollection<SubscriptionItem> subscriptionItems, string sessionId, string seq = null)
73+
public static SubscriptionRequest Subscribe(string groupId, ICollection<SubscriptionItem> subscriptionItems, string sessionId, string seq = null)
7474
{
7575
var result = new SubscriptionRequest
7676
{
7777
Header = new Header(Commands.SUBSCRIBE_REQUEST, HeaderStatus.SUCCESS, seq ?? GenerateRandomSeq()),
78+
GroupId = groupId,
7879
TopicFilters = subscriptionItems,
7980
SessionId = sessionId
8081
};
@@ -106,12 +107,24 @@ public static Package AddBridge(string vpn, string urn, int port, string targetV
106107
return result;
107108
}
108109

109-
public static Package ReadNextMessage(string sessionId, string seq = null)
110+
public static Package ReadNextMessage(string sessionId, string groupId, string seq = null)
110111
{
111112
var result = new ReadNextMessageRequest
112113
{
113114
Header = new Header(Commands.READ_NEXT_MESSAGE_REQUEST, HeaderStatus.SUCCESS, seq ?? GenerateRandomSeq()),
114-
SessionId = sessionId
115+
SessionId = sessionId,
116+
GroupId = groupId
117+
};
118+
return result;
119+
}
120+
121+
public static Package ReadTopicMessage(string topic, int evtOffset, string seq = null)
122+
{
123+
var result = new ReadMessageTopicRequest
124+
{
125+
Header = new Header(Commands.READ_TOPIC_MESSAGE_REQUEST, HeaderStatus.SUCCESS, seq ?? GenerateRandomSeq()),
126+
Topic = topic,
127+
EvtOffset = evtOffset
115128
};
116129
return result;
117130
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/PackageResponseBuilder.cs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ public static Package Subscription(string seq, string queueName)
5353
return result;
5454
}
5555

56+
public static Package Subscription(string seq)
57+
{
58+
var result = new SubscriptionResult
59+
{
60+
Header = new Header(Commands.SUBSCRIBE_RESPONSE, HeaderStatus.SUCCESS, seq),
61+
QueueName = string.Empty
62+
};
63+
return result;
64+
}
65+
5666
public static Package PublishMessage(string seq)
5767
{
5868
var result = new Package
@@ -89,14 +99,25 @@ public static Package AddVpn(string seq)
8999
return result;
90100
}
91101

92-
public static Package ReadNextMessage(CloudEvent cloudEvt, int evtOffset, string seq)
102+
public static Package ReadNextMessage(CloudEvent cloudEvt, string seq)
93103
{
94104
return new ReadNextMessageResponse
95105
{
96106
Header = new Header(Commands.READ_NEXT_MESSAGE_RESPONSE, HeaderStatus.SUCCESS, seq),
97107
ContainsMessage = cloudEvt != null,
98-
CloudEvt = cloudEvt,
99-
EvtOffset = evtOffset
108+
CloudEvt = cloudEvt
109+
};
110+
}
111+
112+
public static Package ReadTopicMessage(string topic, CloudEvent value, int evtOffset, string seq)
113+
{
114+
return new ReadMessageTopicResponse
115+
{
116+
Header = new Header(Commands.READ_TOPIC_MESSAGE_RESPONSE, HeaderStatus.SUCCESS, seq),
117+
Topic = topic,
118+
Value = value,
119+
EvtOffset = evtOffset,
120+
ContainsMessage = value != null
100121
};
101122
}
102123

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
3+
namespace FaasNet.EventMesh.Client.Messages
4+
{
5+
public class ReadMessageTopicRequest : Package
6+
{
7+
public string Topic { get; set; }
8+
public int EvtOffset { get; set; }
9+
10+
public override void Serialize(WriteBufferContext context)
11+
{
12+
base.Serialize(context);
13+
context.WriteString(Topic);
14+
context.WriteInteger(EvtOffset);
15+
}
16+
17+
public void Extract(ReadBufferContext context)
18+
{
19+
Topic = context.NextString();
20+
EvtOffset = context.NextInt();
21+
}
22+
}
23+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using CloudNative.CloudEvents;
2+
using FaasNet.EventMesh.Client.Extensions;
3+
using FaasNet.RaftConsensus.Client.Messages;
4+
5+
namespace FaasNet.EventMesh.Client.Messages
6+
{
7+
public class ReadMessageTopicResponse : Package
8+
{
9+
public string Topic { get; set; }
10+
public int EvtOffset { get; set; }
11+
public bool ContainsMessage { get; set; }
12+
public CloudEvent Value { get; set; }
13+
14+
public override void Serialize(WriteBufferContext context)
15+
{
16+
base.Serialize(context);
17+
context.WriteString(Topic);
18+
context.WriteInteger(EvtOffset);
19+
context.WriteBoolean(ContainsMessage);
20+
if (!ContainsMessage) return;
21+
Value.Serialize(context);
22+
}
23+
24+
public void Extract(ReadBufferContext context)
25+
{
26+
Topic = context.NextString();
27+
EvtOffset = context.NextInt();
28+
ContainsMessage = context.NextBoolean();
29+
if (ContainsMessage) Value = context.DeserializeCloudEvent();
30+
}
31+
}
32+
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/ReadNextMessageRequest.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@ namespace FaasNet.EventMesh.Client.Messages
55
public class ReadNextMessageRequest : Package
66
{
77
public string SessionId { get; set; }
8+
public string GroupId { get; set; }
89

910
public override void Serialize(WriteBufferContext context)
1011
{
1112
base.Serialize(context);
1213
context.WriteString(SessionId);
14+
context.WriteString(GroupId);
1315
}
1416

1517
public void Extract(ReadBufferContext context)
1618
{
1719
SessionId = context.NextString();
20+
GroupId = context.NextString();
1821
}
1922
}
2023
}

0 commit comments

Comments
 (0)