Skip to content

Commit bdd2aa7

Browse files
Ticket #107 : Add VPN bridge between two servers + fix some UTS
1 parent fa661ab commit bdd2aa7

20 files changed

Lines changed: 309 additions & 243 deletions

File tree

src/Common/FaasNet.Common/ServerBuilder.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,21 @@ namespace FaasNet.Common
44
{
55
public class ServerBuilder
66
{
7+
private ServiceProvider _serviceProvider;
8+
79
public ServerBuilder(IServiceCollection services)
810
{
911
Services = services;
1012
}
1113

1214
public IServiceCollection Services { get; private set; }
15+
public ServiceProvider ServiceProvider
16+
{
17+
get
18+
{
19+
if(_serviceProvider == null) _serviceProvider = Services.BuildServiceProvider();
20+
return _serviceProvider;
21+
}
22+
}
1323
}
1424
}
93.7 KB
Loading

src/EventMesh/FaasNet.EventMesh.Client/Constants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace FaasNet.EventMesh.Client
55
{
66
public class Constants
77
{
8-
public const int DefaultPort = 4889;
8+
public const int DefaultPort = 4000;
99
public const string DefaultUrl = "localhost";
1010
public const string DefaultVpn = "default";
1111
public const string DefaultIPAddress = "127.0.0.1";

src/EventMesh/FaasNet.EventMesh.Client/EventMeshClient.cs

Lines changed: 91 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,22 @@ public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.D
6060
}
6161
}
6262

63-
public async Task<IEventMeshClientPubSession> CreatePubSession(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken))
63+
public async Task<IEventMeshClientPubSession> CreatePubSession(string vpn, string clientId, TimeSpan? expirationTime = null, CancellationToken cancellationToken = default(CancellationToken))
6464
{
65-
var pubSession = await CreateSession(vpn, clientId, UserAgentPurpose.PUB, cancellationToken);
66-
return new EventMeshClientPubSession(pubSession, _ipAddr, _port);
65+
var pubSession = await CreateSession(vpn, clientId, UserAgentPurpose.PUB, expirationTime, false, cancellationToken);
66+
return new EventMeshClientPubSession(pubSession, clientId, _ipAddr, _port);
6767
}
6868

69-
public async Task<IEventMeshClientSubSession> CreateSubSession(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken))
69+
public async Task<IEventMeshClientPubSession> CreatePubSession(string vpn, string clientId, TimeSpan? expirationTime = null, bool isInfinite = false, CancellationToken cancellationToken = default(CancellationToken))
7070
{
71-
var subSession = await CreateSession(vpn, clientId, UserAgentPurpose.SUB, cancellationToken);
72-
return new EventMeshClientSubSession(subSession, _ipAddr, _port);
71+
var pubSession = await CreateSession(vpn, clientId, UserAgentPurpose.PUB, expirationTime, isInfinite, cancellationToken);
72+
return new EventMeshClientPubSession(pubSession, clientId, _ipAddr, _port);
73+
}
74+
75+
public async Task<IEventMeshClientSubSession> CreateSubSession(string vpn, string clientId, TimeSpan? expirationTime = null, CancellationToken cancellationToken = default(CancellationToken))
76+
{
77+
var subSession = await CreateSession(vpn, clientId, UserAgentPurpose.SUB, expirationTime, false, cancellationToken);
78+
return new EventMeshClientSubSession(subSession, clientId, _ipAddr, _port);
7379
}
7480

7581
public async Task AddVpn(string vpn, CancellationToken cancellationToken = default(CancellationToken))
@@ -137,6 +143,22 @@ public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.D
137143
}
138144
}
139145

146+
public async Task Disconnect(string clientId, string sessionId, CancellationToken cancellationToken = default(CancellationToken))
147+
{
148+
using(var udpClient = new UdpClient())
149+
{
150+
var writeCtx = new WriteBufferContext();
151+
var package = PackageRequestBuilder.Disconnect(clientId, sessionId);
152+
package.Serialize(writeCtx);
153+
var payload = writeCtx.Buffer.ToArray();
154+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
155+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
156+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
157+
var packageResult = Package.Deserialize(readCtx);
158+
EnsureSuccessStatus(package, packageResult);
159+
}
160+
}
161+
140162
internal static void EnsureSuccessStatus(Package packageRequest, Package packageResponse)
141163
{
142164
if (packageResponse.Header.Status != HeaderStatus.SUCCESS)
@@ -150,9 +172,9 @@ internal static void EnsureSuccessStatus(Package packageRequest, Package package
150172
}
151173
}
152174

153-
private async Task<HelloResponse> CreateSession(string vpn, string clientId, UserAgentPurpose purpose, CancellationToken cancellationToken = default(CancellationToken))
175+
private async Task<HelloResponse> CreateSession(string vpn, string clientId, UserAgentPurpose purpose, TimeSpan? expirationTime = null, bool isInfinite = false, CancellationToken cancellationToken = default(CancellationToken))
154176
{
155-
var userAgent = new UserAgent { ClientId = clientId, Vpn = vpn, Purpose = purpose };
177+
var userAgent = new UserAgent { ClientId = clientId, Vpn = vpn, Purpose = purpose, Expiration = expirationTime, IsSessionInfinite = isInfinite };
156178
using (var udpClient = new UdpClient())
157179
{
158180
var writeCtx = new WriteBufferContext();
@@ -174,38 +196,43 @@ public interface IEventMeshClientPubSession
174196
Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken));
175197
Task Publish(string topicName, string str, CancellationToken cancellationToken = default(CancellationToken));
176198
Task Publish(string topicName, CloudEvent cloudEvent, CancellationToken cancellationToken = default(CancellationToken));
199+
Task Disconnect(CancellationToken cancellationToken = default(CancellationToken));
177200
}
178201

179202
public interface IEventMeshClientSubSession
180203
{
204+
string SessionId { get; }
181205
Task<SubscriptionResult> PersistedSubscribe(string topicFilter, string groupId, Action<CloudEvent> callback, CancellationToken cancellationToken);
182206
SubscriptionResult DirectSubscribe(string topicFilter, Action<CloudEvent> callback, CancellationToken cancellationToken);
207+
Task Disconnect(CancellationToken cancellationToken = default(CancellationToken));
183208
}
184209

185210
public class SubscriptionResult
186211
{
187-
private readonly CancellationTokenSource _cancellationTokenSource;
212+
private readonly Func<CancellationToken, Task> _disconnectCallback;
188213

189-
internal SubscriptionResult(CancellationTokenSource cancellationTokenSource)
214+
internal SubscriptionResult(Func<CancellationToken, Task> disconnectCallback)
190215
{
191-
_cancellationTokenSource = cancellationTokenSource;
216+
_disconnectCallback = disconnectCallback;
192217
}
193218

194-
public void Close()
219+
public async void Close()
195220
{
196-
_cancellationTokenSource.Cancel();
221+
await _disconnectCallback(CancellationToken.None);
197222
}
198223
}
199224

200225
internal class EventMeshClientPubSession : IEventMeshClientPubSession
201226
{
202227
private readonly HelloResponse _session;
228+
private readonly string _clientId;
203229
private readonly IPAddress _ipAddr;
204230
private readonly int _port;
205231

206-
public EventMeshClientPubSession(HelloResponse session, IPAddress ipAddr, int port)
232+
public EventMeshClientPubSession(HelloResponse session, string clientId, IPAddress ipAddr, int port)
207233
{
208234
_session = session;
235+
_clientId = clientId;
209236
_ipAddr = ipAddr;
210237
_port = port;
211238
}
@@ -245,42 +272,83 @@ public EventMeshClientPubSession(HelloResponse session, IPAddress ipAddr, int po
245272
EventMeshClient.EnsureSuccessStatus(package, packageResult);
246273
}
247274
}
275+
276+
public async Task Disconnect(CancellationToken cancellationToken = default)
277+
{
278+
using (var udpClient = new UdpClient())
279+
{
280+
var writeCtx = new WriteBufferContext();
281+
var package = PackageRequestBuilder.Disconnect(_clientId, _session.SessionId);
282+
package.Serialize(writeCtx);
283+
var payload = writeCtx.Buffer.ToArray();
284+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port));
285+
var resultPayload = await udpClient.ReceiveAsync();
286+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
287+
var packageResult = Package.Deserialize(readCtx);
288+
EventMeshClient.EnsureSuccessStatus(package, packageResult);
289+
}
290+
}
248291
}
249292

250293
internal class EventMeshClientSubSession : IEventMeshClientSubSession
251294
{
252295
private readonly HelloResponse _session;
296+
private readonly string _clientId;
253297
private readonly IPAddress _ipAddr;
254298
private readonly int _port;
255299
private int _offsetEvt;
300+
private CancellationTokenSource _activeSubscriptionCancellationTokenSource = null;
256301

257-
public EventMeshClientSubSession(HelloResponse session, IPAddress ipAddr, int port)
302+
public EventMeshClientSubSession(HelloResponse session, string clientId, IPAddress ipAddr, int port)
258303
{
259304
_session = session;
305+
_clientId = clientId;
260306
_ipAddr = ipAddr;
261307
_port = port;
262308
}
263309

310+
public string SessionId => _session.SessionId;
311+
264312
public async Task<SubscriptionResult> PersistedSubscribe(string topicFilter, string groupId, Action<CloudEvent> callback, CancellationToken cancellationToken)
265313
{
266-
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
267-
var result = new SubscriptionResult(cancellationTokenSource);
314+
if (_activeSubscriptionCancellationTokenSource != null) throw new InvalidOperationException("There is already an active subscription");
315+
_activeSubscriptionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
316+
var activeSubscription = new SubscriptionResult(Disconnect);
268317
await PersistedSubscribe(topicFilter, groupId, cancellationToken);
269318
#pragma warning disable 4014
270-
Task.Run(async () => await HandlePersistedSubscribe(callback, groupId, cancellationTokenSource.Token));
319+
Task.Run(async () => await HandlePersistedSubscribe(callback, groupId, _activeSubscriptionCancellationTokenSource.Token));
271320
#pragma warning restore 4014
272-
return result;
321+
return activeSubscription;
273322
}
274323

275324
public SubscriptionResult DirectSubscribe(string topicName, Action<CloudEvent> callback, CancellationToken cancellationToken)
276325
{
326+
if (_activeSubscriptionCancellationTokenSource != null) throw new InvalidOperationException("There is already an active subscription");
277327
_offsetEvt = 1;
278-
var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
279-
var result = new SubscriptionResult(cancellationTokenSource);
328+
_activeSubscriptionCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
329+
var activeSubscription = new SubscriptionResult(Disconnect);
280330
#pragma warning disable 4014
281-
Task.Run(async () => await HandleDirectSubscribe(callback, topicName, cancellationTokenSource.Token));
331+
Task.Run(async () => await HandleDirectSubscribe(callback, topicName, _activeSubscriptionCancellationTokenSource.Token));
282332
#pragma warning restore 4014
283-
return result;
333+
return activeSubscription;
334+
}
335+
336+
public async Task Disconnect(CancellationToken cancellationToken = default)
337+
{
338+
using (var udpClient = new UdpClient())
339+
{
340+
var writeCtx = new WriteBufferContext();
341+
var package = PackageRequestBuilder.Disconnect(_clientId, _session.SessionId);
342+
package.Serialize(writeCtx);
343+
var payload = writeCtx.Buffer.ToArray();
344+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port));
345+
var resultPayload = await udpClient.ReceiveAsync();
346+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
347+
var packageResult = Package.Deserialize(readCtx);
348+
EventMeshClient.EnsureSuccessStatus(package, packageResult);
349+
}
350+
351+
if (_activeSubscriptionCancellationTokenSource != null) _activeSubscriptionCancellationTokenSource.Cancel();
284352
}
285353

286354
private async Task PersistedSubscribe(string topicFilter, string groupId, CancellationToken cancellationToken)

src/EventMesh/FaasNet.EventMesh.Common/ConsoleHelper.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public class ConsoleHelper
2525

2626
public static async Task Start(int seedPort, int amqpPort = 5672)
2727
{
28-
// Ajouter un bridge entre deux VPNS.
2928
// Ajouter des tests unitaires.
3029
// Supporter WS-Socket.
3130
_seedPort = seedPort;
@@ -176,7 +175,7 @@ private static async Task DisplayMenu(ICollection<INodeHost> nodes)
176175
Console.WriteLine("Enter the client identifier");
177176
var clientIdentifier = Console.ReadLine();
178177
var eventMeshClient = new EventMeshClient("localhost", _seedPort);
179-
var session = await eventMeshClient.CreatePubSession(vpn, clientIdentifier, CancellationToken.None);
178+
var session = await eventMeshClient.CreatePubSession(vpn, clientIdentifier, null, CancellationToken.None);
180179
await session.Publish("person.created", new { firstName = "firstName" }, CancellationToken.None);
181180
continue;
182181
}
@@ -190,7 +189,7 @@ private static async Task DisplayMenu(ICollection<INodeHost> nodes)
190189
Console.WriteLine("Enter the group identifier");
191190
var groupId = Console.ReadLine();
192191
var eventMeshClient = new EventMeshClient("localhost", _seedPort);
193-
var session = await eventMeshClient.CreateSubSession(vpn, clientIdentifier, CancellationToken.None);
192+
var session = await eventMeshClient.CreateSubSession(vpn, clientIdentifier, null, CancellationToken.None);
194193
await session.PersistedSubscribe("person.created", groupId, (ce) =>
195194
{
196195
Console.WriteLine("Persisted sub");
@@ -205,7 +204,7 @@ await session.PersistedSubscribe("person.created", groupId, (ce) =>
205204
Console.WriteLine("Enter the client identifier");
206205
var clientIdentifier = Console.ReadLine();
207206
var eventMeshClient = new EventMeshClient("localhost", _seedPort);
208-
var session = await eventMeshClient.CreateSubSession(vpn, clientIdentifier, CancellationToken.None);
207+
var session = await eventMeshClient.CreateSubSession(vpn, clientIdentifier, null, CancellationToken.None);
209208
session.DirectSubscribe("person.created", (ce) =>
210209
{
211210
Console.WriteLine($"Direct sub {ce.Data}");

src/EventMesh/FaasNet.EventMesh.FirstConsole/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ internal class Program
66
{
77
public static void Main(string[] args)
88
{
9+
Console.Title = "First console";
910
ConsoleHelper.Start(4000, 5672).Wait();
1011
}
1112
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/AttachHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private async Task<SubscriptionResult> CreateSubSession(RequestParameter paramet
4141
var attachCmd = parameter.Cmd as Attach;
4242
var source = attachCmd.Source as Source;
4343
var evtMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
44-
var subSession = await evtMeshClient.CreateSubSession(_options.EventMeshVpn, session.ClientId, cancellationToken);
44+
var subSession = await evtMeshClient.CreateSubSession(_options.EventMeshVpn, session.ClientId, null, cancellationToken);
4545
var subscriptionResult = subSession.DirectSubscribe(source.Address, (cb) =>
4646
{
4747
var message = new Message(cb.Data.ToString());
@@ -62,7 +62,7 @@ private async Task<SubscriptionResult> CreateSubSession(RequestParameter paramet
6262
private async Task<IEventMeshClientPubSession> CreatePubSession(StateSessionObject session, CancellationToken cancellationToken)
6363
{
6464
var evtMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
65-
return await evtMeshClient.CreatePubSession(_options.EventMeshVpn, session.ClientId, cancellationToken);
65+
return await evtMeshClient.CreatePubSession(_options.EventMeshVpn, session.ClientId, null, cancellationToken);
6666
}
6767

6868
private static ByteBuffer BuildAttachResponse(ushort channel, Attach requestAttach)

src/EventMesh/FaasNet.EventMesh.Runtime/Constants.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{
33
public static class Constants
44
{
5+
public const string FilterWildcard = "*";
56
public const string InMemoryBrokername = "inmemory";
67

78
public static class LockNames

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/AddBridgeVpnMessageHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public AddBridgeVpnMessageHandler(ILogger<AddBridgeVpnMessageHandler> logger, IV
2929
public async Task<EventMeshPackageResult> Run(Package package, IEnumerable<IPeerHost> peers, CancellationToken cancellationToken)
3030
{
3131
var addBridgeRequest = package as AddBridgeRequest;
32-
if (!(await IsServerReachable(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, cancellationToken))) return EventMeshPackageResult.SendResult(PackageResponseBuilder.Error(Commands.ADD_BRIDGE_RESPONSE, addBridgeRequest.Header.Seq, Errors.TARGET_NOT_REACHABLE));
32+
if (!(await IsServerReachable(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, cancellationToken))) return EventMeshPackageResult.SendResult(PackageResponseBuilder.Error(Commands.ADD_BRIDGE_REQUEST, addBridgeRequest.Header.Seq, Errors.TARGET_NOT_REACHABLE));
3333
var allTargetVpns = await GetAllVpns(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, cancellationToken);
3434
if (!allTargetVpns.Contains(addBridgeRequest.TargetVpn)) return EventMeshPackageResult.SendResult(PackageResponseBuilder.Error(Commands.ADD_BRIDGE_REQUEST, addBridgeRequest.Header.Seq, Errors.UNKNOWN_TARGET_VPN));
3535
var currentVpns = await _vpnStore.GetAll(cancellationToken);

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/PublishMessageRequestHandler.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ private async Task CheckSession(PublishMessageRequest message, CancellationToken
6060
activity?.SetStatus(ActivityStatusCode.Ok);
6161
}
6262

63-
if (sessionResult.ClientSession.Purpose != UserAgentPurpose.PUB)
64-
{
65-
throw new RuntimeException(message.Header.Command, message.Header.Seq, Errors.UNAUTHORIZED_PUBLISH);
66-
}
63+
if (sessionResult.ClientSession.Purpose != UserAgentPurpose.PUB) throw new RuntimeException(message.Header.Command, message.Header.Seq, Errors.UNAUTHORIZED_PUBLISH);
64+
if (!sessionResult.ClientSession.IsActive) throw new RuntimeException(message.Header.Command, message.Header.Seq, Errors.NO_ACTIVE_SESSION);
6765
}
6866

6967
private bool CheckPeerExists(IEnumerable<IPeerHost> peers, string topicName)

0 commit comments

Comments
 (0)