Skip to content

Commit d7394d8

Browse files
Ticket #102: Propagate node state
1 parent 31d220d commit d7394d8

5 files changed

Lines changed: 49 additions & 1 deletion

File tree

src/RaftConsensus/FaasNet.RaftConsensus.Client/GossipClient.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ public GossipClient(string url, int port) : this(new IPEndPoint(IPAddressHelper.
4343
return Send(package, cancellationToken: cancellationToken);
4444
}
4545

46+
public Task UpdateNodeState(string entityType, string entityId, string value, CancellationToken cancellationToken = default(CancellationToken))
47+
{
48+
var package = GossipPackageRequestBuilder.UpdateNodeState(string.Empty, 0, entityType, entityId, value);
49+
return Send(package, cancellationToken: cancellationToken);
50+
}
51+
4652
public async Task Send(GossipPackage gossipPackage, int? timeoutMS = null, CancellationToken cancellationToken = default(CancellationToken))
4753
{
4854
var writeCtx = new WriteBufferContext();

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/Gossip/GossipPackageRequestBuilder.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,16 @@ public static GossipPackage UpdateClusterNodes(string url, int port, ICollection
4040
Nodes = clusterNodes
4141
};
4242
}
43+
44+
public static GossipPackage UpdateNodeState(string url, int port, string entityType, string entityId, string value)
45+
{
46+
return new GossipUpdateNodeStateRequest
47+
{
48+
Header = new GossipHeader(GossipCommands.UPDATE_NODE_STATE_REQUEST, url, port),
49+
EntityId = entityId,
50+
EntityType = entityType,
51+
Value = value
52+
};
53+
}
4354
}
4455
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ private async Task<GossipPackage> HandleGossipRequest(GossipHeartbeatResult requ
247247
var states = await _nodeStateStore.GetAllLastEntityTypes(TokenSource.Token);
248248
var stateToBeSynced = states.Where(s => request.States.Any(st => st.EntityType == s.EntityType && s.EntityVersion < st.EntityVersion))
249249
.ToDictionary(s => s.EntityType, s => s.EntityVersion + 1);
250+
var missingStates = request.States.Where(rs => !states.Any(s => s.EntityType == rs.EntityType));
251+
foreach (var missingState in missingStates) stateToBeSynced.Add(missingState.EntityType, missingState.EntityVersion);
250252
if (!stateToBeSynced.Any()) return null;
251253
return GossipPackageRequestBuilder.Sync(_options.Url, _options.Port, stateToBeSynced);
252254
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/Models/NodeState.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
namespace FaasNet.RaftConsensus.Core.Models
1+
using System.Diagnostics;
2+
3+
namespace FaasNet.RaftConsensus.Core.Models
24
{
5+
[DebuggerDisplay("EntityType = {EntityType}, EntityId = {EntityId}, Value = {Value}")]
36
public class NodeState
47
{
58
public string EntityType { get; set; }

tests/FaasNet.RaftConsensus.Tests/PeerHostFixture.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,32 @@ public async Task When_NodeIsStopped_Then_NodeBecomeUnreachable()
7070
Assert.Equal(4001, clusterNodes.First().Node.Port);
7171
}
7272

73+
[Fact]
74+
public async Task When_StateIsAdded_Then_StorageIsUpdated()
75+
{
76+
// ARRANGE
77+
var seedNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4000, new ConcurrentBag<ClusterNode>(), true);
78+
var firstNode = BuildNodeHost(new ConcurrentBag<PeerInfo>(), 4001, new ConcurrentBag<ClusterNode>());
79+
await seedNode.Start(CancellationToken.None);
80+
await firstNode.Start(CancellationToken.None);
81+
WaitNodeIsStarted(seedNode);
82+
WaitNodeIsStarted(firstNode);
83+
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.JoinNode("localhost", 4001);
84+
var seedNodeStates = await WaitEntityTypes(seedNode, (nodes) => nodes.Count() == 2);
85+
var firstNodeStates = await WaitEntityTypes(firstNode, (nodes) => nodes.Count() == 2);
86+
87+
// ACT
88+
using (var gossipClient = new GossipClient("localhost", 4000)) await gossipClient.UpdateNodeState("Client", "id", "value");
89+
var seedClient = (await WaitEntityTypes(seedNode, (nodes) => nodes.Any(n => n.EntityType == "Client"))).First(c => c.EntityType == "Client");
90+
var firstNodeClient = (await WaitEntityTypes(firstNode, (nodes) => nodes.Any(n => n.EntityType == "Client"))).First(c => c.EntityType == "Client");
91+
92+
// ASSERT
93+
Assert.NotNull(seedClient);
94+
Assert.NotNull(firstNodeClient);
95+
Assert.Equal("Client", seedClient.EntityType);
96+
Assert.Equal("Client", firstNodeClient.EntityType);
97+
}
98+
7399
#endregion
74100

75101
[Fact]

0 commit comments

Comments
 (0)