Skip to content

Commit 43de050

Browse files
Ticket #102 : Start to implement GOSSIP protocol
1 parent dba8508 commit 43de050

28 files changed

Lines changed: 444 additions & 74 deletions

FaasNet.EventMesh.sln

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3-
# Visual Studio Version 17
4-
VisualStudioVersion = 17.1.32328.378
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.31624.102
55
MinimumVisualStudioVersion = 10.0.40219.1
6-
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "01. Core Layer", "01. Core Layer", "{6E495E0A-0DC8-4E42-8C58-3C48506D3D24}"
6+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Core Layer", "02. Core Layer", "{6E495E0A-0DC8-4E42-8C58-3C48506D3D24}"
77
EndProject
8-
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "03. Testing Layer", "03. Testing Layer", "{2A14F697-FFC8-402F-82B4-8128FE897DC9}"
8+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "04. Testing Layer", "04. Testing Layer", "{2A14F697-FFC8-402F-82B4-8128FE897DC9}"
99
EndProject
10-
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Startup Layer", "02. Startup Layer", "{20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}"
10+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "03. Startup Layer", "03. Startup Layer", "{20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}"
1111
EndProject
1212
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Runtime.Tests", "tests\FaasNet.EventMesh.Runtime.Tests\FaasNet.EventMesh.Runtime.Tests.csproj", "{AA3F0C66-7575-47D9-B45E-6B38F78E632D}"
1313
EndProject
@@ -33,6 +33,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "00. OpenTelemetry", "00. Op
3333
EndProject
3434
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.IntegrationEvents", "src\EventMesh\FaasNet.EventMesh.IntegrationEvents\FaasNet.EventMesh.IntegrationEvents.csproj", "{BE0F462B-1C13-4160-8A71-7FCDDE0C6BF3}"
3535
EndProject
36+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "01. Raft consensus Layer", "01. Raft consensus Layer", "{7A13262A-D1E6-4210-BA95-03A0741CBFA4}"
37+
EndProject
38+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.RaftConsensus.Core", "src\RaftConsensus\FaasNet.RaftConsensus.Core\FaasNet.RaftConsensus.Core.csproj", "{69259CDF-1E32-4310-9155-BC095FFDBECD}"
39+
EndProject
40+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.RaftConsensus.Client", "src\RaftConsensus\FaasNet.RaftConsensus.Client\FaasNet.RaftConsensus.Client.csproj", "{9FD70B89-220D-47E7-958D-E10F493F9436}"
41+
EndProject
3642
Global
3743
GlobalSection(SolutionConfigurationPlatforms) = preSolution
3844
Debug|Any CPU = Debug|Any CPU
@@ -83,6 +89,14 @@ Global
8389
{BE0F462B-1C13-4160-8A71-7FCDDE0C6BF3}.Debug|Any CPU.Build.0 = Debug|Any CPU
8490
{BE0F462B-1C13-4160-8A71-7FCDDE0C6BF3}.Release|Any CPU.ActiveCfg = Release|Any CPU
8591
{BE0F462B-1C13-4160-8A71-7FCDDE0C6BF3}.Release|Any CPU.Build.0 = Release|Any CPU
92+
{69259CDF-1E32-4310-9155-BC095FFDBECD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
93+
{69259CDF-1E32-4310-9155-BC095FFDBECD}.Debug|Any CPU.Build.0 = Debug|Any CPU
94+
{69259CDF-1E32-4310-9155-BC095FFDBECD}.Release|Any CPU.ActiveCfg = Release|Any CPU
95+
{69259CDF-1E32-4310-9155-BC095FFDBECD}.Release|Any CPU.Build.0 = Release|Any CPU
96+
{9FD70B89-220D-47E7-958D-E10F493F9436}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
97+
{9FD70B89-220D-47E7-958D-E10F493F9436}.Debug|Any CPU.Build.0 = Debug|Any CPU
98+
{9FD70B89-220D-47E7-958D-E10F493F9436}.Release|Any CPU.ActiveCfg = Release|Any CPU
99+
{9FD70B89-220D-47E7-958D-E10F493F9436}.Release|Any CPU.Build.0 = Release|Any CPU
86100
EndGlobalSection
87101
GlobalSection(SolutionProperties) = preSolution
88102
HideSolutionNode = FALSE
@@ -99,6 +113,8 @@ Global
99113
{33040232-E628-448D-B85F-BC68D757D24B} = {3A7C8C56-A93C-4722-9397-EF82EB5E4655}
100114
{1186F448-3722-443F-A0C8-A0D4216D23D9} = {3A7C8C56-A93C-4722-9397-EF82EB5E4655}
101115
{BE0F462B-1C13-4160-8A71-7FCDDE0C6BF3} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
116+
{69259CDF-1E32-4310-9155-BC095FFDBECD} = {7A13262A-D1E6-4210-BA95-03A0741CBFA4}
117+
{9FD70B89-220D-47E7-958D-E10F493F9436} = {7A13262A-D1E6-4210-BA95-03A0741CBFA4}
102118
EndGlobalSection
103119
GlobalSection(ExtensibilityGlobals) = postSolution
104120
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using FaasNet.EventMesh.Client.Messages;
2+
using FaasNet.EventMesh.Runtime.Exceptions;
3+
using FaasNet.EventMesh.Runtime.Handlers;
4+
using FaasNet.RaftConsensus.Core;
5+
using FaasNet.RaftConsensus.Core.Stores;
6+
using Microsoft.Extensions.Logging;
7+
using Microsoft.Extensions.Options;
8+
using System;
9+
using System.Collections.Generic;
10+
using System.Linq;
11+
using System.Net.Sockets;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
15+
namespace FaasNet.EventMesh.Runtime
16+
{
17+
public class EventMeshNode : BaseNodeHost
18+
{
19+
private readonly IEnumerable<IMessageHandler> _messageHandlers;
20+
21+
public EventMeshNode(IEnumerable<IMessageHandler> messageHandlers, IPeerStore peerStore, IPeerHostFactory peerHostFactory, ILogger<BaseNodeHost> logger, IOptions<ConsensusPeerOptions> options) : base(peerStore, peerHostFactory, logger, options)
22+
{
23+
_messageHandlers = messageHandlers;
24+
}
25+
26+
protected override async Task HandleUDPPackage(UdpReceiveResult udpResult, CancellationToken cancellationToken)
27+
{
28+
var package = Package.Deserialize(new ReadBufferContext(udpResult.Buffer.ToArray()));
29+
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity(package.Header.Command.Name))
30+
{
31+
try
32+
{
33+
EventMeshMeter.IncrementNbIncomingRequest();
34+
Logger.LogInformation("Command {command} is received with sequence {sequence}", package.Header.Command.Name, package.Header.Seq);
35+
var cmd = package.Header.Command;
36+
var messageHandler = _messageHandlers.First(m => m.Command == package.Header.Command);
37+
Package result = null;
38+
try
39+
{
40+
result = await messageHandler.Run(package, cancellationToken);
41+
}
42+
catch (RuntimeException ex)
43+
{
44+
Logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
45+
result = PackageResponseBuilder.Error(ex.SourceCommand, ex.SourceSeq, ex.Error);
46+
}
47+
catch (Exception ex)
48+
{
49+
Logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
50+
result = PackageResponseBuilder.Error(package.Header.Command, package.Header.Seq, Errors.INTERNAL_ERROR);
51+
}
52+
53+
if (result == null)
54+
{
55+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Error);
56+
return;
57+
}
58+
59+
EventMeshMeter.IncrementNbOutgoingRequest();
60+
Logger.LogInformation("Command {command} with sequence {sequence} is going to be sent", result.Header.Command.Name, result.Header.Seq);
61+
var writeCtx = new WriteBufferContext();
62+
result.Serialize(writeCtx);
63+
var resultPayload = writeCtx.Buffer.ToArray();
64+
// await _udpClient.SendAsync(resultPayload, resultPayload.Count(), receiveResult.RemoteEndPoint).WithCancellation(_cancellationToken);
65+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Ok);
66+
}
67+
catch (Exception)
68+
{
69+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Error);
70+
throw;
71+
}
72+
}
73+
}
74+
}
75+
}

src/EventMesh/FaasNet.EventMesh.Runtime/RuntimeHost.cs renamed to src/EventMesh/FaasNet.EventMesh.Runtime/EventMeshPeer.cs

Lines changed: 67 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
using CloudNative.CloudEvents;
2-
using FaasNet.EventMesh.Client.Extensions;
3-
using FaasNet.EventMesh.Client.Messages;
4-
using FaasNet.EventMesh.Runtime.Events;
1+
using FaasNet.EventMesh.Client.Messages;
52
using FaasNet.EventMesh.Runtime.Exceptions;
63
using FaasNet.EventMesh.Runtime.Handlers;
74
using FaasNet.EventMesh.Runtime.Stores;
5+
using FaasNet.RaftConsensus.Core;
6+
using FaasNet.RaftConsensus.Core.Stores;
87
using Microsoft.Extensions.Logging;
98
using Microsoft.Extensions.Options;
109
using System;
@@ -16,23 +15,80 @@
1615

1716
namespace FaasNet.EventMesh.Runtime
1817
{
19-
public class RuntimeHost: IRuntimeHost
18+
public class EventMeshPeer: BasePeerHost
2019
{
2120
private readonly IEnumerable<IMessageHandler> _messageHandlers;
2221
private readonly IEnumerable<IMessageConsumer> _messageConsumers;
2322
private readonly IClientStore _clientStore;
2423
private readonly IUdpClientServerFactory _udpClientFactory;
25-
private readonly ILogger<RuntimeHost> _logger;
24+
private readonly ILogger<EventMeshPeer> _logger;
2625
private readonly RuntimeOptions _options;
2726
private CancellationTokenSource _tokenSource;
2827
private CancellationToken _cancellationToken;
2928
private UdpClient _udpClient;
3029

31-
public RuntimeHost(
30+
public EventMeshPeer(IEnumerable<IMessageHandler> messageHandlers, ILogger<BasePeerHost> logger, IOptions<ConsensusPeerOptions> options, IClusterStore clusterStore, ILogStore logStore, IPeerStore peerStore) : base(logger, options, clusterStore, logStore, peerStore)
31+
{
32+
_messageHandlers = messageHandlers;
33+
}
34+
35+
protected override async Task<bool> HandlePackage(UdpReceiveResult udpResult)
36+
{
37+
var package = Package.Deserialize(new ReadBufferContext(udpResult.Buffer.ToArray()));
38+
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity(package.Header.Command.Name))
39+
{
40+
try
41+
{
42+
EventMeshMeter.IncrementNbIncomingRequest();
43+
_logger.LogInformation("Command {command} is received with sequence {sequence}", package.Header.Command.Name, package.Header.Seq);
44+
var cmd = package.Header.Command;
45+
var messageHandler = _messageHandlers.First(m => m.Command == package.Header.Command);
46+
Package result = null;
47+
try
48+
{
49+
result = await messageHandler.Run(package, _cancellationToken);
50+
}
51+
catch (RuntimeException ex)
52+
{
53+
_logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
54+
result = PackageResponseBuilder.Error(ex.SourceCommand, ex.SourceSeq, ex.Error);
55+
}
56+
catch (Exception ex)
57+
{
58+
_logger.LogError("Command {command}, sequence {sequence}, exception {exception}", package.Header.Command.Name, package.Header.Seq, ex.ToString());
59+
result = PackageResponseBuilder.Error(package.Header.Command, package.Header.Seq, Errors.INTERNAL_ERROR);
60+
}
61+
62+
if (result == null)
63+
{
64+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Error);
65+
return false;
66+
}
67+
68+
EventMeshMeter.IncrementNbOutgoingRequest();
69+
_logger.LogInformation("Command {command} with sequence {sequence} is going to be sent", result.Header.Command.Name, result.Header.Seq);
70+
var writeCtx = new WriteBufferContext();
71+
result.Serialize(writeCtx);
72+
var resultPayload = writeCtx.Buffer.ToArray();
73+
// await _udpClient.SendAsync(resultPayload, resultPayload.Count(), receiveResult.RemoteEndPoint).WithCancellation(_cancellationToken);
74+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Ok);
75+
}
76+
catch (Exception)
77+
{
78+
activity?.SetStatus(System.Diagnostics.ActivityStatusCode.Error);
79+
throw;
80+
}
81+
}
82+
83+
return true;
84+
}
85+
86+
/*
87+
public EventMeshPeer(
3288
IEnumerable<IMessageHandler> messageHandlers,
3389
IEnumerable<IMessageConsumer> messageConsumers,
3490
IUdpClientServerFactory udpClientFactory,
35-
ILogger<RuntimeHost> logger,
91+
ILogger<EventMeshPeer> logger,
3692
IClientStore clientStore,
3793
IOptions<RuntimeOptions> options)
3894
{
@@ -131,6 +187,8 @@ private async Task HandleEventMeshPackage()
131187
EventMeshPackageReceived(this, new PackageEventArgs(package));
132188
}
133189
190+
// La requête heartbeat peut être traitée que par le leader.
191+
// La requête hello message peut être traitée que par le leader.
134192
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity(package.Header.Command.Name))
135193
{
136194
try
@@ -238,5 +296,6 @@ private async Task HandleCloudEventReceived(object sender, CloudEventArgs e)
238296
}
239297
240298
#endregion
299+
*/
241300
}
242301
}

src/EventMesh/FaasNet.EventMesh.Runtime/FaasNet.EventMesh.Runtime.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<PackageReference Include="OpenTelemetry" Version="1.2.0" />
99
</ItemGroup>
1010
<ItemGroup>
11+
<ProjectReference Include="..\..\RaftConsensus\FaasNet.RaftConsensus.Core\FaasNet.RaftConsensus.Core.csproj" />
1112
<ProjectReference Include="..\FaasNet.EventMesh.Client\FaasNet.EventMesh.Client.csproj" />
1213
<ProjectReference Include="..\FaasNet.EventMesh.IntegrationEvents\FaasNet.EventMesh.IntegrationEvents.csproj" />
1314
</ItemGroup>

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/HelloMessageHandler.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using Microsoft.Extensions.Options;
55
using System;
66
using System.Linq;
7-
using System.Net;
87
using System.Threading;
98
using System.Threading.Tasks;
109

@@ -25,7 +24,7 @@ public HelloMessageHandler(IVpnStore vpnStore, IClientStore clientStore, IOption
2524

2625
public Commands Command => Commands.HELLO_REQUEST;
2726

28-
public async Task<Package> Run(Package package, IPEndPoint sender, CancellationToken cancellationToken)
27+
public async Task<Package> Run(Package package, CancellationToken cancellationToken)
2928
{
3029
var helloRequest = package as HelloRequest;
3130
Models.Vpn vpn = null;
@@ -57,7 +56,7 @@ public async Task<Package> Run(Package package, IPEndPoint sender, CancellationT
5756
throw new RuntimeException(helloRequest.Header.Command, helloRequest.Header.Seq, Errors.NOT_AUTHORIZED);
5857
}
5958

60-
var sessionId = client.AddSession(sender,
59+
var sessionId = client.AddSession(
6160
helloRequest.UserAgent.Environment,
6261
helloRequest.UserAgent.Pid,
6362
helloRequest.UserAgent.Purpose,
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using FaasNet.EventMesh.Client.Messages;
2-
using System.Net;
32
using System.Threading;
43
using System.Threading.Tasks;
54

@@ -8,6 +7,6 @@ namespace FaasNet.EventMesh.Runtime.Handlers
87
public interface IMessageHandler
98
{
109
Commands Command { get; }
11-
Task<Package> Run(Package package, IPEndPoint sender, CancellationToken cancellationToken);
10+
Task<Package> Run(Package package, CancellationToken cancellationToken);
1211
}
1312
}

src/EventMesh/FaasNet.EventMesh.Runtime/IRuntimeHost.cs

Lines changed: 0 additions & 11 deletions
This file was deleted.

src/EventMesh/FaasNet.EventMesh.Runtime/Models/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ public void ConsumeCloudEvents(string brokerName, string topicName, int nbEvents
8484
topic.Offset += nbEventsConsumed;
8585
}
8686

87-
public string AddSession(IPEndPoint endpoint, string env, int pid, UserAgentPurpose purpose, int bufferCloudEvents, bool isServer, string vpn, TimeSpan expirationTimeSpan, bool isSessionInfinite)
87+
public string AddSession(string env, int pid, UserAgentPurpose purpose, int bufferCloudEvents, bool isServer, string vpn, TimeSpan expirationTimeSpan, bool isSessionInfinite)
8888
{
89-
var session = ClientSession.Create(endpoint, env, pid, purpose, bufferCloudEvents, vpn, isServer ? ClientSessionTypes.SERVER : ClientSessionTypes.CLIENT, expirationTimeSpan, isSessionInfinite);
89+
var session = ClientSession.Create(env, pid, purpose, bufferCloudEvents, vpn, isServer ? ClientSessionTypes.SERVER : ClientSessionTypes.CLIENT, expirationTimeSpan, isSessionInfinite);
9090
session.Activate();
9191
Sessions.Add(session);
9292
return session.Id;

src/EventMesh/FaasNet.EventMesh.Runtime/Models/ClientSession.cs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System;
44
using System.Collections.Generic;
55
using System.Linq;
6-
using System.Net;
76

87
namespace FaasNet.EventMesh.Runtime.Models
98
{
@@ -19,22 +18,8 @@ public ClientSession()
1918

2019
#region Properties
2120

22-
public IPEndPoint Endpoint
23-
{
24-
get
25-
{
26-
return new IPEndPoint(new IPAddress(IPAddressData), Port);
27-
}
28-
set
29-
{
30-
Port = value.Port;
31-
IPAddressData = value.Address.GetAddressBytes();
32-
}
33-
}
34-
3521
public string Id { get; set; }
3622
public string Vpn { get; set; }
37-
public byte[] IPAddressData { get; set; }
3823
public int Port { get; set; }
3924
public string Environment { get; set; }
4025
public int Pid { get; set; }
@@ -129,14 +114,13 @@ public void Close()
129114

130115
#endregion
131116

132-
public static ClientSession Create(IPEndPoint edp, string env, int pid, UserAgentPurpose purpose, int bufferCloudEvents, string vpn, ClientSessionTypes type, TimeSpan expirationTimeSpan, bool isSessionInfinite)
117+
public static ClientSession Create(string env, int pid, UserAgentPurpose purpose, int bufferCloudEvents, string vpn, ClientSessionTypes type, TimeSpan expirationTimeSpan, bool isSessionInfinite)
133118
{
134119
var createDateTime = DateTime.UtcNow;
135120
var expirationDateTime = createDateTime.Add(expirationTimeSpan);
136121
var result = new ClientSession
137122
{
138123
Id = Guid.NewGuid().ToString(),
139-
Endpoint = edp,
140124
Environment = env,
141125
Vpn = vpn,
142126
Pid = pid,

src/EventMesh/FaasNet.EventMesh.Runtime/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public static ServerBuilder AddRuntime(this IServiceCollection services, Action<
2222
}
2323

2424
services.AddLogging();
25-
services.AddTransient<IRuntimeHost, RuntimeHost>();
25+
services.AddTransient<IRuntimeHost, EventMeshPeer>();
2626
services.AddTransient<IMessageHandler, HeartbeatMessageHandler>();
2727
services.AddTransient<IMessageHandler, HelloMessageHandler>();
2828
services.AddTransient<IMessageHandler, SubscribeMessageHandler>();

0 commit comments

Comments
 (0)