Skip to content

Commit 8657936

Browse files
Ticket #102 : Fix consensus protocol
1 parent d7394d8 commit 8657936

5 files changed

Lines changed: 99 additions & 121 deletions

File tree

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected async Task HandleUDPPackage()
123123
try
124124
{
125125
var udpResult = await UdpServer.ReceiveAsync().WithCancellation(TokenSource.Token);
126-
// if (await TryHandleConsensusRequest(udpResult)) return;
126+
if (await TryHandleConsensusRequest(udpResult)) return;
127127
if (await TryHandleGossipRequest(udpResult)) return;
128128
await HandleUDPPackage(udpResult, TokenSource.Token);
129129
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BasePeerHost.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public abstract class BasePeerHost : IPeerHost
3737
{
3838
private readonly ILogger<BasePeerHost> _logger;
3939
private readonly ConsensusPeerOptions _options;
40-
private readonly ClusterStore _clusterStore;
40+
private readonly IClusterStore _clusterStore;
4141
private readonly ILogStore _logStore;
4242
private readonly IPeerStore _peerStore;
4343
private readonly string _peerId;
@@ -51,7 +51,7 @@ public abstract class BasePeerHost : IPeerHost
5151
private System.Timers.Timer _electionCheckTimer;
5252
private System.Timers.Timer _leaderHeartbeatTimer;
5353

54-
public BasePeerHost(ILogger<BasePeerHost> logger, IOptions<ConsensusPeerOptions> options, ClusterStore clusterStore, ILogStore logStore, IPeerStore peerStore)
54+
public BasePeerHost(ILogger<BasePeerHost> logger, IOptions<ConsensusPeerOptions> options, IClusterStore clusterStore, ILogStore logStore, IPeerStore peerStore)
5555
{
5656
_logger = logger;
5757
_options = options.Value;
@@ -157,6 +157,7 @@ private async Task StartElection()
157157
// Start to vote.
158158
_logger.LogInformation("{Node}:{PeerId}:{TermId}, Start to vote", _nodeId, _peerId, Info.TermId);
159159
var nodes = await _clusterStore.GetAllNodes(TokenSource.Token);
160+
nodes = nodes.Where(n => n.Port != _options.Port || n.Url != _options.Url);
160161
_quorum = (nodes.Count() / 2) + 1;
161162
if (_quorum == 0)
162163
{

src/RaftConsensus/FaasNet.RaftConsensus.Core/StandalonePeerHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace FaasNet.RaftConsensus.Core
99
{
1010
public class StandalonePeerHost : BasePeerHost
1111
{
12-
public StandalonePeerHost(ILogger<BasePeerHost> logger, IOptions<ConsensusPeerOptions> options, ClusterStore clusterStore, ILogStore logStore, IPeerStore peerStore) : base(logger, options, clusterStore, logStore, peerStore)
12+
public StandalonePeerHost(ILogger<BasePeerHost> logger, IOptions<ConsensusPeerOptions> options, IClusterStore clusterStore, ILogStore logStore, IPeerStore peerStore) : base(logger, options, clusterStore, logStore, peerStore)
1313
{
1414
}
1515

src/RaftConsensus/FaasNet.RaftConsensus.Logging.Startup/Program.cs

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,65 +3,44 @@
33
using FaasNet.RaftConsensus.Core;
44
using FaasNet.RaftConsensus.Core.Models;
55
using Microsoft.Extensions.DependencyInjection;
6+
using Microsoft.Extensions.Logging;
67
using System.Collections.Concurrent;
78

89
int seedPort = 4000;
9-
var allNodes = CreateNodes();
10+
var peerInfos = AddPeers();
11+
var allNodes = CreateNodes(peerInfos);
1012
await StartNodes(allNodes);
1113
await DisplayMenu(allNodes);
14+
Console.WriteLine("Press Enter to quit the application");
15+
Console.ReadLine();
1216

13-
/*
14-
var peerInfos = new ConcurrentBag<string>();
15-
do
16-
{
17-
Console.WriteLine("Enter 'Q' to stop adding new peer or enter the name of your peer");
18-
string str = Console.ReadLine();
19-
continueExecution = str != "Q";
20-
if (continueExecution) peerInfos.Add(str);
21-
}
22-
while (continueExecution);
17+
foreach (var node in allNodes) await node.Stop();
2318

24-
continueExecution = true;
25-
do
19+
ConcurrentBag<string> AddPeers()
2620
{
27-
Console.WriteLine("Enter 'Q' to stop sending message / Enter 'inf' to display the status of the peers / Enter the name of the partition");
28-
string termId = Console.ReadLine();
29-
continueExecution = termId != "Q";
30-
if(termId == "inf")
21+
var continueExecution = true;
22+
var peerInfos = new ConcurrentBag<string>();
23+
do
3124
{
32-
foreach(var node in allNodes)
33-
{
34-
foreach(var peer in node.Peers)
35-
{
36-
Console.WriteLine($"Node '{node.NodeId}', Peer '{peer.PeerId}', TermId '{peer.Info.TermId}', TermIndex '{peer.Info.ConfirmedTermIndex}', State '{peer.State}");
37-
}
38-
}
39-
40-
continue;
25+
Console.WriteLine("Enter 'Q' to stop adding new peer");
26+
Console.WriteLine("Enter the name of your peer");
27+
string str = Console.ReadLine();
28+
continueExecution = str != "Q";
29+
if (continueExecution) peerInfos.Add(str);
4130
}
42-
43-
Console.WriteLine("Please enter the message");
44-
string message = Console.ReadLine();
45-
var consensusClient = new ConsensusClient("localhost", seedPort);
46-
await consensusClient.AppendEntry(termId, message, CancellationToken.None);
31+
while (continueExecution);
32+
return peerInfos;
4733
}
48-
while (continueExecution);
49-
*/
50-
51-
Console.WriteLine("Press Enter to quit the application");
52-
Console.ReadLine();
53-
54-
foreach (var node in allNodes) await node.Stop();
5534

56-
ICollection<INodeHost> CreateNodes()
35+
ICollection<INodeHost> CreateNodes(ConcurrentBag<string> peerInfos)
5736
{
5837
Console.WriteLine("How many nodes do you want to start ?");
5938
int nbNodes = int.Parse(Console.ReadLine());
6039
var allNodes = new List<INodeHost>();
6140
for (int i = 0; i <= nbNodes; i++)
6241
{
6342
allNodes.Add(BuildNodeHost(
64-
new ConcurrentBag<PeerInfo>(/*peerInfos.Select(p => new PeerInfo { TermId = p })*/),
43+
new ConcurrentBag<PeerInfo>(peerInfos.Select(p => new PeerInfo { TermId = p })),
6544
seedPort + i,
6645
Path.Combine(Directory.GetCurrentDirectory(), i + "-log-{0}.txt"),
6746
i == 0));
@@ -104,10 +83,15 @@ async Task DisplayMenu(ICollection<INodeHost> nodes)
10483
var continueExecution = true;
10584
do
10685
{
107-
Console.WriteLine("Enter 'Q' to stop execution / Enter 'inf' to display the states / Start a new node 'addnode' / Stop one node 'stopnode'");
108-
string termId = Console.ReadLine();
109-
continueExecution = termId != "Q";
110-
if (termId == "inf")
86+
Console.WriteLine("Enter 'Q' to stop execution");
87+
Console.WriteLine("Enter 'states' to display the states");
88+
Console.WriteLine("Enter 'peers' to display the status of the peers");
89+
Console.WriteLine("Enter 'appendentry' to append an entry");
90+
Console.WriteLine("Start a new node 'addnode'");
91+
Console.WriteLine("Stop one node 'stopnode'");
92+
string menuId = Console.ReadLine();
93+
continueExecution = menuId != "Q";
94+
if (menuId == "states")
11195
{
11296
foreach (var node in nodes)
11397
{
@@ -121,10 +105,10 @@ async Task DisplayMenu(ICollection<INodeHost> nodes)
121105
continue;
122106
}
123107

124-
if(termId == "addnode")
108+
if(menuId == "addnode")
125109
{
126110
var newNode = BuildNodeHost(
127-
new ConcurrentBag<PeerInfo>(/*peerInfos.Select(p => new PeerInfo { TermId = p })*/),
111+
new ConcurrentBag<PeerInfo>(peerInfos.Select(p => new PeerInfo { TermId = p })),
128112
seedPort + nodes.Count(),
129113
Path.Combine(Directory.GetCurrentDirectory(), nodes.Count() + "-log-{0}.txt"),
130114
false);
@@ -133,13 +117,35 @@ async Task DisplayMenu(ICollection<INodeHost> nodes)
133117
continue;
134118
}
135119

136-
if (termId == "stopnode")
120+
if (menuId == "stopnode")
137121
{
138122
Console.WriteLine("Enter the index");
139123
int index = int.Parse(Console.ReadLine());
140124
await nodes.ElementAt(index).Stop();
141125
continue;
142126
}
127+
128+
if (menuId == "peers")
129+
{
130+
foreach (var node in allNodes)
131+
{
132+
foreach (var peer in node.Peers)
133+
{
134+
Console.WriteLine($"Node '{node.NodeId}', Peer '{peer.PeerId}', TermId '{peer.Info.TermId}', TermIndex '{peer.Info.ConfirmedTermIndex}', State '{peer.State}");
135+
}
136+
}
137+
continue;
138+
}
139+
140+
if(menuId == "appendentry")
141+
{
142+
Console.WriteLine("Enter the termid");
143+
string termId = Console.ReadLine();
144+
Console.WriteLine("Please enter the message");
145+
string message = Console.ReadLine();
146+
var consensusClient = new ConsensusClient("localhost", seedPort);
147+
await consensusClient.AppendEntry(termId, message, CancellationToken.None);
148+
}
143149
}
144150
while (continueExecution);
145151
}
@@ -149,9 +155,8 @@ INodeHost BuildNodeHost(ConcurrentBag<PeerInfo> peers, int port, string logPath,
149155
var services = new ServiceCollection()
150156
.AddConsensusPeer(o => o.Port = port);
151157
if (isSeed) services.SetNodeStates(new ConcurrentBag<NodeState> { new ClusterNode { Port = 4000, Url = "localhost" }.ToNodeState() });
152-
// .SetPeers(peers)
153-
// .SetNodeStates(nodeStates)
154158
var serviceProvider = services.UseLogFileStore(o => o.LogFilePath = logPath)
159+
.SetPeers(peers)
155160
.Services
156161
// .AddLogging(l => l.AddConsole())
157162
.BuildServiceProvider();

tests/FaasNet.RaftConsensus.Tests/PeerHostFixture.cs

Lines changed: 40 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public async Task When_JoinNodeToCluster_Then_StorageIsUpdated()
4949
public async Task When_NodeIsStopped_Then_NodeBecomeUnreachable()
5050
{
5151
// ARRANGE
52-
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4000, new ConcurrentBag<ClusterNode>(), true);
53-
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4001, new ConcurrentBag<ClusterNode>());
52+
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4001, new ConcurrentBag<ClusterNode>(), true);
53+
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4002, new ConcurrentBag<ClusterNode>());
5454
await seedNode.Start(CancellationToken.None);
5555
await firstNode.Start(CancellationToken.None);
5656
WaitNodeIsStarted(seedNode);
5757
WaitNodeIsStarted(firstNode);
58-
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.JoinNode("localhost", 4001);
58+
using (var gossipClient = new GossipClient("localhost", 4001)) await gossipClient.JoinNode("localhost", 4002);
5959
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
6060
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
6161

@@ -67,25 +67,25 @@ public async Task When_NodeIsStopped_Then_NodeBecomeUnreachable()
6767
// ASSERT
6868
Assert.Single(clusterNodes);
6969
Assert.Equal("localhost", clusterNodes.First().Node.Url);
70-
Assert.Equal(4001, clusterNodes.First().Node.Port);
70+
Assert.Equal(4002, clusterNodes.First().Node.Port);
7171
}
7272

7373
[Fact]
7474
public async Task When_StateIsAdded_Then_StorageIsUpdated()
7575
{
7676
// ARRANGE
77-
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4000, new ConcurrentBag<ClusterNode>(), true);
78-
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4001, new ConcurrentBag<ClusterNode>());
77+
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4003, new ConcurrentBag<ClusterNode>(), true);
78+
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4004, new ConcurrentBag<ClusterNode>());
7979
await seedNode.Start(CancellationToken.None);
8080
await firstNode.Start(CancellationToken.None);
8181
WaitNodeIsStarted(seedNode);
8282
WaitNodeIsStarted(firstNode);
83-
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.JoinNode("localhost", 4001);
83+
using (var gossipClient = new GossipClient("localhost", 4003)) await gossipClient.JoinNode("localhost", 4004);
8484
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
8585
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
8686

8787
// ACT
88-
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.UpdateNodeState("Client", "id", "value");
88+
using (var gossipClient = new GossipClient("localhost", 4003)) await gossipClient.UpdateNodeState("Client", "id", "value");
8989
var seedClient = (await WaitEntityTypes(seedNode, (nodes) => nodes.Any(n => n.EntityType == "Client"))).First(c => c.EntityType == "Client");
9090
var firstNodeClient = (await WaitEntityTypes(firstNode, (nodes) => nodes.Any(n => n.EntityType == "Client"))).First(c => c.EntityType == "Client");
9191

@@ -98,105 +98,77 @@ public async Task When_StateIsAdded_Then_StorageIsUpdated()
9898

9999
#endregion
100100

101+
#region Consensus
102+
101103
[Fact]
102104
public async Task When_AppendLogInOnePartition_Then_LogIsReplicated()
103105
{
104106
// ARRANGE
105-
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>
106-
{
107-
new PeerInfo { TermId = "termId", TermIndex = 0 }
108-
}, 4001, new ConcurrentBag<ClusterNode>
109-
{
110-
new ClusterNode
111-
{
112-
Port = 4002,
113-
Url = "localhost"
114-
}
115-
});
116-
var secondNode = BuildNodeHost(new ConcurrentBag<PeerInfo>
117-
{
118-
new PeerInfo { TermId = "termId", TermIndex = 0 }
119-
}, 4002, new ConcurrentBag<ClusterNode>
120-
{
121-
new ClusterNode
122-
{
123-
Port = 4001,
124-
Url = "localhost"
125-
}
126-
});
107+
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo> { new PeerInfo { TermId = "termId", TermIndex = 0 } }, 4005, new ConcurrentBag<ClusterNode>(), true);
108+
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo> { new PeerInfo { TermId = "termId", TermIndex = 0 } }, 4006, new ConcurrentBag<ClusterNode>());
109+
await seedNode.Start(CancellationToken.None);
127110
await firstNode.Start(CancellationToken.None);
128-
await secondNode.Start(CancellationToken.None);
129-
var allNodes = new List<INodeHost> { firstNode, secondNode };
111+
WaitNodeIsStarted(seedNode);
112+
WaitNodeIsStarted(firstNode);
113+
using (var gossipClient = new GossipClient("localhost", 4005)) await gossipClient.JoinNode("localhost", 4006);
114+
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
115+
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
116+
var allNodes = new List<INodeHost> { seedNode, firstNode };
130117

131118
// ACT
132119
WaitOnlyOneLeader(allNodes, "termId");
133-
var client = new ConsensusClient("localhost", 4001);
120+
var client = new ConsensusClient("localhost", 4005);
134121
client.AppendEntry("termId", "value", CancellationToken.None).Wait();
135122
WaitLogs(allNodes, p => p.Info.TermId == "termId", l => l.Value == "value");
136123

137124
// ASSERT
125+
var seedPeerLogs = seedNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
138126
var firstPeerLogs = firstNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
139-
var secondPeerLogs = secondNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
127+
Assert.Single(seedPeerLogs);
140128
Assert.Single(firstPeerLogs);
141-
Assert.Single(secondPeerLogs);
129+
Assert.Equal("value", seedPeerLogs.First().Value);
142130
Assert.Equal("value", firstPeerLogs.First().Value);
143-
Assert.Equal("value", secondPeerLogs.First().Value);
131+
await seedNode.Stop();
144132
await firstNode.Stop();
145-
await secondNode.Stop();
146133
}
147134

148135
[Fact]
149136
public async Task When_AppendLogInTwoPartitions_Then_LogIsReplicated()
150137
{
151138
// ARRANGE
152-
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>
153-
{
154-
new PeerInfo { TermId = "termId", TermIndex = 0 },
155-
new PeerInfo { TermId = "secondTermId", TermIndex = 0 }
156-
}, 4001, new ConcurrentBag<ClusterNode>
157-
{
158-
new ClusterNode
159-
{
160-
Port = 4002,
161-
Url = "localhost"
162-
}
163-
});
164-
var secondNode = BuildNodeHost(new ConcurrentBag<PeerInfo>
165-
{
166-
new PeerInfo { TermId = "termId", TermIndex = 0 },
167-
new PeerInfo { TermId = "secondTermId", TermIndex = 0 }
168-
}, 4002, new ConcurrentBag<ClusterNode>
169-
{
170-
new ClusterNode
171-
{
172-
Port = 4001,
173-
Url = "localhost"
174-
}
175-
});
139+
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo> { new PeerInfo { TermId = "termId", TermIndex = 0 }, new PeerInfo { TermId = "secondTermId", TermIndex = 0 } }, 4007, new ConcurrentBag<ClusterNode>(), true);
140+
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo> { new PeerInfo { TermId = "termId", TermIndex = 0 }, new PeerInfo { TermId = "secondTermId", TermIndex = 0 } }, 4008, new ConcurrentBag<ClusterNode>());
141+
await seedNode.Start(CancellationToken.None);
176142
await firstNode.Start(CancellationToken.None);
177-
await secondNode.Start(CancellationToken.None);
178-
var allNodes = new List<INodeHost> { firstNode, secondNode };
143+
WaitNodeIsStarted(seedNode);
144+
WaitNodeIsStarted(firstNode);
145+
using (var gossipClient = new GossipClient("localhost", 4007)) await gossipClient.JoinNode("localhost", 4008);
146+
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
147+
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
148+
var allNodes = new List<INodeHost> { seedNode, firstNode };
179149

180150
// ACT
181151
WaitOnlyOneLeader(allNodes, "termId");
182152
WaitOnlyOneLeader(allNodes, "secondTermId");
183-
var client = new ConsensusClient("localhost", 4001);
153+
var client = new ConsensusClient("localhost", 4007);
184154
client.AppendEntry("termId", "value", CancellationToken.None).Wait();
185155
client.AppendEntry("secondTermId", "value", CancellationToken.None).Wait();
186156
WaitLogs(allNodes, p => p.Info.TermId == "termId", l => l.Value == "value");
187157
WaitLogs(allNodes, p => p.Info.TermId == "secondTermId", l => l.Value == "value");
188158

189159
// ASSERT
160+
var seedPeerLogs = seedNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
190161
var firstPeerLogs = firstNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
191-
var secondPeerLogs = secondNode.Peers.First().LogStore.GetAll(CancellationToken.None).Result;
162+
Assert.Single(seedPeerLogs);
192163
Assert.Single(firstPeerLogs);
193-
Assert.Single(secondPeerLogs);
164+
Assert.Equal("value", seedPeerLogs.First().Value);
194165
Assert.Equal("value", firstPeerLogs.First().Value);
195-
Assert.Equal("value", secondPeerLogs.First().Value);
166+
await seedNode.Stop();
196167
await firstNode.Stop();
197-
await secondNode.Stop();
198168
}
199169

170+
#endregion
171+
200172
private static INodeHost BuildNodeHost(ConcurrentBag<PeerInfo> peers, int port, ConcurrentBag<ClusterNode> clusterNodes, bool isSeed = false)
201173
{
202174
var serviceCollection = new ServiceCollection()

0 commit comments

Comments
 (0)