Skip to content

Commit 71fde58

Browse files
Ticket #103 : Publish to a queue
1 parent a7ebe9b commit 71fde58

50 files changed

Lines changed: 436 additions & 373 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/EventMesh/FaasNet.EventMesh.Client/EventMeshClient.cs

Lines changed: 88 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using FaasNet.EventMesh.Client.Messages;
44
using FaasNet.RaftConsensus.Client;
55
using FaasNet.RaftConsensus.Client.Extensions;
6+
using FaasNet.RaftConsensus.Client.Messages;
67
using System;
78
using System.Collections.Generic;
89
using System.Linq;
@@ -59,15 +60,15 @@ public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.D
5960
}
6061
}
6162

62-
public async Task<IEventMeshClientPubSession> CreatePubSession(string clientId, CancellationToken cancellationToken = default(CancellationToken))
63+
public async Task<IEventMeshClientPubSession> CreatePubSession(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken))
6364
{
64-
var pubSession = await CreateSession(clientId, cancellationToken);
65+
var pubSession = await CreateSession(vpn, clientId, UserAgentPurpose.PUB, cancellationToken);
6566
return new EventMeshClientPubSession(pubSession, _ipAddr, _port);
6667
}
6768

68-
public async Task<IEventMeshClientSubSession> CreateSubSession(string clientId, CancellationToken cancellationToken = default(CancellationToken))
69+
public async Task<IEventMeshClientSubSession> CreateSubSession(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken))
6970
{
70-
var subSession = await CreateSession(clientId, cancellationToken);
71+
var subSession = await CreateSession(vpn, clientId, UserAgentPurpose.SUB, cancellationToken);
7172
return new EventMeshClientSubSession(subSession, _ipAddr, _port);
7273
}
7374

@@ -87,12 +88,12 @@ public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.D
8788
}
8889
}
8990

90-
public async Task AddClient(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken))
91+
public async Task AddClient(string vpn, string clientId, List<UserAgentPurpose> purposes, CancellationToken cancellationToken = default(CancellationToken))
9192
{
9293
using (var udpClient = new UdpClient())
9394
{
9495
var writeCtx = new WriteBufferContext();
95-
var package = PackageRequestBuilder.AddClient(vpn, clientId);
96+
var package = PackageRequestBuilder.AddClient(vpn, clientId, purposes);
9697
package.Serialize(writeCtx);
9798
var payload = writeCtx.Buffer.ToArray();
9899
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
@@ -116,9 +117,9 @@ internal static void EnsureSuccessStatus(Package packageRequest, Package package
116117
}
117118
}
118119

119-
private async Task<HelloResponse> CreateSession(string clientId, CancellationToken cancellationToken = default(CancellationToken))
120+
private async Task<HelloResponse> CreateSession(string vpn, string clientId, UserAgentPurpose purpose, CancellationToken cancellationToken = default(CancellationToken))
120121
{
121-
var userAgent = new UserAgent { ClientId = clientId, Purpose = UserAgentPurpose.PUB };
122+
var userAgent = new UserAgent { ClientId = clientId, Vpn = vpn, Purpose = purpose };
122123
using (var udpClient = new UdpClient())
123124
{
124125
var writeCtx = new WriteBufferContext();
@@ -143,7 +144,22 @@ public interface IEventMeshClientPubSession
143144

144145
public interface IEventMeshClientSubSession
145146
{
147+
Task<SubscriptionResult> Subscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken);
148+
}
149+
150+
public class SubscriptionResult
151+
{
152+
private readonly CancellationTokenSource _cancellationTokenSource;
153+
154+
internal SubscriptionResult(CancellationTokenSource cancellationTokenSource)
155+
{
156+
_cancellationTokenSource = cancellationTokenSource;
157+
}
146158

159+
public void Close()
160+
{
161+
_cancellationTokenSource.Cancel();
162+
}
147163
}
148164

149165
internal class EventMeshClientPubSession : IEventMeshClientPubSession
@@ -203,5 +219,69 @@ public EventMeshClientSubSession(HelloResponse session, IPAddress ipAddr, int po
203219
_ipAddr = ipAddr;
204220
_port = port;
205221
}
222+
223+
public async Task<SubscriptionResult> Subscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken)
224+
{
225+
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
226+
var result = new SubscriptionResult(cancellationTokenSource);
227+
await Subscribe(topicFilter, cancellationToken);
228+
#pragma warning disable 4014
229+
Task.Run(async () => await Handle(callback, cancellationTokenSource.Token));
230+
#pragma warning restore 4014
231+
return result;
232+
}
233+
234+
private async Task Subscribe(string topicFilter, CancellationToken cancellationToken)
235+
{
236+
using (var udpClient = new UdpClient())
237+
{
238+
var writeCtx = new WriteBufferContext();
239+
var package = PackageRequestBuilder.Subscribe(new List<SubscriptionItem>
240+
{
241+
new SubscriptionItem
242+
{
243+
Topic = topicFilter
244+
}
245+
}, _session.SessionId);
246+
package.Serialize(writeCtx);
247+
var payload = writeCtx.Buffer.ToArray();
248+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
249+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
250+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
251+
var packageResult = Package.Deserialize(readCtx);
252+
EventMeshClient.EnsureSuccessStatus(package, packageResult);
253+
}
254+
}
255+
256+
private async Task Handle(Action<CloudEvent> callback, CancellationToken cancellationToken)
257+
{
258+
try
259+
{
260+
while (true)
261+
{
262+
cancellationToken.ThrowIfCancellationRequested();
263+
using (var udpClient = new UdpClient())
264+
{
265+
var writeCtx = new WriteBufferContext();
266+
var package = PackageRequestBuilder.ReadNextMessage(_session.SessionId);
267+
package.Serialize(writeCtx);
268+
var payload = writeCtx.Buffer.ToArray();
269+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
270+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
271+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
272+
var packageResult = Package.Deserialize(readCtx);
273+
EventMeshClient.EnsureSuccessStatus(package, packageResult);
274+
var result = packageResult as ReadNextMessageResponse;
275+
if (result.ContainsMessage)
276+
{
277+
callback(result.CloudEvt);
278+
}
279+
}
280+
281+
Thread.Sleep(200);
282+
}
283+
}
284+
catch { }
285+
}
206286
}
207287
}

src/EventMesh/FaasNet.EventMesh.Client/Extensions/CloudEventExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using CloudNative.CloudEvents;
22
using CloudNative.CloudEvents.SystemTextJson;
33
using FaasNet.EventMesh.Client.Messages;
4+
using FaasNet.RaftConsensus.Client.Messages;
45
using System;
6+
using System.Collections.Generic;
57
using System.Linq;
68

79
namespace FaasNet.EventMesh.Client.Extensions
@@ -30,6 +32,12 @@ public static void Serialize(this CloudEvent cloudEvent, WriteBufferContext buff
3032
}
3133
}
3234

35+
public static CloudEvent DeserializeCloudEvent(this IEnumerable<byte> payload)
36+
{
37+
var readBufferContext = new ReadBufferContext(payload.ToArray());
38+
return readBufferContext.DeserializeCloudEvent();
39+
}
40+
3341
public static CloudEvent DeserializeCloudEvent(this ReadBufferContext context)
3442
{
3543
var cloudEventPayload = context.NextByteArray();

src/EventMesh/FaasNet.EventMesh.Client/Extensions/SerializerExtensions.cs

Lines changed: 0 additions & 106 deletions
This file was deleted.

src/EventMesh/FaasNet.EventMesh.Client/Messages/AddBridgeRequest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11

2+
using FaasNet.RaftConsensus.Client.Messages;
3+
24
namespace FaasNet.EventMesh.Client.Messages
35
{
46
public class AddBridgeRequest : Package
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,35 @@
1-
namespace FaasNet.EventMesh.Client.Messages
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace FaasNet.EventMesh.Client.Messages
26
{
37
public class AddClientRequest : Package
48
{
9+
public AddClientRequest()
10+
{
11+
Purposes = new List<UserAgentPurpose>();
12+
}
13+
514
public string Vpn { get; set; }
615
public string ClientId { get; set; }
16+
public ICollection<UserAgentPurpose> Purposes { get; set; }
717

818
public override void Serialize(WriteBufferContext context)
919
{
1020
base.Serialize(context);
1121
context.WriteString(Vpn);
1222
context.WriteString(ClientId);
23+
context.WriteInteger(Purposes.Count());
24+
foreach (var purpose in Purposes) purpose.Serialize(context);
1325
}
1426

1527
public void Extract(ReadBufferContext context)
1628
{
1729
Vpn = context.NextString();
1830
ClientId = context.NextString();
31+
var nbPurposes = context.NextInt();
32+
for (int i = 0; i < nbPurposes; i++) Purposes.Add(UserAgentPurpose.Deserialize(context));
1933
}
2034
}
2135
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/AddClientResponse.cs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,5 @@
22
{
33
public class AddClientResponse: Package
44
{
5-
public string Queue { get; set; }
6-
7-
public override void Serialize(WriteBufferContext context)
8-
{
9-
base.Serialize(context);
10-
context.WriteString(Queue);
11-
}
12-
13-
public void Extract(ReadBufferContext context)
14-
{
15-
Queue = context.NextString();
16-
}
175
}
186
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/AddVpnRequest.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace FaasNet.EventMesh.Client.Messages
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
3+
namespace FaasNet.EventMesh.Client.Messages
24
{
35
public class AddVpnRequest : Package
46
{

src/EventMesh/FaasNet.EventMesh.Client/Messages/Commands.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
using System;
23
using System.Linq;
34
using System.Reflection;
45

@@ -78,6 +79,14 @@ public class Commands : IEquatable<Commands>
7879
/// Result returned when a VPN is added.
7980
/// </summary>
8081
public static Commands ADD_VPN_RESPONSE = new Commands(17, "ADD_VPN_RESPONSE");
82+
/// <summary>
83+
/// Request sent to read the next message.
84+
/// </summary>
85+
public static Commands READ_NEXT_MESSAGE_REQUEST = new Commands(18, "READ_MESSAGE_REQUEST");
86+
/// <summary>
87+
/// Result returned when message is sent.
88+
/// </summary>
89+
public static Commands READ_NEXT_MESSAGE_RESPONSE = new Commands(19, "READ_MESSAGE_RESPONSE");
8190

8291
private Commands(int code)
8392
{

src/EventMesh/FaasNet.EventMesh.Client/Messages/DisconnectRequest.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11

2+
using FaasNet.RaftConsensus.Client.Messages;
3+
24
namespace FaasNet.EventMesh.Client.Messages
35
{
46
public class DisconnectRequest : Package

src/EventMesh/FaasNet.EventMesh.Client/Messages/Errors.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System;
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
using System;
23

34
namespace FaasNet.EventMesh.Client.Messages
45
{

0 commit comments

Comments
 (0)