Skip to content

Commit dba8508

Browse files
Ticket #99 : Add sample project
1 parent 686036d commit dba8508

18 files changed

Lines changed: 237 additions & 220 deletions

File tree

src/RaftConsensus/FaasNet.RaftConsensus.Client/ConsensusClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public ConsensusClient(string url, int port) : this(new IPEndPoint(ResolveIPAddr
2626
public async Task LeaderHeartbeat(string termId, long termIndex, string url, int port, CancellationToken cancellationToken = default(CancellationToken))
2727
{
2828
var writeCtx = new WriteBufferContext();
29-
var package = PackageRequestBuilder.LeaderHeartbeat(termId, termIndex, url, port);
29+
var package = PackageRequestBuilder.LeaderHeartbeat(url, port, termId, termIndex);
3030
package.Serialize(writeCtx);
3131
var payload = writeCtx.Buffer.ToArray();
3232
await UdpClient.SendAsync(payload, payload.Count(), _target).WithCancellation(cancellationToken);

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/ConsensusCommands.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ public class ConsensusCommands : BaseCommands
55
public static ConsensusCommands LEADER_HEARTBEAT_REQUEST = new ConsensusCommands(0, "LEADER_HEARTBEAT_REQUEST");
66
public static ConsensusCommands VOTE_REQUEST = new ConsensusCommands(1, "VOTE_REQUEST");
77
public static ConsensusCommands VOTE_RESULT = new ConsensusCommands(2, "VOTE_RESULT");
8-
public static ConsensusCommands EMPTY_RESULT = new ConsensusCommands(3, "EMPTY_RESULT");
9-
public static ConsensusCommands APPEND_ENTRY_REQUEST = new ConsensusCommands(4, "APPEND_ENTRY_REQUEST");
10-
public static ConsensusCommands LEADER_HEARTBEAT_RESULT = new ConsensusCommands(5, "LEADER_HEARTBEAT_RESULT");
8+
public static ConsensusCommands APPEND_ENTRY_REQUEST = new ConsensusCommands(3, "APPEND_ENTRY_REQUEST");
9+
public static ConsensusCommands LEADER_HEARTBEAT_RESULT = new ConsensusCommands(4, "LEADER_HEARTBEAT_RESULT");
1110

1211
protected ConsensusCommands(int code)
1312
{

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/ConsensusPackage.cs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public static ConsensusPackage Deserialize(ReadBufferContext context)
2323
if (magicCode != MAGIC_CODE || version != PROTOCOL_VERSION) return null;
2424
var header = Header.Deserialize(context);
2525
if (header.Command == ConsensusCommands.VOTE_REQUEST) return new VoteRequest { Header = header };
26-
if (header.Command == ConsensusCommands.EMPTY_RESULT) return new EmptyResult { Header = header };
26+
if (header.Command == ConsensusCommands.LEADER_HEARTBEAT_REQUEST) return new LeaderHeartbeatRequest { Header = header };
27+
if (header.Command == ConsensusCommands.LEADER_HEARTBEAT_RESULT) return new LeaderHeartbeatResult { Header = header };
2728
if (header.Command == ConsensusCommands.VOTE_RESULT)
2829
{
2930
var result = new VoteResult
@@ -39,18 +40,6 @@ public static ConsensusPackage Deserialize(ReadBufferContext context)
3940
result.Extract(context);
4041
return result;
4142
}
42-
if (header.Command == ConsensusCommands.LEADER_HEARTBEAT_REQUEST)
43-
{
44-
var result = new LeaderHeartbeatRequest { Header = header };
45-
result.Extract(context);
46-
return result;
47-
}
48-
if(header.Command == ConsensusCommands.LEADER_HEARTBEAT_RESULT)
49-
{
50-
var result = new LeaderHeartbeatResult { Header = header };
51-
result.Extract(context);
52-
return result;
53-
}
5443

5544
return new ConsensusPackage
5645
{

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/EmptyResult.cs

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/Header.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,34 @@
22
{
33
public class Header
44
{
5-
public Header(ConsensusCommands command, string termId, long termIndex)
5+
public Header(ConsensusCommands command, string termId, long termIndex, string sourceUrl, int sourcePort)
66
{
77
Command = command;
88
TermId = termId;
99
TermIndex = termIndex;
10+
SourceUrl = sourceUrl;
11+
SourcePort = sourcePort;
1012
}
1113

1214
public ConsensusCommands Command { get; set; }
1315
public string TermId { get; set; }
1416
public long TermIndex { get; set; }
17+
public string SourceUrl { get; set; }
18+
public int SourcePort { get; set; }
1519

1620
public virtual void Serialize(WriteBufferContext context)
1721
{
1822
Command.Serialize(context);
1923
context.WriteString(TermId);
2024
context.WriteLong(TermIndex);
25+
context.WriteString(SourceUrl);
26+
context.WriteInteger(SourcePort);
2127
}
2228

2329
public static Header Deserialize(ReadBufferContext context)
2430
{
2531
var cmd = ConsensusCommands.Deserialize(context);
26-
return new Header(cmd, context.NextString(), context.NextLong());
32+
return new Header(cmd, context.NextString(), context.NextLong(), context.NextString(), context.NextInt());
2733
}
2834
}
2935
}

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/LeaderHeartbeatRequest.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,5 @@
22
{
33
public class LeaderHeartbeatRequest : ConsensusPackage
44
{
5-
public string Url { get; set; }
6-
public int Port { get; set; }
7-
8-
public override void Serialize(WriteBufferContext context)
9-
{
10-
base.Serialize(context);
11-
context.WriteString(Url);
12-
context.WriteInteger(Port);
13-
}
14-
15-
public void Extract(ReadBufferContext context)
16-
{
17-
Url = context.NextString();
18-
Port = context.NextInt();
19-
}
205
}
216
}

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/LeaderHeartbeatResult.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,5 @@
22
{
33
public class LeaderHeartbeatResult : ConsensusPackage
44
{
5-
public string Url { get; set; }
6-
public int Port { get; set; }
7-
8-
public override void Serialize(WriteBufferContext context)
9-
{
10-
base.Serialize(context);
11-
context.WriteString(Url);
12-
context.WriteInteger(Port);
13-
}
14-
15-
public void Extract(ReadBufferContext context)
16-
{
17-
Url = context.NextString();
18-
Port = context.NextInt();
19-
}
205
}
216
}

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/PackageRequestBuilder.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,27 @@
22
{
33
public class PackageRequestBuilder
44
{
5-
public static ConsensusPackage LeaderHeartbeat(string termId, long termIndex, string url, int port)
5+
public static ConsensusPackage LeaderHeartbeat(string url, int port, string termId, long termIndex)
66
{
77
return new LeaderHeartbeatRequest
88
{
9-
Header = new Header(ConsensusCommands.LEADER_HEARTBEAT_REQUEST, termId, termIndex),
10-
Url = url,
11-
Port = port
9+
Header = new Header(ConsensusCommands.LEADER_HEARTBEAT_REQUEST, termId, termIndex, url, port)
1210
};
1311
}
1412

15-
public static ConsensusPackage Vote(string termId, long termIndex)
13+
public static ConsensusPackage Vote(string url, int port, string termId, long termIndex)
1614
{
1715
return new VoteRequest
1816
{
19-
Header = new Header(ConsensusCommands.VOTE_REQUEST, termId, termIndex)
17+
Header = new Header(ConsensusCommands.VOTE_REQUEST, termId, termIndex, url, port)
2018
};
2119
}
2220

2321
public static ConsensusPackage AppendEntry(string termId, long termIndex, string value, bool isProxified = false)
2422
{
2523
return new AppendEntryRequest
2624
{
27-
Header = new Header(ConsensusCommands.APPEND_ENTRY_REQUEST, termId, termIndex),
25+
Header = new Header(ConsensusCommands.APPEND_ENTRY_REQUEST, termId, termIndex, string.Empty, 0),
2826
Value = value,
2927
IsProxified = isProxified
3028
};

src/RaftConsensus/FaasNet.RaftConsensus.Client/Messages/PackageResultBuilder.cs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,20 @@
22
{
33
public static class PackageResultBuilder
44
{
5-
public static ConsensusPackage Vote(string termId, long termIndex, bool voteGranted)
5+
public static ConsensusPackage Vote(string url, int port, string termId, long termIndex, bool voteGranted)
66
{
77
return new VoteResult
88
{
9-
Header = new Header(ConsensusCommands.VOTE_RESULT, termId, termIndex),
9+
Header = new Header(ConsensusCommands.VOTE_RESULT, termId, termIndex, url, port),
1010
VoteGranted = voteGranted
1111
};
1212
}
1313

14-
public static ConsensusPackage Empty(string termId, long termIndex)
15-
{
16-
return new EmptyResult
17-
{
18-
Header = new Header(ConsensusCommands.EMPTY_RESULT, termId, termIndex)
19-
};
20-
}
21-
2214
public static ConsensusPackage LeaderHeartbeat(string url, int port, string termId, long termIndex)
2315
{
2416
return new LeaderHeartbeatResult
2517
{
26-
Header = new Header(ConsensusCommands.LEADER_HEARTBEAT_RESULT, termId, termIndex),
27-
Port = port,
28-
Url = url
18+
Header = new Header(ConsensusCommands.LEADER_HEARTBEAT_RESULT, termId, termIndex, url, port)
2919
};
3020
}
3121
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using Microsoft.Extensions.Options;
66
using System;
77
using System.Collections.Concurrent;
8-
using System.Collections.Generic;
98
using System.Linq;
109
using System.Net;
1110
using System.Net.Sockets;
@@ -16,6 +15,7 @@ namespace FaasNet.RaftConsensus.Core
1615
{
1716
public interface INodeHost
1817
{
18+
string NodeId { get; }
1919
BlockingCollection<IPeerHost> Peers { get; }
2020
Task Start(CancellationToken cancellationToken);
2121
Task Stop();
@@ -44,6 +44,7 @@ public BaseNodeHost(IPeerStore peerStore, IPeerHostFactory peerHostFactory, ILog
4444
public BlockingCollection<IPeerHost> Peers => _peers;
4545
public bool IsRunning { get; private set; }
4646
public UdpClient UdpServer { get; private set; }
47+
public string NodeId => _nodeId;
4748
public event EventHandler<EventArgs> NodeStarted;
4849
public event EventHandler<EventArgs> NodeStopped;
4950
protected CancellationTokenSource TokenSource { get; private set; }
@@ -107,11 +108,8 @@ private async Task HandleUDPPackage()
107108
var udpResult = await UdpServer.ReceiveAsync().WithCancellation(TokenSource.Token);
108109
var bufferContext = new ReadBufferContext(udpResult.Buffer.ToArray());
109110
var consensusPackage = ConsensusPackage.Deserialize(bufferContext);
110-
if(Ignore(consensusPackage)) return;
111111
var peerHost = _peers.First(p => p.Info.TermId == consensusPackage.Header.TermId);
112112
await _proxyClient.SendAsync(udpResult.Buffer, udpResult.Buffer.Count(), peerHost.UdpServerEdp);
113-
var proxifiedUdpResult = await _proxyClient.ReceiveAsync().WithCancellation(TokenSource.Token);
114-
await _proxyClient.SendAsync(proxifiedUdpResult.Buffer, proxifiedUdpResult.Buffer.Count(), udpResult.RemoteEndPoint);
115113
}
116114
catch(Exception ex)
117115
{
@@ -123,10 +121,5 @@ private UdpClient BuildUdpClient()
123121
{
124122
return new UdpClient(new IPEndPoint(IPAddress.Any, _options.Port));
125123
}
126-
127-
private bool Ignore(ConsensusPackage consensusPkg)
128-
{
129-
return consensusPkg.Header.Command == ConsensusCommands.EMPTY_RESULT;
130-
}
131124
}
132125
}

0 commit comments

Comments
 (0)