Skip to content

Commit 95dee9c

Browse files
Ticket #106 : Publish message
1 parent 2d2c7bf commit 95dee9c

37 files changed

Lines changed: 635 additions & 218 deletions

FaasNet.EventMesh.sln

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3-
# Visual Studio Version 16
4-
VisualStudioVersion = 16.0.31624.102
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.1.32328.378
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Core Layer", "02. Core Layer", "{6E495E0A-0DC8-4E42-8C58-3C48506D3D24}"
77
EndProject
@@ -49,6 +49,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Protocols", "02. Protoc
4949
EndProject
5050
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Protocols.AMQP", "src\EventMesh\FaasNet.EventMesh.Protocols.AMQP\FaasNet.EventMesh.Protocols.AMQP.csproj", "{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}"
5151
EndProject
52+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Protocols", "src\EventMesh\FaasNet.EventMesh.Protocols\FaasNet.EventMesh.Protocols.csproj", "{E90801D3-9D40-4B3F-AA3F-4015C03262A5}"
53+
EndProject
5254
Global
5355
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5456
Debug|Any CPU = Debug|Any CPU
@@ -119,6 +121,10 @@ Global
119121
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Debug|Any CPU.Build.0 = Debug|Any CPU
120122
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Release|Any CPU.ActiveCfg = Release|Any CPU
121123
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Release|Any CPU.Build.0 = Release|Any CPU
124+
{E90801D3-9D40-4B3F-AA3F-4015C03262A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
125+
{E90801D3-9D40-4B3F-AA3F-4015C03262A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
126+
{E90801D3-9D40-4B3F-AA3F-4015C03262A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
127+
{E90801D3-9D40-4B3F-AA3F-4015C03262A5}.Release|Any CPU.Build.0 = Release|Any CPU
122128
EndGlobalSection
123129
GlobalSection(SolutionProperties) = preSolution
124130
HideSolutionNode = FALSE
@@ -142,6 +148,7 @@ Global
142148
{07C9335C-511F-49A1-AD60-BB0C709971C4} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
143149
{225E35C4-C3C2-4319-96BF-CBF86DCB5C46} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
144150
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93} = {225E35C4-C3C2-4319-96BF-CBF86DCB5C46}
151+
{E90801D3-9D40-4B3F-AA3F-4015C03262A5} = {225E35C4-C3C2-4319-96BF-CBF86DCB5C46}
145152
EndGlobalSection
146153
GlobalSection(ExtensibilityGlobals) = postSolution
147154
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using Amqp;
2+
using Amqp.Sasl;
3+
using Amqp.Types;
4+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
5+
using FaasNet.EventMesh.Protocols.AMQP.Handlers;
6+
using Microsoft.Extensions.Options;
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Net;
11+
using System.Net.Sockets;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
15+
namespace FaasNet.EventMesh.Protocols.AMQP
16+
{
17+
public class AMQPProxy : BaseProxy
18+
{
19+
private readonly EventMeshAMQPOptions _options;
20+
private readonly IEnumerable<IRequestHandler> _requestHandlers;
21+
private static ManualResetEvent _lock = new ManualResetEvent(false);
22+
23+
public AMQPProxy(IOptions<EventMeshAMQPOptions> options, IEnumerable<IRequestHandler> requestHandlers)
24+
{
25+
_options = options.Value;
26+
_requestHandlers = requestHandlers;
27+
}
28+
29+
protected override void Init()
30+
{
31+
var localEndPoint = new IPEndPoint(IPAddress.Loopback, _options.Port);
32+
var server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
33+
server.Bind(localEndPoint);
34+
server.Listen();
35+
Task.Run(() => Handle(server));
36+
}
37+
38+
private void Handle(Socket server)
39+
{
40+
while (true)
41+
{
42+
TokenSource.Token.ThrowIfCancellationRequested();
43+
_lock.Reset();
44+
server.BeginAccept(new AsyncCallback(AcceptCallback), server);
45+
_lock.WaitOne();
46+
}
47+
}
48+
49+
private void AcceptCallback(IAsyncResult ar)
50+
{
51+
_lock.Set();
52+
var listener = (Socket)ar.AsyncState;
53+
var handler = listener.EndAccept(ar);
54+
var state = new StateObject();
55+
state.WorkSocket = handler;
56+
handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), state);
57+
}
58+
59+
private async void ReadCallback(IAsyncResult ar)
60+
{
61+
var state = (StateObject)ar.AsyncState;
62+
var handler = state.WorkSocket;
63+
int nbBytes = handler.EndReceive(ar);
64+
byte[] buffer = state.Buffer.Take(nbBytes).ToArray();
65+
if (ProtocolHeader.SASLHeaderNegotiation.Serialize().SequenceEqual(buffer))
66+
{
67+
ReplySASHeaderNegotiation(state);
68+
return;
69+
}
70+
71+
if (ProtocolHeader.SASLHeaderSecuredConnection.Serialize().SequenceEqual(buffer))
72+
{
73+
ReplySASLHeaderSecuredConnection(state);
74+
return;
75+
}
76+
77+
await ReplyFrame(state, buffer);
78+
}
79+
80+
private void ReplySASHeaderNegotiation(StateObject stateObject)
81+
{
82+
var result = ProtocolHeader.SASLHeaderNegotiation.Serialize();
83+
var handler = stateObject.WorkSocket;
84+
handler.Send(result, 0, result.Length, 0);
85+
var saslFrame = new Frame { Channel = 0, Type = FrameTypes.Sasl };
86+
var cmd = new SaslMechanisms
87+
{
88+
SaslServerMechanisms = new Symbol[]
89+
{
90+
new Symbol("PLAIN")
91+
}
92+
};
93+
var buffer = saslFrame.Serialize(cmd);
94+
handler.BeginSend(buffer.Buffer, buffer.Offset, buffer.Length, 0, new AsyncCallback(SendCallback), stateObject);
95+
}
96+
97+
private void ReplySASLHeaderSecuredConnection(StateObject stateObject)
98+
{
99+
var result = ProtocolHeader.SASLHeaderSecuredConnection.Serialize();
100+
var handler = stateObject.WorkSocket;
101+
handler.Send(result, 0, result.Length, 0);
102+
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
103+
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
104+
}
105+
106+
private async Task ReplyFrame(StateObject stateObject, byte[] buffer)
107+
{
108+
ushort channel;
109+
DescribedList command;
110+
var handler = stateObject.WorkSocket;
111+
var frameBuffer = new ByteBuffer(buffer, 0, buffer.Count(), 0);
112+
byte[] payload = null;
113+
Frame.Decode(frameBuffer, out channel, out command);
114+
if (frameBuffer.Length > 0) payload = buffer.Skip(frameBuffer.Offset + FixedWidth.UInt).Take(frameBuffer.Length - FixedWidth.UInt).ToArray();
115+
Console.WriteLine($"Receive {command.Descriptor.Name}");
116+
var requestHandler = _requestHandlers.First(r => r.RequestName == command.Descriptor.Name);
117+
var result = await requestHandler.Handle(stateObject, command, payload, channel, TokenSource.Token);
118+
if (result == null)
119+
{
120+
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
121+
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
122+
}
123+
else
124+
{
125+
for (var i = 0; i < result.Count() - 1; i++) handler.Send(result.ElementAt(i).Buffer, result.ElementAt(i).Offset, result.ElementAt(i).Length, 0);
126+
handler.BeginSend(result.Last().Buffer, result.Last().Offset, result.Last().Length, 0, new AsyncCallback(SendCallback), stateObject);
127+
}
128+
}
129+
130+
private void SendCallback(IAsyncResult ar)
131+
{
132+
var state = (StateObject)ar.AsyncState;
133+
var newState = new StateObject { WorkSocket = state.WorkSocket, Session = state.Session };
134+
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
135+
}
136+
}
137+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace FaasNet.EventMesh.Protocols.AMQP
2+
{
3+
public class EventMeshAMQPOptions
4+
{
5+
public EventMeshAMQPOptions()
6+
{
7+
Port = 5672;
8+
EventMeshVpn = Client.Constants.DefaultVpn;
9+
EventMeshUrl = Client.Constants.DefaultUrl;
10+
EventMeshPort = Client.Constants.DefaultPort;
11+
}
12+
13+
public int Port { get; set; }
14+
public string EventMeshVpn { get; set; }
15+
public string EventMeshUrl { get; set; }
16+
public int EventMeshPort { get; set; }
17+
}
18+
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/FaasNet.EventMesh.Protocols.AMQP.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,9 @@
44
</PropertyGroup>
55
<ItemGroup>
66
<PackageReference Include="AMQPNetLite" Version="2.4.4" />
7+
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
8+
</ItemGroup>
9+
<ItemGroup>
10+
<ProjectReference Include="..\FaasNet.EventMesh.Protocols\FaasNet.EventMesh.Protocols.csproj" />
711
</ItemGroup>
812
</Project>

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Framing/Frame.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Amqp.Framing;
33
using Amqp.Types;
44
using System;
5+
using System.Linq;
56

67
namespace FaasNet.EventMesh.Protocols.AMQP.Framing
78
{

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Framing/ProtocolHeader.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,16 @@ namespace FaasNet.EventMesh.Protocols.AMQP.Framing
55
{
66
public struct ProtocolHeader
77
{
8+
public static ProtocolHeader SASLHeaderNegotiation = new ProtocolHeader
9+
{
10+
Id = 3,
11+
Major = 1
12+
};
13+
public static ProtocolHeader SASLHeaderSecuredConnection = new ProtocolHeader
14+
{
15+
Id = 0,
16+
Major = 1
17+
};
818
public byte Id;
919

1020
public byte Major;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using Amqp;
2+
using Amqp.Framing;
3+
using Amqp.Types;
4+
using FaasNet.EventMesh.Client;
5+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
6+
using Microsoft.Extensions.Options;
7+
using System.Collections.Generic;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
12+
{
13+
public class AttachHandler : IRequestHandler
14+
{
15+
private readonly EventMeshAMQPOptions _options;
16+
17+
public AttachHandler(IOptions<EventMeshAMQPOptions> options)
18+
{
19+
_options = options.Value;
20+
}
21+
22+
public string RequestName => "amqp:attach:list";
23+
24+
public async Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
25+
{
26+
var attachCmd = cmd as Attach;
27+
var target = attachCmd.Target as Target;
28+
if(target != null && !string.IsNullOrWhiteSpace(target.Address)) state.Session.EventMeshPubSession = await CreatePubSession(state.Session, cancellationToken);
29+
return new ByteBuffer[]
30+
{
31+
BuildAttachResponse(channel, attachCmd),
32+
BuildFrameResponse(channel, attachCmd)
33+
};
34+
}
35+
36+
private async Task CreateSubSession(StateSessionObject session, CancellationToken cancellationToken)
37+
{
38+
var evtMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
39+
var subSession = await evtMeshClient.CreateSubSession(_options.EventMeshVpn, session.ClientId, cancellationToken);
40+
}
41+
42+
private async Task<IEventMeshClientPubSession> CreatePubSession(StateSessionObject session, CancellationToken cancellationToken)
43+
{
44+
var evtMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
45+
return await evtMeshClient.CreatePubSession(_options.EventMeshVpn, session.ClientId, cancellationToken);
46+
}
47+
48+
private static ByteBuffer BuildAttachResponse(ushort channel, Attach requestAttach)
49+
{
50+
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
51+
var attach = new Attach { LinkName = requestAttach.LinkName };
52+
return result.Serialize(attach);
53+
}
54+
55+
private static ByteBuffer BuildFrameResponse(ushort channel, Attach requestAttach)
56+
{
57+
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
58+
var flow = new Flow { LinkCredit = 1, Handle = requestAttach.Handle };
59+
return result.Serialize(flow);
60+
}
61+
}
62+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using Amqp;
2+
using Amqp.Framing;
3+
using Amqp.Types;
4+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
10+
{
11+
public class BeginHandler : IRequestHandler
12+
{
13+
public string RequestName => "amqp:begin:list";
14+
15+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
16+
{
17+
var requestBegin = cmd as Begin;
18+
var beginResult = new Frame { Channel = channel, Type = FrameTypes.Amqp };
19+
var begin = new Begin { RemoteChannel = channel };
20+
IEnumerable<ByteBuffer> result = new[]
21+
{
22+
beginResult.Serialize(begin)
23+
};
24+
return Task.FromResult(result);
25+
}
26+
}
27+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Amqp;
2+
using Amqp.Types;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
8+
{
9+
public interface IRequestHandler
10+
{
11+
string RequestName { get; }
12+
Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken);
13+
}
14+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Amqp;
2+
using Amqp.Types;
3+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
4+
using System.Collections.Generic;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
9+
{
10+
public class OpenHandler : IRequestHandler
11+
{
12+
public string RequestName => "amqp:open:list";
13+
14+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
15+
{
16+
var openResult = new Frame { Channel = channel, Type = FrameTypes.Amqp };
17+
var cmdResult = new Amqp.Framing.Open
18+
{
19+
HostName = "localhost"
20+
};
21+
IEnumerable<ByteBuffer> result = new[]
22+
{
23+
openResult.Serialize(cmdResult)
24+
};
25+
return Task.FromResult(result);
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)