Skip to content

Commit 4818949

Browse files
Ticket #106 : Can send & receive message
1 parent 4b2c3fb commit 4818949

8 files changed

Lines changed: 131 additions & 7 deletions

File tree

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/AMQPProxy.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Amqp;
22
using Amqp.Sasl;
33
using Amqp.Types;
4+
using FaasNet.EventMesh.Protocols.AMQP.Extensions;
45
using FaasNet.EventMesh.Protocols.AMQP.Framing;
56
using FaasNet.EventMesh.Protocols.AMQP.Handlers;
67
using Microsoft.Extensions.Logging;
@@ -54,15 +55,18 @@ private void AcceptCallback(IAsyncResult ar)
5455
_lock.Set();
5556
var listener = (Socket)ar.AsyncState;
5657
var handler = listener.EndAccept(ar);
57-
var state = new StateObject();
58-
state.WorkSocket = handler;
58+
var state = new StateObject
59+
{
60+
WorkSocket = handler
61+
};
5962
handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), state);
6063
}
6164

6265
private async void ReadCallback(IAsyncResult ar)
6366
{
6467
var state = (StateObject)ar.AsyncState;
6568
var handler = state.WorkSocket;
69+
if(!handler.IsConnected()) { Dispose(state); return; }
6670
int nbBytes = handler.EndReceive(ar);
6771
byte[] buffer = state.Buffer.Take(nbBytes).ToArray();
6872
if (ProtocolHeader.SASLHeaderNegotiation.Serialize().SequenceEqual(buffer))
@@ -84,6 +88,7 @@ private void ReplySASHeaderNegotiation(StateObject stateObject)
8488
{
8589
var result = ProtocolHeader.SASLHeaderNegotiation.Serialize();
8690
var handler = stateObject.WorkSocket;
91+
if (!handler.IsConnected()) { Dispose(stateObject); return; }
8792
handler.Send(result, 0, result.Length, 0);
8893
var saslFrame = new Frame { Channel = 0, Type = FrameTypes.Sasl };
8994
var cmd = new SaslMechanisms
@@ -101,6 +106,7 @@ private void ReplySASLHeaderSecuredConnection(StateObject stateObject)
101106
{
102107
var result = ProtocolHeader.SASLHeaderSecuredConnection.Serialize();
103108
var handler = stateObject.WorkSocket;
109+
if (!handler.IsConnected()) { Dispose(stateObject); return; }
104110
handler.Send(result, 0, result.Length, 0);
105111
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };
106112
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
@@ -111,6 +117,7 @@ private async Task ReplyFrame(StateObject stateObject, byte[] buffer)
111117
ushort channel;
112118
DescribedList command;
113119
var handler = stateObject.WorkSocket;
120+
if (!handler.IsConnected()) { Dispose(stateObject); return; }
114121
var frameBuffer = new ByteBuffer(buffer, 0, buffer.Count(), 0);
115122
byte[] payload = null;
116123
Frame.Decode(frameBuffer, out channel, out command);
@@ -137,5 +144,10 @@ private void SendCallback(IAsyncResult ar)
137144
var newState = new StateObject { WorkSocket = state.WorkSocket, Session = state.Session };
138145
newState.WorkSocket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReadCallback), newState);
139146
}
147+
148+
private void Dispose(StateObject stateObject)
149+
{
150+
if (stateObject.Session.EventMeshSubSession != null) stateObject.Session.EventMeshSubSession.Close();
151+
}
140152
}
141153
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using Amqp;
2+
using Amqp.Types;
3+
4+
namespace FaasNet.EventMesh.Protocols.AMQP.Extensions
5+
{
6+
public static class MessageExtensions
7+
{
8+
public static ByteBuffer Serialize(this Message message)
9+
{
10+
const int reservedBytes = 40;
11+
var messageByteBuffer = new ByteBuffer(reservedBytes + message.GetEstimatedMessageSize(), false);
12+
messageByteBuffer.AdjustPosition(messageByteBuffer.Offset + reservedBytes, 0);
13+
EncodeIfNotNull(message.Header, messageByteBuffer);
14+
EncodeIfNotNull(message.DeliveryAnnotations, messageByteBuffer);
15+
EncodeIfNotNull(message.MessageAnnotations, messageByteBuffer);
16+
EncodeIfNotNull(message.Properties, messageByteBuffer);
17+
EncodeIfNotNull(message.ApplicationProperties, messageByteBuffer);
18+
EncodeIfNotNull(message.BodySection, messageByteBuffer);
19+
EncodeIfNotNull(message.Footer, messageByteBuffer);
20+
return messageByteBuffer;
21+
}
22+
23+
private static void EncodeIfNotNull(RestrictedDescribed section, ByteBuffer buffer)
24+
{
25+
if (section != null)
26+
{
27+
section.Encode(buffer);
28+
}
29+
}
30+
}
31+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System.Net.Sockets;
2+
3+
namespace FaasNet.EventMesh.Protocols.AMQP.Extensions
4+
{
5+
public static class SocketExtensions
6+
{
7+
public static bool IsConnected(this Socket socket)
8+
{
9+
try
10+
{
11+
return !(socket.Poll(1, SelectMode.SelectRead) && socket.Available == 0);
12+
}
13+
catch (SocketException) { return false; }
14+
}
15+
}
16+
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Framing/Frame.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,16 @@ public ByteBuffer Serialize(DescribedList cmd)
5959
AmqpBitConverter.WriteInt(buffer.Buffer, buffer.Offset, buffer.Length);
6060
return buffer;
6161
}
62+
63+
public ByteBuffer Serialize(DescribedList cmd, ByteBuffer payload)
64+
{
65+
var buffer = Serialize(cmd);
66+
int payloadSize = payload.Length;
67+
int frameSize = buffer.Length + payloadSize;
68+
AmqpBitConverter.WriteInt(buffer.Buffer, buffer.Offset, frameSize);
69+
AmqpBitConverter.WriteBytes(buffer, payload.Buffer, payload.Offset, payloadSize);
70+
payload.Complete(payloadSize);
71+
return buffer;
72+
}
6273
}
6374
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/AttachHandler.cs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Amqp;
22
using Amqp.Framing;
33
using FaasNet.EventMesh.Client;
4+
using FaasNet.EventMesh.Protocols.AMQP.Extensions;
45
using FaasNet.EventMesh.Protocols.AMQP.Framing;
56
using Microsoft.Extensions.Options;
67
using System.Collections.Generic;
@@ -24,19 +25,38 @@ public async Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestPara
2425
{
2526
var attachCmd = parameter.Cmd as Attach;
2627
var target = attachCmd.Target as Target;
28+
var source = attachCmd.Source as Source;
2729
state.Session.Link = attachCmd;
2830
if (target != null && !string.IsNullOrWhiteSpace(target.Address)) state.Session.EventMeshPubSession = await CreatePubSession(state.Session, cancellationToken);
31+
if (source != null && !string.IsNullOrWhiteSpace(source.Address)) state.Session.EventMeshSubSession = await CreateSubSession(parameter, state, cancellationToken);
2932
return new ByteBuffer[]
3033
{
3134
BuildAttachResponse(parameter.Channel, attachCmd),
32-
BuildFrameResponse(parameter.Channel, attachCmd)
35+
BuildFrameResponse(attachCmd, parameter.Channel, attachCmd)
3336
};
3437
}
3538

36-
private async Task CreateSubSession(StateSessionObject session, CancellationToken cancellationToken)
39+
private async Task<SubscriptionResult> CreateSubSession(RequestParameter parameter, StateObject state, CancellationToken cancellationToken)
3740
{
41+
var session = state.Session;
42+
var attachCmd = parameter.Cmd as Attach;
43+
var source = attachCmd.Source as Source;
3844
var evtMeshClient = new EventMeshClient(_options.EventMeshUrl, _options.EventMeshPort);
3945
var subSession = await evtMeshClient.CreateSubSession(_options.EventMeshVpn, session.ClientId, cancellationToken);
46+
var subscriptionResult = subSession.DirectSubscribe(source.Address, (cb) =>
47+
{
48+
var message = new Message(cb.Data.ToString());
49+
var messageByteBuffer = message.Serialize();
50+
var result = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
51+
var transfer = new Transfer
52+
{
53+
Handle = attachCmd.Handle,
54+
DeliveryId = 2
55+
};
56+
var byteBuffer = result.Serialize(transfer, messageByteBuffer);
57+
state.WorkSocket.Send(byteBuffer.Buffer, byteBuffer.Offset, byteBuffer.Length, 0);
58+
}, cancellationToken);
59+
return subscriptionResult;
4060
}
4161

4262
private async Task<IEventMeshClientPubSession> CreatePubSession(StateSessionObject session, CancellationToken cancellationToken)
@@ -48,14 +68,26 @@ private async Task<IEventMeshClientPubSession> CreatePubSession(StateSessionObje
4868
private static ByteBuffer BuildAttachResponse(ushort channel, Attach requestAttach)
4969
{
5070
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
51-
var attach = new Attach { LinkName = requestAttach.LinkName };
71+
var attach = new Attach { LinkName = requestAttach.LinkName, Handle = requestAttach.Handle };
5272
return result.Serialize(attach);
5373
}
5474

55-
private static ByteBuffer BuildFrameResponse(ushort channel, Attach requestAttach)
75+
private static ByteBuffer BuildFrameResponse(Attach attachCmd, ushort channel, Attach requestAttach)
5676
{
77+
var target = attachCmd.Target as Target;
78+
var isReceiverEdp = target != null && !string.IsNullOrWhiteSpace(target.Address);
5779
var result = new Frame { Channel = channel, Type = FrameTypes.Amqp };
58-
var flow = new Flow { LinkCredit = 1, Handle = requestAttach.Handle };
80+
var flow = new Flow { Handle = requestAttach.Handle };
81+
if (isReceiverEdp)
82+
{
83+
flow.LinkCredit = 1;
84+
flow.IncomingWindow = 1;
85+
}
86+
else
87+
{
88+
flow.OutgoingWindow = 1;
89+
}
90+
5991
return result.Serialize(flow);
6092
}
6193
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using Amqp;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
7+
{
8+
/// <summary>
9+
/// Update link state.
10+
/// </summary>
11+
public class FlowHandler : IRequestHandler
12+
{
13+
public string RequestName => "amqp:flow:list";
14+
15+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
16+
{
17+
return Task.FromResult((IEnumerable<ByteBuffer>)null);
18+
}
19+
}
20+
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public static IServiceCollection AddAMQPProtocol(this IServiceCollection service
1818
services.AddTransient<IRequestHandler, BeginHandler>();
1919
services.AddTransient<IRequestHandler, AttachHandler>();
2020
services.AddTransient<IRequestHandler, TransferHandler>();
21+
services.AddTransient<IRequestHandler, FlowHandler>();
2122
return services;
2223
}
2324
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/StateSessionObject.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public class StateSessionObject
99
public string ClientId { get; set; }
1010
public string Password { get; set; }
1111
public IEventMeshClientPubSession EventMeshPubSession { get; set; }
12+
public SubscriptionResult EventMeshSubSession { get; set; }
1213
public Attach Link { get; set; }
1314
}
1415
}

0 commit comments

Comments
 (0)