Skip to content

Commit 31d220d

Browse files
Ticket #102: When a node is stopped, it becomes inactive
1 parent 210421e commit 31d220d

7 files changed

Lines changed: 101 additions & 19 deletions

File tree

src/RaftConsensus/FaasNet.RaftConsensus.Client/Extensions/AsyncExtensions.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationT
1616
{
1717
task, tcs.Task
1818
};
19-
2019
Task delayTask = null;
2120
if (timeOutMilliSeconds != null)
2221
{

src/RaftConsensus/FaasNet.RaftConsensus.Client/GossipClient.cs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,31 @@ public GossipClient(string url, int port) : this(new IPEndPoint(IPAddressHelper.
2525

2626
public UdpClient UdpClient { get; private set; }
2727

28-
public Task Heartbeat(string url, int port, CancellationToken cancellationToken = default(CancellationToken))
28+
public Task Heartbeat(string url, int port, int? timeout, CancellationToken cancellationToken = default(CancellationToken))
2929
{
3030
var package = GossipPackageRequestBuilder.Heartbeat(url, port);
31-
return Send(package, cancellationToken);
32-
}
33-
34-
public async Task Send(GossipPackage gossipPackage, CancellationToken cancellationToken = default(CancellationToken))
35-
{
36-
var writeCtx = new WriteBufferContext();
37-
gossipPackage.Serialize(writeCtx);
38-
var payload = writeCtx.Buffer.ToArray();
39-
await UdpClient.SendAsync(payload, payload.Count(), _target).WithCancellation(cancellationToken);
31+
return Send(package, timeout, cancellationToken);
4032
}
4133

4234
public Task JoinNode(string url, int port, CancellationToken cancellationToken = default(CancellationToken))
4335
{
4436
var package = GossipPackageRequestBuilder.AddNode(url, port);
45-
return Send(package, cancellationToken);
37+
return Send(package, cancellationToken: cancellationToken);
4638
}
4739

4840
public Task UpdateClusterNodes(string url, int port, ICollection<ClusterNodeMessage> clusterNodes, CancellationToken cancellationToken = default(CancellationToken))
4941
{
5042
var package = GossipPackageRequestBuilder.UpdateClusterNodes(url, port, clusterNodes);
51-
return Send(package, cancellationToken);
43+
return Send(package, cancellationToken: cancellationToken);
44+
}
45+
46+
/// <summary>
/// Serializes the gossip package and sends the resulting bytes over UDP to the client's target endpoint.
/// </summary>
/// <param name="gossipPackage">Package to serialize and send.</param>
/// <param name="timeoutMS">Optional timeout, in milliseconds, forwarded to <c>WithCancellation</c>; <c>null</c> forwards no timeout.</param>
/// <param name="cancellationToken">Token forwarded to <c>WithCancellation</c>.</param>
/// <exception cref="TimeoutException">Thrown when <c>UdpClient.Available</c> equals 1 after the send has been awaited.</exception>
public async Task Send(GossipPackage gossipPackage, int? timeoutMS = null, CancellationToken cancellationToken = default(CancellationToken))
{
    var writeCtx = new WriteBufferContext();
    gossipPackage.Serialize(writeCtx);
    var payload = writeCtx.Buffer.ToArray();
    // The payload is an array, so read its Length directly instead of going through LINQ Count().
    await UdpClient.SendAsync(payload, payload.Length, _target).WithCancellation(cancellationToken, timeoutMS);
    // NOTE(review): Available reports the amount of received data waiting to be read; treating
    // exactly 1 as a timeout is kept as-is, but the reason is not visible here - confirm the intent.
    if (UdpClient.Available == 1) throw new TimeoutException();
}
5354

5455
public void Dispose()

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public interface INodeHost
2525
int Port { get; }
2626
bool IsStarted { get; }
2727
BlockingCollection<IPeerHost> Peers { get; }
28+
BlockingCollection<UnreachableClusterNode> UnreachableClusterNodes { get; }
2829
INodeStateStore NodeStateStore { get; }
2930
Task Start(CancellationToken cancellationToken);
3031
Task Stop();
@@ -42,6 +43,7 @@ public abstract class BaseNodeHost : INodeHost
4243
private BlockingCollection<IPeerHost> _peers;
4344
private string _nodeId;
4445
private bool _isStarted = false;
46+
private BlockingCollection<UnreachableClusterNode> _clusterNodes;
4547

4648
public BaseNodeHost(IPeerStore peerStore, IPeerHostFactory peerHostFactory, INodeStateStore nodeStateStore, IClusterStore clusterStore, ILogger<BaseNodeHost> logger, IOptions<ConsensusPeerOptions> options)
4749
{
@@ -56,7 +58,9 @@ public BaseNodeHost(IPeerStore peerStore, IPeerHostFactory peerHostFactory, INod
5658
}
5759

5860
public BlockingCollection<IPeerHost> Peers => _peers;
61+
public BlockingCollection<UnreachableClusterNode> UnreachableClusterNodes => _clusterNodes;
5962
public INodeStateStore NodeStateStore => _nodeStateStore;
63+
public IClusterStore ClusterStore => _clusterStore;
6064
public bool IsStarted => _isStarted;
6165
public int Port => _options.Port;
6266
public bool IsRunning { get; private set; }
@@ -71,6 +75,7 @@ public async Task Start(CancellationToken cancellationToken)
7175
{
7276
if (IsRunning) throw new InvalidOperationException("The node is already running");
7377
_peers = new BlockingCollection<IPeerHost>();
78+
_clusterNodes = new BlockingCollection<UnreachableClusterNode>();
7479
TokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
7580
await RunConsensusPeers(cancellationToken);
7681
StartGossipTimer();
@@ -174,7 +179,7 @@ private async Task<bool> TryHandleGossipRequest(UdpReceiveResult udpResult)
174179
if(packageResult == null) return true;
175180
using(var client = new GossipClient(gossipPackage.Header.SourceUrl, gossipPackage.Header.SourcePort))
176181
{
177-
await client.Send(packageResult, TokenSource.Token);
182+
await client.Send(packageResult, cancellationToken: TokenSource.Token);
178183
}
179184

180185
return true;
@@ -205,9 +210,20 @@ private async Task BroadcastGossipHeartbeat(object source, ElapsedEventArgs e)
205210
{
206211
var rndNodeIndex = rnd.Next(0, totalNodes);
207212
var selectedNode = nodes.ElementAt(rndNodeIndex);
208-
using (var client = new GossipClient(selectedNode.Url, selectedNode.Port))
213+
if (_clusterNodes.Any(n => n.Node.Url == selectedNode.Url && n.Node.Port == selectedNode.Port && n.ReactivationDateTime > DateTime.UtcNow)) continue;
214+
try
209215
{
210-
await client.Heartbeat(_options.Url, _options.Port, TokenSource.Token);
216+
using (var client = new GossipClient(selectedNode.Url, selectedNode.Port))
217+
{
218+
await client.Heartbeat(_options.Url, _options.Port, _options.GossipTimeoutHeartbeatMS, TokenSource.Token);
219+
}
220+
}
221+
catch(Exception ex)
222+
{
223+
Logger.LogError(ex.ToString());
224+
var clusterNode = _clusterNodes.FirstOrDefault(c => c.Node.Port == selectedNode.Port && c.Node.Url == selectedNode.Url);
225+
if (clusterNode == null) _clusterNodes.Add(new UnreachableClusterNode(selectedNode, DateTime.UtcNow.AddMilliseconds(_options.GossipClusterNodeDeactivationDurationMS)));
226+
else clusterNode.ReactivationDateTime = DateTime.UtcNow.AddMilliseconds(_options.GossipClusterNodeDeactivationDurationMS);
211227
}
212228
}
213229

@@ -289,4 +305,16 @@ private async Task<GossipPackage> HandleGossipRequest(GossipUpdateClusterRequest
289305

290306
#endregion
291307
}
308+
309+
/// <summary>
/// Associates a cluster node with the date and time recorded as its reactivation moment.
/// The gossip heartbeat broadcast skips a node while that moment is still in the future.
/// </summary>
public class UnreachableClusterNode
{
    /// <summary>
    /// Creates an entry for the given node.
    /// </summary>
    /// <param name="node">Cluster node this entry refers to.</param>
    /// <param name="reactivationDateTime">Initial value of <see cref="ReactivationDateTime"/>.</param>
    public UnreachableClusterNode(ClusterNode node, DateTime reactivationDateTime)
    {
        ReactivationDateTime = reactivationDateTime;
        Node = node;
    }

    /// <summary>
    /// Date and time compared against the current UTC time by the heartbeat broadcast; it can be updated after construction.
    /// </summary>
    public DateTime ReactivationDateTime { get; set; }

    /// <summary>
    /// Cluster node this entry refers to; only assigned by the constructor.
    /// </summary>
    public ClusterNode Node { get; private set; }
}
292320
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/ConsensusPeerOptions.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public ConsensusPeerOptions()
1616
LeaderHeartbeatTimerMS = 1000;
1717
GossipTimerMS = 1000;
1818
GossipMaxNodeBroadcast = 2;
19+
GossipTimeoutHeartbeatMS = 2000;
20+
GossipClusterNodeDeactivationDurationMS = 3000;
1921
}
2022

2123
/// <summary>
@@ -55,9 +57,17 @@ public ConsensusPeerOptions()
5557
/// </summary>
5658
public int GossipTimerMS { get; set; }
5759
/// <summary>
58-
/// Maximum number of nodes to broadcast the message.
60+
/// Gossip - Maximum number of nodes to broadcast the message.
5961
/// </summary>
6062
public int GossipMaxNodeBroadcast{ get; set; }
63+
/// <summary>
64+
/// Gossip - Heartbeat request expires after this number of milliseconds.
65+
/// </summary>
66+
public int GossipTimeoutHeartbeatMS { get; set; }
67+
/// <summary>
68+
/// Gossip - Duration, in milliseconds, during which an unreachable cluster node stays deactivated.
69+
/// </summary>
70+
public int GossipClusterNodeDeactivationDurationMS { get; set; }
6171
}
6272

6373
public class Interval

src/RaftConsensus/FaasNet.RaftConsensus.Core/Stores/ClusterStore.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using FaasNet.RaftConsensus.Client;
22
using FaasNet.RaftConsensus.Core.Models;
3-
using Microsoft.Extensions.Options;
43
using System.Collections.Generic;
54
using System.Linq;
65
using System.Text.Json;

src/RaftConsensus/FaasNet.RaftConsensus.Logging.Startup/Program.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async Task DisplayMenu(ICollection<INodeHost> nodes)
104104
var continueExecution = true;
105105
do
106106
{
107-
Console.WriteLine("Enter 'Q' to stop execution / Enter 'inf' to display the states / Start a new node 'addnode'");
107+
Console.WriteLine("Enter 'Q' to stop execution / Enter 'inf' to display the states / Start a new node 'addnode' / Stop one node 'stopnode'");
108108
string termId = Console.ReadLine();
109109
continueExecution = termId != "Q";
110110
if (termId == "inf")
@@ -130,6 +130,15 @@ async Task DisplayMenu(ICollection<INodeHost> nodes)
130130
false);
131131
await StartNode(newNode, nodes.Count());
132132
nodes.Add(newNode);
133+
continue;
134+
}
135+
136+
if (termId == "stopnode")
137+
{
138+
Console.WriteLine("Enter the index");
139+
int index = int.Parse(Console.ReadLine());
140+
await nodes.ElementAt(index).Stop();
141+
continue;
133142
}
134143
}
135144
while (continueExecution);

tests/FaasNet.RaftConsensus.Tests/PeerHostFixture.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public async Task When_JoinNodeToCluster_Then_StorageIsUpdated()
3131
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.JoinNode("localhost", 4001);
3232
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
3333
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
34+
await seedNode.Stop();
35+
await firstNode.Stop();
3436

3537
// ASSERT
3638
Assert.Equal(2, seedNodeStates.Count());
@@ -43,6 +45,31 @@ public async Task When_JoinNodeToCluster_Then_StorageIsUpdated()
4345
Assert.Equal("{\"Url\":\"localhost\",\"Port\":4001}", firstNodeStates.ElementAt(0).Value);
4446
}
4547

48+
/// <summary>
/// Checks that, once a node that joined the cluster has been stopped, the seed node
/// reports it as its single unreachable cluster node.
/// </summary>
[Fact]
public async Task When_NodeIsStopped_Then_NodeBecomeUnreachable()
{
    // ARRANGE
    var seedHost = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4000, new ConcurrentBag<ClusterNode>(), true);
    var joiningHost = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4001, new ConcurrentBag<ClusterNode>());
    await seedHost.Start(CancellationToken.None);
    await joiningHost.Start(CancellationToken.None);
    WaitNodeIsStarted(seedHost);
    WaitNodeIsStarted(joiningHost);
    using (var joinClient = new GossipClient("localhost", 4000))
    {
        await joinClient.JoinNode("localhost", 4001);
    }

    // Both hosts must hold two entries before the second one is stopped; the returned states are not needed.
    await WaitEntityTypes(seedHost, (states) => states.Count() == 2);
    await WaitEntityTypes(joiningHost, (states) => states.Count() == 2);

    // ACT
    await joiningHost.Stop();
    var unreachableNodes = WaitUnreachableClusterNodes(seedHost, (candidates) => candidates.Count() == 1);
    await seedHost.Stop();

    // ASSERT
    var unreachable = Assert.Single(unreachableNodes);
    Assert.Equal("localhost", unreachable.Node.Url);
    Assert.Equal(4001, unreachable.Node.Port);
}
72+
4673
#endregion
4774

4875
[Fact]
@@ -175,6 +202,15 @@ private static async Task<IEnumerable<NodeState>> WaitEntityTypes(INodeHost node
175202
}
176203
}
177204

205+
/// <summary>
/// Polls the unreachable cluster nodes of <paramref name="node"/> every 200 ms until
/// <paramref name="callback"/> accepts them. Blocks the calling thread and has no timeout.
/// </summary>
/// <param name="node">Node host whose <c>UnreachableClusterNodes</c> collection is polled.</param>
/// <param name="callback">Predicate evaluated against a snapshot of the collection.</param>
/// <returns>The exact snapshot that <paramref name="callback"/> accepted.</returns>
private static IEnumerable<UnreachableClusterNode> WaitUnreachableClusterNodes(INodeHost node, Func<IEnumerable<UnreachableClusterNode>, bool> callback)
{
    while (true)
    {
        // Copy the collection once per iteration: evaluating the predicate on the live collection
        // and copying it afterwards could return content that differs from what the predicate
        // accepted if the node adds an entry in between.
        var snapshot = node.UnreachableClusterNodes.ToArray();
        if (callback(snapshot)) return snapshot;
        Thread.Sleep(200);
    }
}
213+
178214
private static void WaitOnlyOneLeader(List<INodeHost> nodes, string termId)
179215
{
180216
while (true)

0 commit comments

Comments
 (0)