Skip to content

Commit 210421e

Browse files
Ticket #102 : Propagate cluster nodes.
1 parent dcf3272 commit 210421e

12 files changed

Lines changed: 269 additions & 68 deletions

File tree

src/RaftConsensus/FaasNet.RaftConsensus.Client/GossipClient.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using FaasNet.RaftConsensus.Client.Messages;
33
using FaasNet.RaftConsensus.Client.Messages.Gossip;
44
using System;
5+
using System.Collections.Generic;
56
using System.Linq;
67
using System.Net;
78
using System.Net.Sockets;
@@ -44,6 +45,12 @@ public GossipClient(string url, int port) : this(new IPEndPoint(IPAddressHelper.
4445
return Send(package, cancellationToken);
4546
}
4647

48+
public Task UpdateClusterNodes(string url, int port, ICollection<ClusterNodeMessage> clusterNodes, CancellationToken cancellationToken = default(CancellationToken))
49+
{
50+
var package = GossipPackageRequestBuilder.UpdateClusterNodes(url, port, clusterNodes);
51+
return Send(package, cancellationToken);
52+
}
53+
4754
public void Dispose()
4855
{
4956
UdpClient?.Dispose();

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/Gossip/GossipCommands.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public class GossipCommands: BaseCommands
88
public static GossipCommands SYNC_RESULT = new GossipCommands(3, "SYNC_RESULT");
99
public static GossipCommands UPDATE_NODE_STATE_REQUEST = new GossipCommands(4, "UPDATE_NODE_STATE_REQUEST");
1010
public static GossipCommands JOIN_NODE_REQUEST = new GossipCommands(5, "JOIN_NODE_REQUEST");
11+
public static GossipCommands UPDATE_CLUSTER_NODES_REQUEST = new GossipCommands(6, "UPDATE_CLUSTER_NODES_REQUEST");
1112

1213
protected GossipCommands(int code)
1314
{

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/Gossip/GossipPackage.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ public static GossipPackage Deserialize(ReadBufferContext context)
5656
return result;
5757
}
5858

59+
if (header.Command == GossipCommands.UPDATE_CLUSTER_NODES_REQUEST)
60+
{
61+
var result = new GossipUpdateClusterRequest { Header = header };
62+
result.Extract(context);
63+
return result;
64+
}
65+
5966
return new GossipPackage { Header = header };
6067
}
6168
}

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/Gossip/GossipPackageRequestBuilder.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,14 @@ public static GossipPackage AddNode(string url, int port)
3131
Url = url
3232
};
3333
}
34+
35+
public static GossipPackage UpdateClusterNodes(string url, int port, ICollection<ClusterNodeMessage> clusterNodes)
36+
{
37+
return new GossipUpdateClusterRequest
38+
{
39+
Header = new GossipHeader(GossipCommands.UPDATE_CLUSTER_NODES_REQUEST, url, port),
40+
Nodes = clusterNodes
41+
};
42+
}
3443
}
3544
}
Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,48 @@
1-
namespace FaasNet.RaftConsensus.Client.Messages.Gossip
1+
using System.Collections.Generic;
2+
3+
namespace FaasNet.RaftConsensus.Client.Messages.Gossip
24
{
3-
public class GossipUpdateClusterRequest
5+
public class GossipUpdateClusterRequest : GossipPackage
46
{
7+
public GossipUpdateClusterRequest()
8+
{
9+
Nodes = new List<ClusterNodeMessage>();
10+
}
11+
12+
public ICollection<ClusterNodeMessage> Nodes { get; set; }
13+
14+
public override void Serialize(WriteBufferContext context)
15+
{
16+
base.Serialize(context);
17+
context.WriteInteger(Nodes.Count);
18+
foreach (var node in Nodes) node.Serialize(context);
19+
}
20+
21+
public void Extract(ReadBufferContext context)
22+
{
23+
var size = context.NextInt();
24+
for(int i = 0; i < size; i++) Nodes.Add(ClusterNodeMessage.Deserialize(context));
25+
}
26+
}
27+
28+
public class ClusterNodeMessage
29+
{
30+
public string Url { get; set; }
31+
public int Port { get; set; }
32+
33+
public void Serialize(WriteBufferContext context)
34+
{
35+
context.WriteString(Url);
36+
context.WriteInteger(Port);
37+
}
38+
39+
public static ClusterNodeMessage Deserialize(ReadBufferContext context)
40+
{
41+
return new ClusterNodeMessage
42+
{
43+
Url = context.NextString(),
44+
Port = context.NextInt()
45+
};
46+
}
547
}
648
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ public interface INodeHost
2222
{
2323
event EventHandler<EventArgs> NodeStarted;
2424
string NodeId { get; }
25+
int Port { get; }
26+
bool IsStarted { get; }
2527
BlockingCollection<IPeerHost> Peers { get; }
28+
INodeStateStore NodeStateStore { get; }
2629
Task Start(CancellationToken cancellationToken);
2730
Task Stop();
2831
}
@@ -38,6 +41,7 @@ public abstract class BaseNodeHost : INodeHost
3841
private System.Timers.Timer _gossipTimer;
3942
private BlockingCollection<IPeerHost> _peers;
4043
private string _nodeId;
44+
private bool _isStarted = false;
4145

4246
public BaseNodeHost(IPeerStore peerStore, IPeerHostFactory peerHostFactory, INodeStateStore nodeStateStore, IClusterStore clusterStore, ILogger<BaseNodeHost> logger, IOptions<ConsensusPeerOptions> options)
4347
{
@@ -52,6 +56,9 @@ public BaseNodeHost(IPeerStore peerStore, IPeerHostFactory peerHostFactory, INod
5256
}
5357

5458
public BlockingCollection<IPeerHost> Peers => _peers;
59+
public INodeStateStore NodeStateStore => _nodeStateStore;
60+
public bool IsStarted => _isStarted;
61+
public int Port => _options.Port;
5562
public bool IsRunning { get; private set; }
5663
public UdpClient UdpServer { get; private set; }
5764
public ILogger<BaseNodeHost> Logger { get; private set; }
@@ -82,6 +89,7 @@ public Task Stop()
8289
IsRunning = false;
8390
_proxyClient.Close();
8491
UdpServer.Close();
92+
_isStarted = false;
8593
return Task.CompletedTask;
8694
}
8795

@@ -90,6 +98,7 @@ protected async Task InternalRun()
9098
try
9199
{
92100
if (NodeStarted != null) NodeStarted(this, new EventArgs());
101+
_isStarted = true;
93102
while (true)
94103
{
95104
TokenSource.Token.ThrowIfCancellationRequested();
@@ -187,6 +196,7 @@ private void StopGossipTimer()
187196
private async Task BroadcastGossipHeartbeat(object source, ElapsedEventArgs e)
188197
{
189198
var nodes = await _clusterStore.GetAllNodes(TokenSource.Token);
199+
nodes = nodes.Where(n => n.Port != _options.Port || n.Url != _options.Url);
190200
var totalNodes = nodes.Count();
191201
var nbNodes = _options.GossipMaxNodeBroadcast;
192202
if(totalNodes < nbNodes) nbNodes = totalNodes;
@@ -254,10 +264,24 @@ private async Task<GossipPackage> HandleGossipRequest(GossipUpdateNodeStateReque
254264
private async Task<GossipPackage> HandleGossipRequest(GossipJoinNodeRequest request)
255265
{
256266
var cluster = await _clusterStore.GetNode(request.Url, request.Port, TokenSource.Token);
257-
if (cluster == null) await _clusterStore.AddNode(new ClusterNode { Port = request.Port, Url = request.Url }, TokenSource.Token);
258-
using (var gossipClient = new GossipClient(request.Url, request.Port))
267+
if (cluster == null)
259268
{
260-
// await gossipClient.JoinNode(_options.Url, _options.Port, true, TokenSource.Token);
269+
await _clusterStore.AddNode(new ClusterNode { Port = request.Port, Url = request.Url }, TokenSource.Token);
270+
var allNodes = await _clusterStore.GetAllNodes(TokenSource.Token);
271+
using (var gossipClient = new GossipClient(request.Url, request.Port))
272+
{
273+
await gossipClient.UpdateClusterNodes(_options.Url, _options.Port, allNodes.Select(n => new ClusterNodeMessage { Port = n.Port, Url = n.Url }).ToList(), TokenSource.Token);
274+
}
275+
}
276+
277+
return null;
278+
}
279+
280+
private async Task<GossipPackage> HandleGossipRequest(GossipUpdateClusterRequest request)
281+
{
282+
foreach(var node in request.Nodes)
283+
{
284+
await _clusterStore.AddNode(new ClusterNode { Port = node.Port, Url = node.Url }, TokenSource.Token);
261285
}
262286

263287
return null;

src/RaftConsensus/FaasNet.RaftConsensus.Core/Models/ClusterNode.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using FaasNet.RaftConsensus.Client;
2+
using System;
23
using System.Text.Json;
34

45
namespace FaasNet.RaftConsensus.Core.Models
@@ -12,7 +13,7 @@ public NodeState ToNodeState()
1213
{
1314
return new NodeState
1415
{
15-
EntityId = string.Empty,
16+
EntityId = Guid.NewGuid().ToString(),
1617
EntityType = StandardEntityTypes.Cluster,
1718
EntityVersion = 0,
1819
Value = JsonSerializer.Serialize(this)

src/RaftConsensus/FaasNet.RaftConsensus.Core/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ public static ServerBuilder AddConsensusPeer(this IServiceCollection services, A
1414
if (callback != null) services.Configure(callback);
1515
else services.Configure<ConsensusPeerOptions>((o) => { });
1616
var peerStore = new InMemoryPeerStore(new ConcurrentBag<PeerInfo>());
17-
var nodeStateStore = new InMemoryNodeStateStore(new ConcurrentBag<NodeState>());
1817
services.AddLogging();
1918
services.AddTransient<INodeHost, StandaloneNodeHost>();
2019
services.AddTransient<IPeerHostFactory, PeerHostFactory>();
2120
services.AddTransient<IClusterStore, ClusterStore>();
2221
services.AddScoped<IPeerHost, StandalonePeerHost>();
2322
services.AddScoped<ILogStore, InMemoryLogStore>();
23+
services.AddSingleton<INodeStateStore, InMemoryNodeStateStore>();
2424
services.AddSingleton<IPeerStore>(peerStore);
25-
services.AddSingleton<INodeStateStore>(nodeStateStore);
2625
return new ServerBuilder(services);
2726
}
2827
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/Stores/ClusterStore.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,21 @@ public interface IClusterStore
1919
public class ClusterStore : IClusterStore
2020
{
2121
private readonly INodeStateStore _nodeStateStore;
22-
private readonly ConsensusPeerOptions _options;
2322

24-
public ClusterStore(INodeStateStore nodeStateStore, IOptions<ConsensusPeerOptions> options)
23+
public ClusterStore(INodeStateStore nodeStateStore)
2524
{
2625
_nodeStateStore = nodeStateStore;
27-
_options = options.Value;
2826
}
2927

3028
public async Task<IEnumerable<ClusterNode>> GetAllNodes(CancellationToken cancellationToken)
3129
{
3230
var lastEntityTypes = await _nodeStateStore.GetAllLastEntityTypes(StandardEntityTypes.Cluster, cancellationToken);
31+
lastEntityTypes = lastEntityTypes.OrderBy(e => e.EntityVersion);
3332
var result = lastEntityTypes.Select(et => JsonSerializer.Deserialize<ClusterNode>(et.Value, new JsonSerializerOptions
3433
{
3534
PropertyNameCaseInsensitive = true
3635
}));
37-
return result.Where(r => r.Url != _options.Url || r.Port != _options.Port);
36+
return result;
3837
}
3938

4039
public async Task<ClusterNode> GetNode(string url, int port, CancellationToken cancellationToken)
@@ -43,10 +42,12 @@ public async Task<ClusterNode> GetNode(string url, int port, CancellationToken c
4342
return allNodes.FirstOrDefault(n => n.Port == port && n.Url == url);
4443
}
4544

46-
public Task AddNode(ClusterNode node, CancellationToken cancellationToken)
45+
public async Task AddNode(ClusterNode node, CancellationToken cancellationToken)
4746
{
48-
_nodeStateStore.Add(node.ToNodeState());
49-
return Task.CompletedTask;
47+
var lastEntityType = await _nodeStateStore.GetLastEntityType(StandardEntityTypes.Cluster, cancellationToken);
48+
var nodeState = node.ToNodeState();
49+
if (lastEntityType != null) nodeState.EntityVersion = lastEntityType.EntityVersion + 1;
50+
_nodeStateStore.Add(nodeState);
5051
}
5152
}
5253
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/Stores/INodeStateStore.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public interface INodeStateStore
1313
void Add(NodeState nodeState);
1414
void Update(NodeState nodeState);
1515
Task<int> SaveChanges(CancellationToken cancellationToken);
16+
Task<IEnumerable<NodeState>> GetAllEntityTypes(CancellationToken cancellationToken);
1617
Task<IEnumerable<NodeState>> GetAllLastEntityTypes(CancellationToken cancellationToken);
1718
Task<IEnumerable<NodeState>> GetAllLastEntityTypes(string entityType, CancellationToken cancellationToken);
1819
Task<IEnumerable<NodeState>> GetAllSpecificEntityTypes(List<(string EntityType, int EntityVersion)> parameter, CancellationToken cancellationToken);
@@ -23,6 +24,11 @@ public class InMemoryNodeStateStore : INodeStateStore
2324
{
2425
private readonly ConcurrentBag<NodeState> _nodeStates;
2526

27+
public InMemoryNodeStateStore()
28+
{
29+
_nodeStates = new ConcurrentBag<NodeState>();
30+
}
31+
2632
public InMemoryNodeStateStore(ConcurrentBag<NodeState> nodeStates)
2733
{
2834
_nodeStates = nodeStates;
@@ -38,6 +44,12 @@ public void Update(NodeState nodeState)
3844
_nodeStates.Remove(nodeState);
3945
}
4046

47+
public Task<IEnumerable<NodeState>> GetAllEntityTypes(CancellationToken cancellationToken)
48+
{
49+
IEnumerable<NodeState> result = _nodeStates.OrderByDescending(ns => ns.EntityVersion);
50+
return Task.FromResult(result);
51+
}
52+
4153
public Task<IEnumerable<NodeState>> GetAllLastEntityTypes(CancellationToken cancellationToken)
4254
{
4355
IEnumerable<NodeState> result = _nodeStates.OrderByDescending(ns => ns.EntityVersion).GroupBy(ns => ns.EntityType).Select(ns => ns.First());
@@ -53,7 +65,7 @@ public Task<IEnumerable<NodeState>> GetAllLastEntityTypes(string entityType, Can
5365
public Task<IEnumerable<NodeState>> GetAllSpecificEntityTypes(List<(string EntityType, int EntityVersion)> parameter, CancellationToken cancellationToken)
5466
{
5567
IEnumerable<NodeState> result = _nodeStates.Where(ns => parameter.Any(p => p.EntityType == ns.EntityType && p.EntityVersion == ns.EntityVersion));
56-
throw new System.NotImplementedException();
68+
return Task.FromResult(result);
5769
}
5870

5971
public Task<NodeState> GetLastEntityType(string entityType, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)