|
1 | 1 | using CloudNative.CloudEvents; |
| 2 | +using FaasNet.EventMesh.Client.Exceptions; |
2 | 3 | using FaasNet.EventMesh.Client.Messages; |
| 4 | +using FaasNet.RaftConsensus.Client; |
| 5 | +using FaasNet.RaftConsensus.Client.Extensions; |
3 | 6 | using System; |
4 | 7 | using System.Collections.Generic; |
5 | | -using System.Diagnostics; |
| 8 | +using System.Linq; |
| 9 | +using System.Net; |
| 10 | +using System.Net.Sockets; |
6 | 11 | using System.Text.Json; |
7 | 12 | using System.Threading; |
8 | 13 | using System.Threading.Tasks; |
9 | 14 |
|
10 | 15 | namespace FaasNet.EventMesh.Client |
11 | 16 | { |
12 | | - public class EventMeshClient : IDisposable |
| 17 | + public class EventMeshClient |
13 | 18 | { |
14 | | - private readonly string _clientId; |
15 | | - private readonly string _password; |
16 | | - private readonly string _vpn; |
17 | | - private readonly RuntimeClient _runtimeClient; |
18 | | - private readonly int _bufferCloudEvents; |
19 | | - private HelloResponse _publishSession; |
20 | | - private HelloResponse _subscribeSession; |
21 | | - |
22 | | - public EventMeshClient(string clientId, string password, string vpn = Constants.DefaultVpn, string url = Constants.DefaultUrl, int port = Constants.DefaultPort, int bufferCloudEvents = 1) |
| 19 | + private readonly IPAddress _ipAddr; |
| 20 | + private readonly int _port; |
| 21 | + |
| 22 | + public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.DefaultPort) |
23 | 23 | { |
24 | | - _clientId = clientId; |
25 | | - _password = password; |
26 | | - _vpn = vpn; |
27 | | - _bufferCloudEvents = bufferCloudEvents; |
28 | | - _runtimeClient = new RuntimeClient(url, port); |
| 24 | + _ipAddr = IPAddressHelper.ResolveIPAddress(url); |
| 25 | + _port = port; |
29 | 26 | } |
30 | 27 |
|
31 | | - public async Task Connect(CancellationToken cancellationToken = default(CancellationToken)) |
| 28 | + public async Task Ping(CancellationToken cancellationToken = default(CancellationToken)) |
32 | 29 | { |
33 | | - await _runtimeClient.HeartBeat(cancellationToken); |
| 30 | + using (var udpClient = new UdpClient()) |
| 31 | + { |
| 32 | + var writeCtx = new WriteBufferContext(); |
| 33 | + var package = PackageRequestBuilder.HeartBeat(); |
| 34 | + package.Serialize(writeCtx); |
| 35 | + var payload = writeCtx.Buffer.ToArray(); |
| 36 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken); |
| 37 | + var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken); |
| 38 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 39 | + var packageResult = Package.Deserialize(readCtx); |
| 40 | + EnsureSuccessStatus(package, packageResult); |
| 41 | + } |
34 | 42 | } |
35 | 43 |
|
36 | 44 | public async Task<IEnumerable<string>> GetAllVpns(CancellationToken cancellationToken = default(CancellationToken)) |
37 | 45 | { |
38 | | - var vpnResponse = await _runtimeClient.GetAllVpns(cancellationToken); |
39 | | - return vpnResponse.Vpns; |
| 46 | + using (var udpClient = new UdpClient()) |
| 47 | + { |
| 48 | + var writeCtx = new WriteBufferContext(); |
| 49 | + var package = PackageRequestBuilder.GetAllVpns(); |
| 50 | + package.Serialize(writeCtx); |
| 51 | + var payload = writeCtx.Buffer.ToArray(); |
| 52 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken); |
| 53 | + var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken); |
| 54 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 55 | + var packageResult = Package.Deserialize(readCtx); |
| 56 | + EnsureSuccessStatus(package, packageResult); |
| 57 | + var result = packageResult as GetAllVpnResponse; |
| 58 | + return result.Vpns; |
| 59 | + } |
40 | 60 | } |
41 | 61 |
|
42 | | - public Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken)) |
| 62 | + public async Task<IEventMeshClientPubSession> CreatePubSession(string clientId, CancellationToken cancellationToken = default(CancellationToken)) |
43 | 63 | { |
44 | | - var cloudEvt = new CloudEvent |
45 | | - { |
46 | | - Id = Guid.NewGuid().ToString(), |
47 | | - Subject = topicName, |
48 | | - Source = new Uri("http://localhost"), |
49 | | - Type = topicName, |
50 | | - DataContentType = "application/json", |
51 | | - Data = JsonSerializer.Serialize(obj), |
52 | | - Time = DateTimeOffset.UtcNow |
53 | | - }; |
54 | | - return Publish(topicName, cloudEvt, cancellationToken); |
| 64 | + var pubSession = await CreateSession(clientId, cancellationToken); |
| 65 | + return new EventMeshClientPubSession(pubSession, _ipAddr, _port); |
55 | 66 | } |
56 | 67 |
|
57 | | - public async Task Publish(string topicName, CloudEvent cloudEvent, TimeSpan? expirationTimeSpan = null, bool isSessionInfinite = false, CancellationToken cancellationToken = default(CancellationToken)) |
| 68 | + public async Task<IEventMeshClientSubSession> CreateSubSession(string clientId, CancellationToken cancellationToken = default(CancellationToken)) |
58 | 69 | { |
59 | | - if (_publishSession == null) |
| 70 | + var subSession = await CreateSession(clientId, cancellationToken); |
| 71 | + return new EventMeshClientSubSession(subSession, _ipAddr, _port); |
| 72 | + } |
| 73 | + |
| 74 | + public async Task AddVpn(string vpn, CancellationToken cancellationToken = default(CancellationToken)) |
| 75 | + { |
| 76 | + using (var udpClient = new UdpClient()) |
60 | 77 | { |
61 | | - _publishSession = await CreateSession(_clientId, _password, UserAgentPurpose.PUB, expirationTimeSpan, isSessionInfinite, cancellationToken); |
| 78 | + var writeCtx = new WriteBufferContext(); |
| 79 | + var package = PackageRequestBuilder.AddVPN(vpn); |
| 80 | + package.Serialize(writeCtx); |
| 81 | + var payload = writeCtx.Buffer.ToArray(); |
| 82 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken); |
| 83 | + var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken); |
| 84 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 85 | + var packageResult = Package.Deserialize(readCtx); |
| 86 | + EnsureSuccessStatus(package, packageResult); |
62 | 87 | } |
63 | | - |
64 | | - await _runtimeClient.PublishMessage(_clientId, _publishSession.SessionId, topicName, cloudEvent); |
65 | 88 | } |
66 | 89 |
|
67 | | - public async Task<SubscriptionResult> SubscribeMessages<TMessage>(string topicName, Action<TMessage> callback, TimeSpan? expirationTimeSpan = null, bool isSessionInfinite = false, CancellationToken cancellationToken = default(CancellationToken)) where TMessage : class |
| 90 | + public async Task AddClient(string vpn, string clientId, CancellationToken cancellationToken = default(CancellationToken)) |
68 | 91 | { |
69 | | - return await Subscribe(topicName, (msg) => |
| 92 | + using (var udpClient = new UdpClient()) |
70 | 93 | { |
71 | | - foreach(var cloudEvt in msg.CloudEvents) |
72 | | - { |
73 | | - var deserialize = JsonSerializer.Deserialize(cloudEvt.Data.ToString(), typeof(TMessage)) as TMessage; |
74 | | - callback(deserialize); |
75 | | - } |
76 | | - }, expirationTimeSpan, isSessionInfinite, cancellationToken); |
| 94 | + var writeCtx = new WriteBufferContext(); |
| 95 | + var package = PackageRequestBuilder.AddClient(vpn, clientId); |
| 96 | + package.Serialize(writeCtx); |
| 97 | + var payload = writeCtx.Buffer.ToArray(); |
| 98 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken); |
| 99 | + var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken); |
| 100 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 101 | + var packageResult = Package.Deserialize(readCtx); |
| 102 | + EnsureSuccessStatus(package, packageResult); |
| 103 | + } |
77 | 104 | } |
78 | 105 |
|
79 | | - public async Task<SubscriptionResult> Subscribe(string topicName, Action<AsyncMessageToClient> callback, TimeSpan? expirationTimeSpan = null, bool isSessionInfinite = false, CancellationToken cancellationToken = default(CancellationToken)) |
| 106 | + internal static void EnsureSuccessStatus(Package packageRequest, Package packageResponse) |
80 | 107 | { |
81 | | - if (_subscribeSession == null) |
| 108 | + if (packageResponse.Header.Status != HeaderStatus.SUCCESS) |
82 | 109 | { |
83 | | - _subscribeSession = await CreateSession(_clientId, _password, UserAgentPurpose.SUB, expirationTimeSpan, isSessionInfinite, cancellationToken); |
| 110 | + throw new RuntimeClientResponseException(packageResponse.Header.Status, packageResponse.Header.Error); |
84 | 111 | } |
85 | 112 |
|
86 | | - return await _runtimeClient.Subscribe(_clientId, _subscribeSession.SessionId, new List<SubscriptionItem> |
| 113 | + if (packageRequest.Header.Seq != packageRequest.Header.Seq) |
87 | 114 | { |
88 | | - new SubscriptionItem |
89 | | - { |
90 | | - Topic = topicName |
91 | | - } |
92 | | - }, callback, cancellationToken: cancellationToken); |
| 115 | + throw new RuntimeClientResponseException(HeaderStatus.FAIL, Errors.INVALID_SEQ, "the seq in the request doesn't match the seq in the response"); |
| 116 | + } |
93 | 117 | } |
94 | 118 |
|
95 | | - public async Task Disconnect() |
| 119 | + private async Task<HelloResponse> CreateSession(string clientId, CancellationToken cancellationToken = default(CancellationToken)) |
96 | 120 | { |
97 | | - if (_publishSession != null) |
| 121 | + var userAgent = new UserAgent { ClientId = clientId, Purpose = UserAgentPurpose.PUB }; |
| 122 | + using (var udpClient = new UdpClient()) |
98 | 123 | { |
99 | | - await _runtimeClient.Disconnect(_clientId, _publishSession.SessionId); |
100 | | - } |
101 | | - |
102 | | - if (_subscribeSession != null) |
103 | | - { |
104 | | - await _runtimeClient.Disconnect(_clientId, _subscribeSession.SessionId); |
| 124 | + var writeCtx = new WriteBufferContext(); |
| 125 | + var package = PackageRequestBuilder.Hello(userAgent); |
| 126 | + package.Serialize(writeCtx); |
| 127 | + var payload = writeCtx.Buffer.ToArray(); |
| 128 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken); |
| 129 | + var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken); |
| 130 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 131 | + var packageResult = Package.Deserialize(readCtx); |
| 132 | + EnsureSuccessStatus(package, packageResult); |
| 133 | + return packageResult as HelloResponse; |
105 | 134 | } |
106 | 135 | } |
| 136 | + } |
107 | 137 |
|
108 | | - public void Close() |
| 138 | + public interface IEventMeshClientPubSession |
| 139 | + { |
| 140 | + Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken)); |
| 141 | + Task Publish(string topicName, CloudEvent cloudEvent, CancellationToken cancellationToken = default(CancellationToken)); |
| 142 | + } |
| 143 | + |
| 144 | + public interface IEventMeshClientSubSession |
| 145 | + { |
| 146 | + |
| 147 | + } |
| 148 | + |
| 149 | + internal class EventMeshClientPubSession : IEventMeshClientPubSession |
| 150 | + { |
| 151 | + private readonly HelloResponse _session; |
| 152 | + private readonly IPAddress _ipAddr; |
| 153 | + private readonly int _port; |
| 154 | + |
| 155 | + public EventMeshClientPubSession(HelloResponse session, IPAddress ipAddr, int port) |
109 | 156 | { |
110 | | - _runtimeClient.Close(); |
| 157 | + _session = session; |
| 158 | + _ipAddr = ipAddr; |
| 159 | + _port = port; |
111 | 160 | } |
112 | 161 |
|
113 | | - public void Dispose() |
| 162 | + public Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken)) |
114 | 163 | { |
115 | | - Disconnect().Wait(); |
116 | | - _runtimeClient.Close(); |
| 164 | + var cloudEvt = new CloudEvent |
| 165 | + { |
| 166 | + Id = Guid.NewGuid().ToString(), |
| 167 | + Subject = topicName, |
| 168 | + Source = new Uri("http://localhost"), |
| 169 | + Type = topicName, |
| 170 | + DataContentType = "application/json", |
| 171 | + Data = JsonSerializer.Serialize(obj), |
| 172 | + Time = DateTimeOffset.UtcNow |
| 173 | + }; |
| 174 | + return Publish(topicName, cloudEvt, cancellationToken); |
117 | 175 | } |
118 | 176 |
|
119 | | - private Task<HelloResponse> CreateSession(string clientId, string password, UserAgentPurpose purpose, TimeSpan? expirationTimeSpan = null, bool isSessionInfinite = false, CancellationToken cancellationToken = default(CancellationToken)) |
| 177 | + public async Task Publish(string topicName, CloudEvent cloudEvent, CancellationToken cancellationToken = default(CancellationToken)) |
120 | 178 | { |
121 | | - var processId = Process.GetCurrentProcess().Id; |
122 | | - return _runtimeClient.Hello(new UserAgent |
| 179 | + using (var udpClient = new UdpClient()) |
123 | 180 | { |
124 | | - ClientId = clientId, |
125 | | - Environment = "TST", |
126 | | - Password = password, |
127 | | - Pid = processId, |
128 | | - Purpose = purpose, |
129 | | - Version = "0", |
130 | | - BufferCloudEvents = _bufferCloudEvents, |
131 | | - Expiration = expirationTimeSpan, |
132 | | - IsSessionInfinite = isSessionInfinite, |
133 | | - Vpn = _vpn |
134 | | - }, cancellationToken); |
| 181 | + var writeCtx = new WriteBufferContext(); |
| 182 | + var package = PackageRequestBuilder.PublishMessage(_session.SessionId, topicName, cloudEvent); |
| 183 | + package.Serialize(writeCtx); |
| 184 | + var payload = writeCtx.Buffer.ToArray(); |
| 185 | + await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)); |
| 186 | + var resultPayload = await udpClient.ReceiveAsync(); |
| 187 | + var readCtx = new ReadBufferContext(resultPayload.Buffer); |
| 188 | + var packageResult = Package.Deserialize(readCtx); |
| 189 | + EventMeshClient.EnsureSuccessStatus(package, packageResult); |
| 190 | + } |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + internal class EventMeshClientSubSession : IEventMeshClientSubSession |
| 195 | + { |
| 196 | + private readonly HelloResponse _session; |
| 197 | + private readonly IPAddress _ipAddr; |
| 198 | + private readonly int _port; |
| 199 | + |
| 200 | + public EventMeshClientSubSession(HelloResponse session, IPAddress ipAddr, int port) |
| 201 | + { |
| 202 | + _session = session; |
| 203 | + _ipAddr = ipAddr; |
| 204 | + _port = port; |
135 | 205 | } |
136 | 206 | } |
137 | 207 | } |
0 commit comments