Skip to content

Commit 1879c24

Browse files
Ticket #106 : Support amqp
1 parent 4818949 commit 1879c24

22 files changed

Lines changed: 325 additions & 117 deletions

File tree

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/AMQPProxy.cs

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using Amqp;
22
using Amqp.Sasl;
33
using Amqp.Types;
4-
using FaasNet.EventMesh.Protocols.AMQP.Extensions;
54
using FaasNet.EventMesh.Protocols.AMQP.Framing;
65
using FaasNet.EventMesh.Protocols.AMQP.Handlers;
76
using Microsoft.Extensions.Logging;
@@ -53,42 +52,55 @@ private void Handle(Socket server)
5352
private void AcceptCallback(IAsyncResult ar)
5453
{
5554
_lock.Set();
56-
var listener = (Socket)ar.AsyncState;
57-
var handler = listener.EndAccept(ar);
58-
var state = new StateObject
55+
try
5956
{
60-
WorkSocket = handler
61-
};
62-
handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), state);
57+
var listener = (Socket)ar.AsyncState;
58+
var handler = listener.EndAccept(ar);
59+
var state = new StateObject
60+
{
61+
WorkSocket = handler
62+
};
63+
handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), state);
64+
}
65+
catch(Exception ex)
66+
{
67+
_logger.LogError(ex.ToString());
68+
}
6369
}
6470

6571
private async void ReadCallback(IAsyncResult ar)
6672
{
6773
var state = (StateObject)ar.AsyncState;
6874
var handler = state.WorkSocket;
69-
if(!handler.IsConnected()) { Dispose(state); return; }
70-
int nbBytes = handler.EndReceive(ar);
71-
byte[] buffer = state.Buffer.Take(nbBytes).ToArray();
72-
if (ProtocolHeader.SASLHeaderNegotiation.Serialize().SequenceEqual(buffer))
75+
try
7376
{
74-
ReplySASHeaderNegotiation(state);
75-
return;
76-
}
77+
int nbBytes = handler.EndReceive(ar);
78+
byte[] buffer = state.Buffer.Take(nbBytes).ToArray();
79+
if (ProtocolHeader.SASLHeaderNegotiation.Serialize().SequenceEqual(buffer))
80+
{
81+
ReplySASHeaderNegotiation(state);
82+
return;
83+
}
7784

78-
if (ProtocolHeader.SASLHeaderSecuredConnection.Serialize().SequenceEqual(buffer))
85+
if (ProtocolHeader.SASLHeaderSecuredConnection.Serialize().SequenceEqual(buffer))
86+
{
87+
ReplySASLHeaderSecuredConnection(state);
88+
return;
89+
}
90+
91+
await ReplyFrame(state, buffer);
92+
}
93+
catch (Exception ex)
7994
{
80-
ReplySASLHeaderSecuredConnection(state);
81-
return;
95+
state.End();
96+
_logger.LogError(ex.ToString());
8297
}
83-
84-
await ReplyFrame(state, buffer);
8598
}
8699

87100
private void ReplySASHeaderNegotiation(StateObject stateObject)
88101
{
89102
var result = ProtocolHeader.SASLHeaderNegotiation.Serialize();
90103
var handler = stateObject.WorkSocket;
91-
if (!handler.IsConnected()) { Dispose(stateObject); return; }
92104
handler.Send(result, 0, result.Length, 0);
93105
var saslFrame = new Frame { Channel = 0, Type = FrameTypes.Sasl };
94106
var cmd = new SaslMechanisms
@@ -106,7 +118,6 @@ private void ReplySASLHeaderSecuredConnection(StateObject stateObject)
106118
{
107119
var result = ProtocolHeader.SASLHeaderSecuredConnection.Serialize();
108120
var handler = stateObject.WorkSocket;
109-
if (!handler.IsConnected()) { Dispose(stateObject); return; }
110121
handler.Send(result, 0, result.Length, 0);
111122
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
112123
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
@@ -117,7 +128,6 @@ private async Task ReplyFrame(StateObject stateObject, byte[] buffer)
117128
ushort channel;
118129
DescribedList command;
119130
var handler = stateObject.WorkSocket;
120-
if (!handler.IsConnected()) { Dispose(stateObject); return; }
121131
var frameBuffer = new ByteBuffer(buffer, 0, buffer.Count(), 0);
122132
byte[] payload = null;
123133
Frame.Decode(frameBuffer, out channel, out command);
@@ -126,15 +136,28 @@ private async Task ReplyFrame(StateObject stateObject, byte[] buffer)
126136
var requestHandler = _requestHandlers.First(r => r.RequestName == command.Descriptor.Name);
127137
var parameter = new RequestParameter(command, payload, channel);
128138
var result = await requestHandler.Handle(stateObject, parameter, TokenSource.Token);
129-
if (result == null)
139+
if (result.Status == RequestResultStatus.EXIT_SESSION)
140+
{ if(result.Content != null && result.Content.Any()) handler.Send(result.Content.First().Buffer, result.Content.First().Offset, result.Content.First().Length, 0);
141+
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
142+
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
143+
return;
144+
}
145+
146+
if (result.Status == RequestResultStatus.EXIT_CONNECTION)
147+
{
148+
if (result.Content != null && result.Content.Any()) handler.Send(result.Content.First().Buffer, result.Content.First().Offset, result.Content.First().Length, 0);
149+
return;
150+
}
151+
152+
if (result.Content == null || !result.Content.Any())
130153
{
131154
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
132155
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
133156
}
134157
else
135158
{
136-
for (var i = 0; i < result.Count() - 1; i++) handler.Send(result.ElementAt(i).Buffer, result.ElementAt(i).Offset, result.ElementAt(i).Length, 0);
137-
handler.BeginSend(result.Last().Buffer, result.Last().Offset, result.Last().Length, 0, new AsyncCallback(SendCallback), stateObject);
159+
for (var i = 0; i < result.Content.Count() - 1; i++) handler.Send(result.Content.ElementAt(i).Buffer, result.Content.ElementAt(i).Offset, result.Content.ElementAt(i).Length, 0);
160+
handler.BeginSend(result.Content.Last().Buffer, result.Content.Last().Offset, result.Content.Last().Length, 0, new AsyncCallback(SendCallback), stateObject);
138161
}
139162
}
140163

@@ -144,10 +167,5 @@ private void SendCallback(IAsyncResult ar)
144167
var newState = new StateObject { WorkSocket = state.WorkSocket, Session = state.Session };
145168
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
146169
}
147-
148-
private void Dispose(StateObject stateObject)
149-
{
150-
if (stateObject.Session.EventMeshSubSession != null) stateObject.Session.EventMeshSubSession.Close();
151-
}
152170
}
153171
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/EventMeshAMQPOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ public EventMeshAMQPOptions()
66
{
77
Port = 5672;
88
MaxFrameSize = 1000;
9+
MaxChannel = 1000;
10+
SessionLinkCredit = 255;
911
EventMeshVpn = Client.Constants.DefaultVpn;
1012
EventMeshUrl = Client.Constants.DefaultUrl;
1113
EventMeshPort = Client.Constants.DefaultPort;
@@ -21,6 +23,10 @@ public EventMeshAMQPOptions()
2123
/// This value plus one is the maximum number of sessions that can simultaneously active on the connection.
2224
/// </summary>
2325
public ushort MaxChannel { get; set; }
26+
/// <summary>
27+
/// Current maximum number of messages that can be handled at the receiver endpoint of the link.
28+
/// </summary>
29+
public uint SessionLinkCredit { get; set; }
2430
public string EventMeshVpn { get; set; }
2531
public string EventMeshUrl { get; set; }
2632
public int EventMeshPort { get; set; }

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Extensions/SocketExtensions.cs

Lines changed: 0 additions & 16 deletions
This file was deleted.

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/AttachHandler.cs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using FaasNet.EventMesh.Protocols.AMQP.Extensions;
55
using FaasNet.EventMesh.Protocols.AMQP.Framing;
66
using Microsoft.Extensions.Options;
7-
using System.Collections.Generic;
87
using System.Threading;
98
using System.Threading.Tasks;
109

@@ -21,19 +20,19 @@ public AttachHandler(IOptions<EventMeshAMQPOptions> options)
2120

2221
public string RequestName => "amqp:attach:list";
2322

24-
public async Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
23+
public async Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
2524
{
2625
var attachCmd = parameter.Cmd as Attach;
2726
var target = attachCmd.Target as Target;
2827
var source = attachCmd.Source as Source;
2928
state.Session.Link = attachCmd;
3029
if (target != null && !string.IsNullOrWhiteSpace(target.Address)) state.Session.EventMeshPubSession = await CreatePubSession(state.Session, cancellationToken);
3130
if (source != null && !string.IsNullOrWhiteSpace(source.Address)) state.Session.EventMeshSubSession = await CreateSubSession(parameter, state, cancellationToken);
32-
return new ByteBuffer[]
31+
return RequestResult.Ok(new ByteBuffer[]
3332
{
3433
BuildAttachResponse(parameter.Channel, attachCmd),
3534
BuildFrameResponse(attachCmd, parameter.Channel, attachCmd)
36-
};
35+
});
3736
}
3837

3938
private async Task<SubscriptionResult> CreateSubSession(RequestParameter parameter, StateObject state, CancellationToken cancellationToken)
@@ -51,10 +50,11 @@ private async Task<SubscriptionResult> CreateSubSession(RequestParameter paramet
5150
var transfer = new Transfer
5251
{
5352
Handle = attachCmd.Handle,
54-
DeliveryId = 2
53+
DeliveryId = state.Session.DeliveryId
5554
};
5655
var byteBuffer = result.Serialize(transfer, messageByteBuffer);
5756
state.WorkSocket.Send(byteBuffer.Buffer, byteBuffer.Offset, byteBuffer.Length, 0);
57+
state.Session.DeliveryId = state.Session.DeliveryId + 1;
5858
}, cancellationToken);
5959
return subscriptionResult;
6060
}
@@ -68,24 +68,26 @@ private async Task<IEventMeshClientPubSession> CreatePubSession(StateSessionObje
6868
private static ByteBuffer BuildAttachResponse(ushort channel, Attach requestAttach)
6969
{
7070
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
71-
var attach = new Attach { LinkName = requestAttach.LinkName, Handle = requestAttach.Handle };
71+
var attach = new Attach
72+
{
73+
LinkName = requestAttach.LinkName,
74+
Handle = requestAttach.Handle,
75+
Role = !requestAttach.Role,
76+
Source = requestAttach.Source,
77+
Target = requestAttach.Target
78+
};
7279
return result.Serialize(attach);
7380
}
7481

75-
private static ByteBuffer BuildFrameResponse(Attach attachCmd, ushort channel, Attach requestAttach)
82+
private ByteBuffer BuildFrameResponse(Attach attachCmd, ushort channel, Attach requestAttach)
7683
{
7784
var target = attachCmd.Target as Target;
7885
var isReceiverEdp = target != null && !string.IsNullOrWhiteSpace(target.Address);
7986
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
8087
var flow = new Flow { Handle = requestAttach.Handle };
8188
if (isReceiverEdp)
8289
{
83-
flow.LinkCredit = 1;
84-
flow.IncomingWindow = 1;
85-
}
86-
else
87-
{
88-
flow.OutgoingWindow = 1;
90+
flow.LinkCredit = _options.SessionLinkCredit;
8991
}
9092

9193
return result.Serialize(flow);
Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
using Amqp;
2-
using Amqp.Framing;
1+
using Amqp.Framing;
32
using FaasNet.EventMesh.Protocols.AMQP.Framing;
4-
using System.Collections.Generic;
53
using System.Threading;
64
using System.Threading.Tasks;
75

@@ -11,15 +9,11 @@ public class BeginHandler : IRequestHandler
119
{
1210
public string RequestName => "amqp:begin:list";
1311

14-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
12+
public Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
1513
{
1614
var beginResult = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
1715
var begin = new Begin { RemoteChannel = parameter.Channel };
18-
IEnumerable<ByteBuffer> result = new[]
19-
{
20-
beginResult.Serialize(begin)
21-
};
22-
return Task.FromResult(result);
16+
return Task.FromResult(RequestResult.Ok(beginResult.Serialize(begin)));
2317
}
2418
}
2519
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Amqp.Framing;
2+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
7+
{
8+
/// <summary>
9+
/// Sender will not be sending any more frames on the connection.
10+
/// </summary>
11+
public class CloseHandler : IRequestHandler
12+
{
13+
public string RequestName => "amqp:close:list";
14+
15+
public Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
16+
{
17+
var result = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
18+
var close = new Close { };
19+
state.End();
20+
return Task.FromResult(RequestResult.ExitConnection(result.Serialize(close)));
21+
}
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Amqp.Framing;
2+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
7+
{
8+
/// <summary>
9+
/// Indicates that the session has ended.
10+
/// </summary>
11+
public class EndHandler : IRequestHandler
12+
{
13+
public string RequestName => "amqp:end:list";
14+
15+
public Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
16+
{
17+
var result = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
18+
var close = new End { };
19+
state.End();
20+
return Task.FromResult(RequestResult.ExitSession(result.Serialize(close)));
21+
}
22+
}
23+
}
Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using Amqp;
2-
using System.Collections.Generic;
3-
using System.Threading;
1+
using System.Threading;
42
using System.Threading.Tasks;
53

64
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
@@ -12,9 +10,9 @@ public class FlowHandler : IRequestHandler
1210
{
1311
public string RequestName => "amqp:flow:list";
1412

15-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
13+
public Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
1614
{
17-
return Task.FromResult((IEnumerable<ByteBuffer>)null);
15+
return Task.FromResult(RequestResult.Ok());
1816
}
1917
}
2018
}
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
using Amqp;
2-
using System.Collections.Generic;
3-
using System.Threading;
1+
using System.Threading;
42
using System.Threading.Tasks;
53

64
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
75
{
86
public interface IRequestHandler
97
{
108
string RequestName { get; }
11-
Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken);
9+
Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken);
1210
}
1311
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/OpenHandler.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public OpenHandler(IOptions<EventMeshAMQPOptions> options)
1818

1919
public string RequestName => "amqp:open:list";
2020

21-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
21+
public Task<RequestResult> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
2222
{
2323
var openResult = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
2424
var cmdResult = new Amqp.Framing.Open
@@ -27,11 +27,7 @@ public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter
2727
MaxFrameSize = _options.MaxFrameSize,
2828
ChannelMax = _options.MaxChannel
2929
};
30-
IEnumerable<ByteBuffer> result = new[]
31-
{
32-
openResult.Serialize(cmdResult)
33-
};
34-
return Task.FromResult(result);
30+
return Task.FromResult(RequestResult.Ok(openResult.Serialize(cmdResult)));
3531
}
3632
}
3733
}

0 commit comments

Comments
 (0)