Skip to content

Commit 71ec661

Browse files
Ticket #108 : support websocket + fix all the UTs + add VPN bridge between two servers
1 parent bdd2aa7 commit 71ec661

18 files changed

Lines changed: 443 additions & 445 deletions

File tree

FaasNet.EventMesh.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.FirstCons
5959
EndProject
6060
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.SecondConsole", "src\EventMesh\FaasNet.EventMesh.SecondConsole\FaasNet.EventMesh.SecondConsole.csproj", "{FBD50A91-127A-44F7-BF12-4666448086B9}"
6161
EndProject
62+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Protocols.WebSocket", "src\EventMesh\FaasNet.EventMesh.Protocols.WebSocket\FaasNet.EventMesh.Protocols.WebSocket.csproj", "{01F8C927-42A6-48E2-99E0-E203B3EE534A}"
63+
EndProject
6264
Global
6365
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6466
Debug|Any CPU = Debug|Any CPU
@@ -145,6 +147,10 @@ Global
145147
{FBD50A91-127A-44F7-BF12-4666448086B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
146148
{FBD50A91-127A-44F7-BF12-4666448086B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
147149
{FBD50A91-127A-44F7-BF12-4666448086B9}.Release|Any CPU.Build.0 = Release|Any CPU
150+
{01F8C927-42A6-48E2-99E0-E203B3EE534A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
151+
{01F8C927-42A6-48E2-99E0-E203B3EE534A}.Debug|Any CPU.Build.0 = Debug|Any CPU
152+
{01F8C927-42A6-48E2-99E0-E203B3EE534A}.Release|Any CPU.ActiveCfg = Release|Any CPU
153+
{01F8C927-42A6-48E2-99E0-E203B3EE534A}.Release|Any CPU.Build.0 = Release|Any CPU
148154
EndGlobalSection
149155
GlobalSection(SolutionProperties) = preSolution
150156
HideSolutionNode = FALSE
@@ -173,6 +179,7 @@ Global
173179
{E26134A4-0C4E-4362-A5A0-712254365135} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
174180
{394401D8-D005-49E1-A735-47284C034D44} = {E26134A4-0C4E-4362-A5A0-712254365135}
175181
{FBD50A91-127A-44F7-BF12-4666448086B9} = {E26134A4-0C4E-4362-A5A0-712254365135}
182+
{01F8C927-42A6-48E2-99E0-E203B3EE534A} = {225E35C4-C3C2-4319-96BF-CBF86DCB5C46}
176183
EndGlobalSection
177184
GlobalSection(ExtensibilityGlobals) = postSolution
178185
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}

src/EventMesh/FaasNet.EventMesh.Client/Messages/Errors.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ public class Errors : IEquatable<Errors>
1616
public static Errors INVALID_SEQ = new Errors("INVALID_SEQ");
1717
public static Errors INVALID_BRIDGE = new Errors("INVALID_BRIDGE");
1818
public static Errors BRIDGE_NOT_ACTIVE = new Errors("BRIDGE_NOT_ACTIVE");
19-
public static Errors BRIDGE_EXISTS = new Errors("BRIDGE_EXISTS");
2019
public static Errors UNKNOWN_BRIDGE = new Errors("UNKNOWN_BRIDGE");
2120
public static Errors UNKNOWN_VPN = new Errors("UNKNOWN_VPN");
2221
public static Errors UNKNOWN_SOURCE_VPN = new Errors("UNKNOWN_SOURCE_VPN");

src/EventMesh/FaasNet.EventMesh.Common/ConsoleHelper.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,20 @@ public class ConsoleHelper
2020
{
2121
private static int _seedPort;
2222
private static int _amqpPort = 5672;
23+
private static int _webSocketPort = 2803;
2324
private static int _nbNode;
2425
private static List<INodeHost> _allNodes;
2526

26-
public static async Task Start(int seedPort, int amqpPort = 5672)
27+
public static async Task Start(int seedPort, int amqpPort = 5672, int webSocketPort = 2803)
2728
{
28-
// Ajouter des tests unitaires.
2929
// Supporter WS-Socket.
3030
_seedPort = seedPort;
3131
_amqpPort = amqpPort;
32+
_webSocketPort = webSocketPort;
3233
_nbNode = 1;
3334
_allNodes = new List<INodeHost> { BuildNodeHost(seedPort, true) };
3435
await StartAMQPProtocol();
36+
await StartWebSocketProtocol();
3537
await StartNodes(_allNodes);
3638
await DisplayMenu(_allNodes);
3739
}
@@ -50,6 +52,20 @@ private static async Task StartAMQPProtocol()
5052
await proxy.Start();
5153
}
5254

55+
private static async Task StartWebSocketProtocol()
56+
{
57+
var serviceCollection = new ServiceCollection();
58+
var serverBuilder = new ServerBuilder(serviceCollection);
59+
serverBuilder.UseWebSocket(o =>
60+
{
61+
o.EventMeshPort = _seedPort;
62+
o.Port = _webSocketPort;
63+
});
64+
var serviceProvider = serverBuilder.Services.BuildServiceProvider();
65+
var proxy = serviceProvider.GetRequiredService<IProxy>();
66+
await proxy.Start();
67+
}
68+
5369
private static async Task StartNodes(IEnumerable<INodeHost> allNodes)
5470
{
5571
for (int i = 0; i < allNodes.Count(); i++) await StartNode(allNodes.ElementAt(i), i);

src/EventMesh/FaasNet.EventMesh.Common/FaasNet.EventMesh.Common.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<ItemGroup>
1414
<ProjectReference Include="..\..\RaftConsensus\FaasNet.RaftConsensus.RocksDB\FaasNet.RaftConsensus.RocksDB.csproj" />
1515
<ProjectReference Include="..\FaasNet.EventMesh.Protocols.AMQP\FaasNet.EventMesh.Protocols.AMQP.csproj" />
16+
<ProjectReference Include="..\FaasNet.EventMesh.Protocols.WebSocket\FaasNet.EventMesh.Protocols.WebSocket.csproj" />
1617
<ProjectReference Include="..\FaasNet.EventMesh.Runtime\FaasNet.EventMesh.Runtime.csproj" />
1718
<ProjectReference Include="..\FaasNet.EventMesh.Seed.AMQP\FaasNet.EventMesh.Seed.AMQP.csproj" />
1819
<ProjectReference Include="..\FaasNet.EventMesh.Seed.Kafka\FaasNet.EventMesh.Seed.Kafka.csproj" />

src/EventMesh/FaasNet.EventMesh.FirstConsole/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ internal class Program
77
public static void Main(string[] args)
88
{
99
Console.Title = "First console";
10-
ConsoleHelper.Start(4000, 5672).Wait();
10+
ConsoleHelper.Start(4000, 5672, 2803).Wait();
1111
}
1212
}
1313
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/AMQPProxy.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class AMQPProxy : BaseProxy
2121
private readonly IEnumerable<IRequestHandler> _requestHandlers;
2222
private static ManualResetEvent _lock = new ManualResetEvent(false);
2323
private readonly ILogger<AMQPProxy> _logger;
24+
private Socket _server;
2425

2526
public AMQPProxy(IOptions<EventMeshAMQPOptions> options, IEnumerable<IRequestHandler> requestHandlers, ILogger<AMQPProxy> logger)
2627
{
@@ -32,10 +33,15 @@ public AMQPProxy(IOptions<EventMeshAMQPOptions> options, IEnumerable<IRequestHan
3233
protected override void Init()
3334
{
3435
var localEndPoint = new IPEndPoint(IPAddress.Loopback, _options.Port);
35-
var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
36-
server.Bind(localEndPoint);
37-
server.Listen();
38-
Task.Run(() => Handle(server));
36+
_server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
37+
_server.Bind(localEndPoint);
38+
_server.Listen();
39+
Task.Run(() => Handle(_server));
40+
}
41+
42+
protected override void Shutdown()
43+
{
44+
_server.Close();
3945
}
4046

4147
private void Handle(Socket server)
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
using CloudNative.CloudEvents;
2+
using FaasNet.EventMesh.Client;
3+
using NetCoreServer;
4+
using System;
5+
using System.Text;
6+
using System.Text.Json;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
namespace FaasNet.EventMesh.Protocols.WebSocket
11+
{
12+
public class EventMeshServerWSSession: WsSession
13+
{
14+
private readonly EventMeshWebSocketOptions _options;
15+
16+
public EventMeshServerWSSession(EventMeshWebSocketOptions options, WsServer server) : base(server)
17+
{
18+
_options = options;
19+
}
20+
21+
public override void OnWsConnected(HttpRequest request)
22+
{
23+
base.OnWsConnected(request);
24+
}
25+
26+
public override void OnWsDisconnected()
27+
{
28+
base.OnWsDisconnected();
29+
}
30+
31+
public override async void OnWsReceived(byte[] buffer, long offset, long size)
32+
{
33+
var json = Encoding.UTF8.GetString(buffer, (int)offset, (int)size);
34+
if (await TryPublishMessage(json)) return;
35+
if (await TryDirectSubscribe(json)) return;
36+
}
37+
38+
private async Task<bool> TryPublishMessage(string json)
39+
{
40+
var publishMessageRequest = JsonSerializer.Deserialize<PublishMessageRequest>(json, new JsonSerializerOptions
41+
{
42+
PropertyNameCaseInsensitive = true
43+
});
44+
if (publishMessageRequest == null || publishMessageRequest.RequestType != "PUBLISH") return false;
45+
var cloudEvent = new CloudEvent
46+
{
47+
Type = publishMessageRequest.Topic,
48+
Source = new Uri("https://eventmeshserver.com"),
49+
Subject = publishMessageRequest.Subject,
50+
Id = publishMessageRequest.Id,
51+
Time = DateTimeOffset.Now,
52+
DataContentType = "application/json",
53+
Data = publishMessageRequest.Content
54+
};
55+
var eventMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
56+
var pubSession = await eventMeshClient.CreatePubSession(publishMessageRequest.Vpn, publishMessageRequest.ClientId, null, CancellationToken.None);
57+
await pubSession.Publish(publishMessageRequest.Topic, cloudEvent, CancellationToken.None);
58+
return true;
59+
}
60+
61+
private async Task<bool> TryDirectSubscribe(string json)
62+
{
63+
var subscribeRequest = JsonSerializer.Deserialize<DirectSubscribeRequest>(json, new JsonSerializerOptions
64+
{
65+
PropertyNameCaseInsensitive = true
66+
});
67+
if (subscribeRequest == null || subscribeRequest.RequestType != "DIRECTLY_SUBSCRIBE") return false;
68+
var eventMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
69+
var subSession = await eventMeshClient.CreateSubSession(subscribeRequest.Vpn, subscribeRequest.ClientId, null, CancellationToken.None);
70+
subSession.DirectSubscribe(subscribeRequest.Filter, (ce) =>
71+
{
72+
var json = JsonSerializer.Serialize(new CloudEventResult { Data = ce.Data.ToString(), Type = ce.Type });
73+
var session = SendText(json);
74+
}, CancellationToken.None);
75+
return true;
76+
}
77+
78+
private class PublishMessageRequest
79+
{
80+
public string RequestType { get; set; }
81+
public string Vpn { get; set; }
82+
public string ClientId { get; set; }
83+
public string Topic { get; set; }
84+
public string Id { get; set; }
85+
public string Subject { get; set; }
86+
public string Content { get; set; }
87+
}
88+
89+
private class DirectSubscribeRequest
90+
{
91+
public string RequestType { get; set; }
92+
public string Vpn { get; set; }
93+
public string ClientId { get; set; }
94+
public string Filter { get; set; }
95+
}
96+
97+
private class CloudEventResult
98+
{
99+
public string Type { get; set; }
100+
public string Data { get; set; }
101+
}
102+
}
103+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using NetCoreServer;
2+
using System.Net;
3+
4+
namespace FaasNet.EventMesh.Protocols.WebSocket
5+
{
6+
public class EventMeshWSServer : WsServer
7+
{
8+
private readonly EventMeshWebSocketOptions _options;
9+
10+
public EventMeshWSServer(EventMeshWebSocketOptions options, IPAddress address, int port) : base(address, port)
11+
{
12+
_options = options;
13+
}
14+
15+
protected override TcpSession CreateSession() { return new EventMeshServerWSSession(_options, this); }
16+
}
17+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
namespace FaasNet.EventMesh.Protocols.WebSocket
2+
{
3+
public class EventMeshWebSocketOptions
4+
{
5+
public EventMeshWebSocketOptions()
6+
{
7+
Port = 2803;
8+
EventMeshUrl = "localhost";
9+
EventMeshPort = 4000;
10+
}
11+
12+
/// <summary>
13+
/// Port of the websocket server.
14+
/// </summary>
15+
public int Port { get; set; }
16+
/// <summary>
17+
/// URL of the eventmesh server.
18+
/// </summary>
19+
public string EventMeshUrl { get; set; }
20+
/// <summary>
21+
/// Port of the eventmesh server.
22+
/// </summary>
23+
public int EventMeshPort { get; set; }
24+
}
25+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net6.0</TargetFramework>
4+
</PropertyGroup>
5+
<ItemGroup>
6+
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
7+
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
8+
<PackageReference Include="NetCoreServer" Version="6.2.0" />
9+
</ItemGroup>
10+
<ItemGroup>
11+
<ProjectReference Include="..\FaasNet.EventMesh.Protocols\FaasNet.EventMesh.Protocols.csproj" />
12+
</ItemGroup>
13+
</Project>

0 commit comments

Comments
 (0)