Skip to content

Commit 67af85b

Browse files
Ticket #102: Persist client information in EventMesh + start to use consensus & gossip
1 parent 8657936 commit 67af85b

54 files changed

Lines changed: 779 additions & 501 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
namespace FaasNet.EventMesh.Client.Messages
2+
{
3+
public class AddClientRequest : Package
4+
{
5+
public string Vpn { get; set; }
6+
public string ClientId { get; set; }
7+
8+
public override void Serialize(WriteBufferContext context)
9+
{
10+
base.Serialize(context);
11+
context.WriteString(Vpn);
12+
context.WriteString(ClientId);
13+
}
14+
15+
public void Extract(ReadBufferContext context)
16+
{
17+
Vpn = context.NextString();
18+
ClientId = context.NextString();
19+
}
20+
}
21+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace FaasNet.EventMesh.Client.Messages
2+
{
3+
public class AddClientResponse: Package
4+
{
5+
public string Queue { get; set; }
6+
7+
public override void Serialize(WriteBufferContext context)
8+
{
9+
base.Serialize(context);
10+
context.WriteString(Queue);
11+
}
12+
13+
public void Extract(ReadBufferContext context)
14+
{
15+
Queue = context.NextString();
16+
}
17+
}
18+
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/Commands.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ public class Commands : IEquatable<Commands>
7575
/// Get all VPN response.
7676
/// </summary>
7777
public static Commands GET_ALL_VPNS_RESPONSE = new Commands(17, "GET_ALL_VPNS_RESPONSE");
78+
/// <summary>
79+
/// Request sent to create a client.
80+
/// </summary>
81+
public static Commands ADD_CLIENT_REQUEST = new Commands(18, "ADD_CLIENT_REQUEST");
82+
/// <summary>
83+
/// Result returned when a client is added;
84+
/// </summary>
85+
public static Commands ADD_CLIENT_RESPONSE = new Commands(19, "ADD_CLIENT_RESPONSE");
7886

7987
private Commands(int code)
8088
{

src/EventMesh/FaasNet.EventMesh.Client/Messages/Package.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ public static Package Deserialize(ReadBufferContext context)
4040
return result;
4141
}
4242

43+
if (Commands.SUBSCRIBE_RESPONSE == header.Command)
44+
{
45+
var result = new SubscriptionResult
46+
{
47+
Header = header
48+
};
49+
result.Extract(context);
50+
return result;
51+
}
52+
4353
if (Commands.ASYNC_MESSAGE_TO_CLIENT == header.Command)
4454
{
4555
var result = new AsyncMessageToClient

src/EventMesh/FaasNet.EventMesh.Client/Messages/PackageResponseBuilder.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ public static Package Hello(string seq, string sessionId)
2424
return result;
2525
}
2626

27+
public static Package Client(string seq, string queue)
28+
{
29+
var result = new AddClientResponse
30+
{
31+
Header = new Header(Commands.ADD_CLIENT_RESPONSE, HeaderStatus.SUCCESS, seq),
32+
Queue = queue
33+
};
34+
return result;
35+
}
36+
2737
public static Package GetAllVpns(string seq, IEnumerable<string> vpns)
2838
{
2939
var result = new GetAllVpnResponse
@@ -34,9 +44,9 @@ public static Package GetAllVpns(string seq, IEnumerable<string> vpns)
3444
return result;
3545
}
3646

37-
public static Package Subscription(string seq)
47+
public static Package Subscription(string seq, IEnumerable<string> queueNames)
3848
{
39-
var result = new Package
49+
var result = new SubscriptionResult
4050
{
4151
Header = new Header(Commands.SUBSCRIBE_RESPONSE, HeaderStatus.SUCCESS, seq)
4252
};
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Collections.Generic;
2+
3+
namespace FaasNet.EventMesh.Client.Messages
4+
{
5+
public class SubscriptionResult : Package
6+
{
7+
public IEnumerable<string> QueueNames { get; set; }
8+
9+
public override void Serialize(WriteBufferContext context)
10+
{
11+
base.Serialize(context);
12+
context.WriteStringArray(QueueNames);
13+
}
14+
15+
public void Extract(ReadBufferContext context)
16+
{
17+
QueueNames = context.NextStringArray();
18+
}
19+
}
20+
}

src/EventMesh/FaasNet.EventMesh.Client/RuntimeClient.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace FaasNet.EventMesh.Client
1414
{
15-
public class RuntimeClient
15+
public class RuntimeClient: IDisposable
1616
{
1717
private UdpClient _udpClient;
1818
private readonly IPAddress _clientIPAddress;
@@ -131,10 +131,12 @@ public UdpClient UdpClient
131131

132132
public void Close()
133133
{
134-
if (_udpClient != null)
135-
{
136-
_udpClient.Close();
137-
}
134+
if (_udpClient != null) _udpClient.Close();
135+
}
136+
137+
public void Dispose()
138+
{
139+
Close();
138140
}
139141

140142
public async Task<Package> AddBridge(string vpn, string urn, int port, string targetVpn, CancellationToken cancellationToken = default(CancellationToken))

src/EventMesh/FaasNet.EventMesh.Runtime.EF/Configurations/BridgeServerConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class BridgeServerConfiguration : IEntityTypeConfiguration<BridgeServer>
88
{
99
public void Configure(EntityTypeBuilder<BridgeServer> builder)
1010
{
11-
builder.HasKey(b => b.Urn);
11+
builder.HasKey(b => b.TargetUrn);
1212
}
1313
}
1414
}

src/EventMesh/FaasNet.EventMesh.Runtime/EventMeshNode.cs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public class EventMeshNode : BaseNodeHost
1818
{
1919
private readonly IEnumerable<IMessageHandler> _messageHandlers;
2020

21-
public EventMeshNode(IEnumerable<IMessageHandler> messageHandlers, IPeerStore peerStore, RaftConsensus.Core.Stores.INodeStateStore nodeStateStore, IPeerHostFactory peerHostFactory, ILogger<BaseNodeHost> logger, IOptions<ConsensusPeerOptions> options, IEnumerable<RaftConsensus.Core.INodeStateStore> requestHandlers) : base(peerStore, peerHostFactory, logger, options, requestHandlers)
21+
public EventMeshNode(IEnumerable<IMessageHandler> messageHandlers, IPeerStore peerStore, IPeerInfoStore peerInfoStore, INodeStateStore nodeStateStore, IClusterStore clusterStore, IPeerHostFactory peerHostFactory, ILogger<BaseNodeHost> logger, IOptions<ConsensusPeerOptions> options) : base(peerStore, peerInfoStore, peerHostFactory, nodeStateStore, clusterStore, logger, options)
2222
{
2323
_messageHandlers = messageHandlers;
2424
}
@@ -34,20 +34,20 @@ protected override async Task HandleUDPPackage(UdpReceiveResult udpResult, Cance
3434
Logger.LogInformation("Command {command} is received with sequence {sequence}", package.Header.Command.Name, package.Header.Seq);
3535
var cmd = package.Header.Command;
3636
var messageHandler = _messageHandlers.First(m => m.Command == package.Header.Command);
37-
Package result = null;
37+
EventMeshPackageResult result = null;
3838
try
3939
{
4040
result = await messageHandler.Run(package, cancellationToken);
4141
}
4242
catch (RuntimeException ex)
4343
{
4444
Logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
45-
result = PackageResponseBuilder.Error(ex.SourceCommand, ex.SourceSeq, ex.Error);
45+
result = EventMeshPackageResult.SendResult(PackageResponseBuilder.Error(ex.SourceCommand, ex.SourceSeq, ex.Error));
4646
}
4747
catch (Exception ex)
4848
{
4949
Logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
50-
result = PackageResponseBuilder.Error(package.Header.Command, package.Header.Seq, Errors.INTERNAL_ERROR);
50+
result = EventMeshPackageResult.SendResult(PackageResponseBuilder.Error(package.Header.Command, package.Header.Seq, Errors.INTERNAL_ERROR));
5151
}
5252

5353
if (result == null)
@@ -56,12 +56,22 @@ protected override async Task HandleUDPPackage(UdpReceiveResult udpResult, Cance
5656
return;
5757
}
5858

59-
EventMeshMeter.IncrementNbOutgoingRequest();
60-
Logger.LogInformation("Command {command} with sequence {sequence} is going to be sent", result.Header.Command.Name, result.Header.Seq);
61-
var writeCtx = new WriteBufferContext();
62-
result.Serialize(writeCtx);
63-
var resultPayload = writeCtx.Buffer.ToArray();
64-
// await _udpClient.SendAsync(resultPayload, resultPayload.Count(), receiveResult.RemoteEndPoint).WithCancellation(_cancellationToken);
59+
if(result.Status == EventMeshPackageResultStatus.ADD_PEER)
60+
{
61+
await AddPeer(result.Termid);
62+
await StartPeer(result.Termid);
63+
}
64+
65+
if(result.Status == EventMeshPackageResultStatus.SEND_RESULT)
66+
{
67+
EventMeshMeter.IncrementNbOutgoingRequest();
68+
Logger.LogInformation("Command {command} with sequence {sequence} is going to be sent", result.Package.Header.Command.Name, result.Package.Header.Seq);
69+
var writeCtx = new WriteBufferContext();
70+
result.Package.Serialize(writeCtx);
71+
var resultPayload = writeCtx.Buffer.ToArray();
72+
// await _udpClient.SendAsync(resultPayload, resultPayload.Count(), receiveResult.RemoteEndPoint).WithCancellation(_cancellationToken);
73+
}
74+
6575
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Ok);
6676
}
6777
catch (Exception)
@@ -71,7 +81,5 @@ protected override async Task HandleUDPPackage(UdpReceiveResult udpResult, Cance
7181
}
7282
}
7383
}
74-
75-
// Add state.
7684
}
7785
}

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/AddBridgeMessageHandler.cs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,64 @@
22
using FaasNet.EventMesh.Client.Exceptions;
33
using FaasNet.EventMesh.Client.Messages;
44
using FaasNet.EventMesh.Runtime.Exceptions;
5+
using FaasNet.EventMesh.Runtime.Models;
56
using FaasNet.EventMesh.Runtime.Stores;
6-
using System.Net;
7+
using System.Linq;
78
using System.Threading;
89
using System.Threading.Tasks;
910

1011
namespace FaasNet.EventMesh.Runtime.Handlers
1112
{
1213
public class AddBridgeMessageHandler : IMessageHandler
1314
{
14-
private readonly IUdpClientServerFactory _udpClientFactory;
1515
private readonly IVpnStore _vpnStore;
16+
private readonly IBridgeStore _bridgeStore;
1617

17-
public AddBridgeMessageHandler(
18-
IUdpClientServerFactory udpClientFactory,
19-
IVpnStore vpnStore)
18+
public AddBridgeMessageHandler(IVpnStore vpnStore, IBridgeStore bridgeStore)
2019
{
21-
_udpClientFactory = udpClientFactory;
2220
_vpnStore = vpnStore;
21+
_bridgeStore = bridgeStore;
2322
}
2423

2524
public Commands Command => Commands.ADD_BRIDGE_REQUEST;
2625

27-
public async Task<Package> Run(Package package, IPEndPoint sender, CancellationToken cancellationToken)
26+
public async Task<EventMeshPackageResult> Run(Package package, CancellationToken cancellationToken)
2827
{
2928
var addBridgeRequest = package as AddBridgeRequest;
29+
await CheckVpn(addBridgeRequest, cancellationToken);
30+
await CheckBridgeServer(addBridgeRequest, cancellationToken);
31+
await _bridgeStore.Add(BridgeServer.Create(addBridgeRequest.Vpn, addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, addBridgeRequest.TargetVpn), cancellationToken);
32+
return EventMeshPackageResult.SendResult(PackageResponseBuilder.AddBridge(package.Header.Seq));
33+
}
34+
35+
private async Task CheckVpn(AddBridgeRequest addBridgeRequest, CancellationToken cancellationToken)
36+
{
3037
var vpn = await _vpnStore.Get(addBridgeRequest.Vpn, cancellationToken);
3138
if (vpn == null)
3239
{
3340
throw new RuntimeException(addBridgeRequest.Header.Command, addBridgeRequest.Header.Seq, Errors.UNKNOWN_VPN);
3441
}
42+
}
3543

36-
var bridgeServer = vpn.GetBridgeServer(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, addBridgeRequest.TargetVpn);
37-
if (bridgeServer != null)
44+
private async Task CheckBridgeServer(AddBridgeRequest addBridgeRequest, CancellationToken cancellationToken)
45+
{
46+
var bridgeServers = await _bridgeStore.GetAll(cancellationToken);
47+
if (bridgeServers.Any(b => b.SourceVpn == addBridgeRequest.Vpn && b.TargetUrn == addBridgeRequest.TargetUrn && b.TargetPort == addBridgeRequest.TargetPort && b.TargetVpn == addBridgeRequest.TargetVpn))
3848
{
3949
throw new RuntimeException(addBridgeRequest.Header.Command, addBridgeRequest.Header.Seq, Errors.BRIDGE_EXISTS);
4050
}
4151

42-
var udpClient = _udpClientFactory.Build();
4352
try
4453
{
45-
var runtimeClient = new RuntimeClient(udpClient, addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort);
46-
await runtimeClient.HeartBeat();
54+
using (var runtimeClient = new RuntimeClient(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort))
55+
{
56+
await runtimeClient.HeartBeat();
57+
}
4758
}
48-
catch(RuntimeClientException)
59+
catch (RuntimeClientException)
4960
{
5061
throw new RuntimeException(addBridgeRequest.Header.Command, addBridgeRequest.Header.Seq, Errors.INVALID_BRIDGE);
5162
}
52-
53-
vpn.AddBridge(addBridgeRequest.TargetUrn, addBridgeRequest.TargetPort, addBridgeRequest.TargetVpn);
54-
return PackageResponseBuilder.AddBridge(package.Header.Seq);
5563
}
5664
}
5765
}

0 commit comments

Comments
 (0)