Skip to content

Commit 12a8fc9

Browse files
Ticket #102 : WIP
1 parent 67af85b commit 12a8fc9

5 files changed

Lines changed: 77 additions & 59 deletions

File tree

src/EventMesh/FaasNet.EventMesh.Client/Extensions/CloudEventExtensions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ namespace FaasNet.EventMesh.Client.Extensions
88
{
99
public static class CloudEventExtensions
1010
{
11+
public static string SerializeBase64(this CloudEvent cloudEvent)
12+
{
13+
var bufferContext = new WriteBufferContext();
14+
Serialize(cloudEvent, bufferContext);
15+
return Convert.ToBase64String(bufferContext.Buffer.ToArray());
16+
}
17+
1118
public static void Serialize(this CloudEvent cloudEvent, WriteBufferContext bufferContext)
1219
{
1320
var formatter = new JsonEventFormatter();
Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1-
using FaasNet.EventMesh.Client;
1+
using FaasNet.EventMesh.Client.Extensions;
22
using FaasNet.EventMesh.Client.Messages;
33
using FaasNet.EventMesh.Runtime.Exceptions;
44
using FaasNet.EventMesh.Runtime.Models;
55
using FaasNet.EventMesh.Runtime.Stores;
6+
using FaasNet.RaftConsensus.Client;
7+
using FaasNet.RaftConsensus.Core;
8+
using FaasNet.RaftConsensus.Core.Models;
9+
using FaasNet.RaftConsensus.Core.Stores;
10+
using Microsoft.Extensions.Options;
11+
using System;
612
using System.Collections.Generic;
713
using System.Diagnostics;
14+
using System.Linq;
815
using System.Threading;
916
using System.Threading.Tasks;
1017

@@ -13,10 +20,14 @@ namespace FaasNet.EventMesh.Runtime.Handlers
1320
public class PublishMessageRequestHandler : BaseMessageHandler, IMessageHandler
1421
{
1522
private readonly IMessageExchangeStore _messageExchangeStore;
23+
private readonly IClusterStore _clusterStore;
24+
private readonly ConsensusPeerOptions _peerOptions;
1625

17-
public PublishMessageRequestHandler(IVpnStore vpnStore, IClientSessionStore clientSessionStore, IMessageExchangeStore messageExchangeStore) : base(clientSessionStore, vpnStore)
26+
public PublishMessageRequestHandler(IVpnStore vpnStore, IClientSessionStore clientSessionStore, IMessageExchangeStore messageExchangeStore, IClusterStore clusterStore, IOptions<ConsensusPeerOptions> peerOption) : base(clientSessionStore, vpnStore)
1827
{
1928
_messageExchangeStore = messageExchangeStore;
29+
_clusterStore = clusterStore;
30+
_peerOptions = peerOption.Value;
2031
}
2132

2233
public Commands Command => Commands.PUBLISH_MESSAGE_REQUEST;
@@ -25,29 +36,10 @@ public async Task<EventMeshPackageResult> Run(Package package, CancellationToken
2536
{
2637
var publishMessageRequest = package as PublishMessageRequest;
2738
await CheckSession(publishMessageRequest, cancellationToken);
28-
29-
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity("Publish to local message brokers"))
30-
{
31-
if (
32-
(!string.IsNullOrWhiteSpace(publishMessageRequest.Urn) && _runtimeOpts.Urn == publishMessageRequest.Urn) ||
33-
(string.IsNullOrWhiteSpace(publishMessageRequest.Urn)))
34-
{
35-
foreach (var publisher in _messagePublishers)
36-
{
37-
await publisher.Publish(publishMessageRequest.CloudEvent, publishMessageRequest.Topic, sessionResult.ClientSession);
38-
}
39-
}
40-
41-
activity?.SetStatus(ActivityStatusCode.Ok);
42-
}
43-
44-
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity("Broadcast message"))
45-
{
46-
await Broadcast(publishMessageRequest, sessionResult.ClientSession, sessionResult.Vpn.BridgeServers);
47-
activity?.SetStatus(ActivityStatusCode.Ok);
48-
}
49-
50-
return PackageResponseBuilder.PublishMessage(package.Header.Seq);
39+
var queueNames = await GetQueueNames(publishMessageRequest, cancellationToken);
40+
await BroadcastMessage(publishMessageRequest, queueNames, cancellationToken);
41+
var result = PackageResponseBuilder.PublishMessage(package.Header.Seq);
42+
return EventMeshPackageResult.SendResult(result);
5143
}
5244

5345
private async Task CheckSession(PublishMessageRequest message, CancellationToken cancellationToken)
@@ -65,40 +57,47 @@ private async Task CheckSession(PublishMessageRequest message, CancellationToken
6557
}
6658
}
6759

68-
private async Task BroadcastMessage(PublishMessageRequest message, CancellationToken cancellationToken)
60+
private async Task<IEnumerable<string>> GetQueueNames(PublishMessageRequest message, CancellationToken cancellationToken)
6961
{
70-
// get all the queues from exchange.
71-
// append entry | append the message to the queue.
72-
// a client must pool the message from the queue (consensus).
62+
IEnumerable<MessageExchange> messageExchanges;
63+
using(var activity = EventMeshMeter.RequestActivitySource.StartActivity("Get all message exchange"))
64+
{
65+
messageExchanges = await _messageExchangeStore.GetAll(cancellationToken);
66+
activity?.SetStatus(ActivityStatusCode.Ok);
67+
}
68+
69+
var queueNames = new List<string>();
70+
foreach(var messageExchange in messageExchanges)
71+
{
72+
if (messageExchange.IsMatch(message.Topic)) queueNames.AddRange(messageExchange.ClientIds.Select(ci => Models.Client.BuildQueueName((ci))));
73+
}
74+
75+
return queueNames;
7376
}
7477

75-
private async Task Broadcast(PublishMessageRequest publishMessageRequest, Models.Client client, ICollection<BridgeServer> bridgeServers)
78+
private async Task BroadcastMessage(PublishMessageRequest message, IEnumerable<string> queueNames, CancellationToken cancellationToken)
7679
{
77-
foreach (var bridgeServer in bridgeServers)
80+
var rndClusterNode = await GetRandomClusterNode(cancellationToken);
81+
var base64Message = message.CloudEvent.SerializeBase64();
82+
using (var activity = EventMeshMeter.RequestActivitySource.StartActivity("Broadcast message"))
7883
{
79-
await Broadcast(publishMessageRequest, bridgeServer, client);
84+
using (var consensusClient = new ConsensusClient(rndClusterNode.Url, rndClusterNode.Port))
85+
{
86+
foreach (var queueName in queueNames)
87+
{
88+
await consensusClient.AppendEntry(queueName, base64Message, cancellationToken);
89+
}
90+
}
8091
}
8192
}
8293

83-
private async Task Broadcast(PublishMessageRequest publishMessageRequest, BridgeServer bridgeServer, Models.Client client)
94+
private async Task<ClusterNode> GetRandomClusterNode(CancellationToken cancellationToken)
8495
{
85-
var activeSession = client.GetActiveSession(publishMessageRequest.SessionId);
86-
var pid = Process.GetCurrentProcess().Id;
87-
var runtimeClient = new RuntimeClient(bridgeServer.TargetUrn, bridgeServer.TargetPort);
88-
var helloResponse = await runtimeClient.Hello(new UserAgent
89-
{
90-
ClientId = client.ClientId,
91-
Purpose = activeSession.Purpose,
92-
Environment = activeSession.Environment,
93-
BufferCloudEvents = activeSession.BufferCloudEvents,
94-
Urn = _runtimeOpts.Urn,
95-
Port = _runtimeOpts.Port,
96-
Pid = pid,
97-
IsServer = true,
98-
Vpn = bridgeServer.TargetVpn
99-
});
100-
await runtimeClient.PublishMessage(client.ClientId, helloResponse.SessionId, publishMessageRequest.Topic, publishMessageRequest.CloudEvent, publishMessageRequest.Urn, publishMessageRequest.Port);
101-
await runtimeClient.Disconnect(client.ClientId, helloResponse.SessionId);
96+
var nodes = await _clusterStore.GetAllNodes(cancellationToken);
97+
nodes = nodes.Where(n => n.Port != _peerOptions.Port || n.Url != _peerOptions.Url);
98+
var rnd = new Random();
99+
var rndIndex = rnd.Next(0, nodes.Count() - 1);
100+
return nodes.ElementAt(rndIndex);
102101
}
103102
}
104103
}

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/SubscribeMessageHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private async Task<IEnumerable<string>> AddMessageExchanges(SubscriptionRequest
5151
foreach (var topicFilter in subscriptionRequest.TopicFilters)
5252
{
5353
var messageExchange = await _messageExchangeStore.Get(subscriptionRequest.ClientId, topicFilter.Topic, cancellationToken);
54-
if (messageExchange == null) await _messageExchangeStore.Add(new Models.MessageExchange { ClientId = subscriptionRequest.ClientId, Topic = topicFilter.Topic }, cancellationToken);
54+
if (messageExchange == null) await _messageExchangeStore.Add(new Models.MessageExchange { ClientId = subscriptionRequest.ClientId, TopicFilter = topicFilter.Topic }, cancellationToken);
5555
result.Add(Models.Client.BuildQueueName(subscriptionRequest.ClientId));
5656
}
5757

src/EventMesh/FaasNet.EventMesh.Runtime/Models/MessageExchange.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
11
using FaasNet.RaftConsensus.Core.Models;
2+
using System.Collections.Generic;
23
using System.Text.Json;
4+
using System.Text.RegularExpressions;
35

46
namespace FaasNet.EventMesh.Runtime.Models
57
{
68
public class MessageExchange
79
{
8-
public string ClientId { get; set; }
9-
public string Topic { get; set; }
10+
public string TopicFilter { get; set; }
11+
public IEnumerable<string> ClientIds { get; set; }
1012

11-
public static string BuildId(string clientId, string topic)
13+
public bool IsMatch(string filter)
1214
{
13-
return $"{clientId}_{topic}";
15+
var regex = new Regex(TopicFilter);
16+
return regex.IsMatch(filter);
1417
}
1518

1619
public NodeState ToNodeState()
1720
{
1821
return new NodeState
1922
{
2023
EntityType = StandardEntityTypes.MessageExchange,
21-
EntityId = BuildId(ClientId, Topic),
24+
EntityId = TopicFilter,
2225
EntityVersion = 0,
2326
Value = JsonSerializer.Serialize(this)
2427
};

src/EventMesh/FaasNet.EventMesh.Runtime/Stores/MessageExchangeStore.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using FaasNet.EventMesh.Runtime.Models;
22
using FaasNet.RaftConsensus.Core.Stores;
3+
using System.Collections.Generic;
4+
using System.Linq;
35
using System.Text.Json;
46
using System.Threading;
57
using System.Threading.Tasks;
@@ -8,7 +10,8 @@ namespace FaasNet.EventMesh.Runtime.Stores
810
{
911
public interface IMessageExchangeStore
1012
{
11-
Task<MessageExchange> Get(string clientId, string topic, CancellationToken cancellationToken);
13+
Task<IEnumerable<MessageExchange>> GetAll(CancellationToken cancellationToken);
14+
Task<MessageExchange> Get(string topicFilter, CancellationToken cancellationToken);
1215
Task Add(MessageExchange messageExchange, CancellationToken cancellationToken);
1316
}
1417

@@ -21,6 +24,12 @@ public MessageExchangeStore(INodeStateStore nodeStateStore)
2124
_nodeStateStore = nodeStateStore;
2225
}
2326

27+
public async Task<IEnumerable<MessageExchange>> GetAll(CancellationToken cancellationToken)
28+
{
29+
var result = await _nodeStateStore.GetAllLastEntityTypes(cancellationToken);
30+
return result.Select(n => JsonSerializer.Deserialize<MessageExchange>(n.Value, new JsonSerializerOptions { PropertyNameCaseInsensitive = true }));
31+
}
32+
2433
public async Task Add(MessageExchange messageExchange, CancellationToken cancellationToken)
2534
{
2635
var nodeState = messageExchange.ToNodeState();
@@ -29,9 +38,9 @@ public async Task Add(MessageExchange messageExchange, CancellationToken cancell
2938
_nodeStateStore.Add(nodeState);
3039
}
3140

32-
public async Task<MessageExchange> Get(string clientId, string topic, CancellationToken cancellationToken)
41+
public async Task<MessageExchange> Get(string topicFilter, CancellationToken cancellationToken)
3342
{
34-
var lastEntityId = await _nodeStateStore.GetLastEntityId(MessageExchange.BuildId(clientId, topic), cancellationToken);
43+
var lastEntityId = await _nodeStateStore.GetLastEntityId(topicFilter, cancellationToken);
3544
if (lastEntityId == null) return null;
3645
return JsonSerializer.Deserialize<MessageExchange>(lastEntityId.Value, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
3746
}

0 commit comments

Comments
 (0)