Skip to content

Commit 4b2c3fb

Browse files
Ticket #106 : Support AMQP, can publish message
1 parent 95dee9c commit 4b2c3fb

13 files changed

Lines changed: 95 additions & 46 deletions

src/EventMesh/FaasNet.EventMesh.Client/EventMeshClient.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ internal static void EnsureSuccessStatus(Package packageRequest, Package package
139139
public interface IEventMeshClientPubSession
140140
{
141141
Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken));
142+
Task Publish(string topicName, string str, CancellationToken cancellationToken = default(CancellationToken));
142143
Task Publish(string topicName, CloudEvent cloudEvent, CancellationToken cancellationToken = default(CancellationToken));
143144
}
144145

@@ -177,6 +178,11 @@ public EventMeshClientPubSession(HelloResponse session, IPAddress ipAddr, int po
177178
}
178179

179180
public Task Publish(string topicName, object obj, CancellationToken cancellationToken = default(CancellationToken))
181+
{
182+
return Publish(topicName, JsonSerializer.Serialize(obj), cancellationToken);
183+
}
184+
185+
public Task Publish(string topicName, string str, CancellationToken cancellationToken = default(CancellationToken))
180186
{
181187
var cloudEvt = new CloudEvent
182188
{
@@ -185,7 +191,7 @@ public EventMeshClientPubSession(HelloResponse session, IPAddress ipAddr, int po
185191
Source = new Uri("http://localhost"),
186192
Type = topicName,
187193
DataContentType = "application/json",
188-
Data = JsonSerializer.Serialize(obj),
194+
Data = str,
189195
Time = DateTimeOffset.UtcNow
190196
};
191197
return Publish(topicName, cloudEvt, cancellationToken);

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/AMQPProxy.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Amqp.Types;
44
using FaasNet.EventMesh.Protocols.AMQP.Framing;
55
using FaasNet.EventMesh.Protocols.AMQP.Handlers;
6+
using Microsoft.Extensions.Logging;
67
using Microsoft.Extensions.Options;
78
using System;
89
using System.Collections.Generic;
@@ -19,11 +20,13 @@ public class AMQPProxy : BaseProxy
1920
private readonly EventMeshAMQPOptions _options;
2021
private readonly IEnumerable<IRequestHandler> _requestHandlers;
2122
private static ManualResetEvent _lock = new ManualResetEvent(false);
23+
private readonly ILogger<AMQPProxy> _logger;
2224

23-
public AMQPProxy(IOptions<EventMeshAMQPOptions> options, IEnumerable<IRequestHandler> requestHandlers)
25+
public AMQPProxy(IOptions<EventMeshAMQPOptions> options, IEnumerable<IRequestHandler> requestHandlers, ILogger<AMQPProxy> logger)
2426
{
2527
_options = options.Value;
2628
_requestHandlers = requestHandlers;
29+
_logger = logger;
2730
}
2831

2932
protected override void Init()
@@ -112,9 +115,10 @@ private async Task ReplyFrame(StateObject stateObject, byte[] buffer)
112115
byte[] payload = null;
113116
Frame.Decode(frameBuffer, out channel, out command);
114117
if (frameBuffer.Length > 0) payload = buffer.Skip(frameBuffer.Offset + FixedWidth.UInt).Take(frameBuffer.Length - FixedWidth.UInt).ToArray();
115-
Console.WriteLine($"Receive {command.Descriptor.Name}");
118+
_logger.LogInformation("Receive {command}", command.Descriptor.Name);
116119
var requestHandler = _requestHandlers.First(r => r.RequestName == command.Descriptor.Name);
117-
var result = await requestHandler.Handle(stateObject, command, payload, channel, TokenSource.Token);
120+
var parameter = new RequestParameter(command, payload, channel);
121+
var result = await requestHandler.Handle(stateObject, parameter, TokenSource.Token);
118122
if (result == null)
119123
{
120124
var newState = new StateObject { WorkSocket = stateObject.WorkSocket, Session = stateObject.Session };

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/EventMeshAMQPOptions.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@ public class EventMeshAMQPOptions
55
public EventMeshAMQPOptions()
66
{
77
Port = 5672;
8+
MaxFrameSize = 1000;
89
EventMeshVpn = Client.Constants.DefaultVpn;
910
EventMeshUrl = Client.Constants.DefaultUrl;
1011
EventMeshPort = Client.Constants.DefaultPort;
1112
}
1213

1314
public int Port { get; set; }
15+
/// <summary>
16+
/// Largest frame size that the sending peer is able to accept on this connection.
17+
/// </summary>
18+
public uint MaxFrameSize { get; set; }
19+
/// <summary>
20+
/// The channel-max value is the highest channel number that can be used on the connection.
21+
/// This value plus one is the maximum number of sessions that can simultaneously active on the connection.
22+
/// </summary>
23+
public ushort MaxChannel { get; set; }
1424
public string EventMeshVpn { get; set; }
1525
public string EventMeshUrl { get; set; }
1626
public int EventMeshPort { get; set; }

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/FaasNet.EventMesh.Protocols.AMQP.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<ItemGroup>
66
<PackageReference Include="AMQPNetLite" Version="2.4.4" />
77
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
8+
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
89
</ItemGroup>
910
<ItemGroup>
1011
<ProjectReference Include="..\FaasNet.EventMesh.Protocols\FaasNet.EventMesh.Protocols.csproj" />

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/AttachHandler.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using Amqp;
22
using Amqp.Framing;
3-
using Amqp.Types;
43
using FaasNet.EventMesh.Client;
54
using FaasNet.EventMesh.Protocols.AMQP.Framing;
65
using Microsoft.Extensions.Options;
@@ -21,15 +20,16 @@ public AttachHandler(IOptions<EventMeshAMQPOptions> options)
2120

2221
public string RequestName => "amqp:attach:list";
2322

24-
public async Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
23+
public async Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
2524
{
26-
var attachCmd = cmd as Attach;
25+
var attachCmd = parameter.Cmd as Attach;
2726
var target = attachCmd.Target as Target;
28-
if(target != null && !string.IsNullOrWhiteSpace(target.Address)) state.Session.EventMeshPubSession = await CreatePubSession(state.Session, cancellationToken);
27+
state.Session.Link = attachCmd;
28+
if (target != null && !string.IsNullOrWhiteSpace(target.Address)) state.Session.EventMeshPubSession = await CreatePubSession(state.Session, cancellationToken);
2929
return new ByteBuffer[]
3030
{
31-
BuildAttachResponse(channel, attachCmd),
32-
BuildFrameResponse(channel, attachCmd)
31+
BuildAttachResponse(parameter.Channel, attachCmd),
32+
BuildFrameResponse(parameter.Channel, attachCmd)
3333
};
3434
}
3535

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/BeginHandler.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using Amqp;
22
using Amqp.Framing;
3-
using Amqp.Types;
43
using FaasNet.EventMesh.Protocols.AMQP.Framing;
54
using System.Collections.Generic;
65
using System.Threading;
@@ -12,11 +11,10 @@ public class BeginHandler : IRequestHandler
1211
{
1312
public string RequestName => "amqp:begin:list";
1413

15-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
14+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
1615
{
17-
var requestBegin = cmd as Begin;
18-
var beginResult = new Frame { Channel = channel, Type = FrameTypes.Amqp };
19-
var begin = new Begin { RemoteChannel = channel };
16+
var beginResult = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
17+
var begin = new Begin { RemoteChannel = parameter.Channel };
2018
IEnumerable<ByteBuffer> result = new[]
2119
{
2220
beginResult.Serialize(begin)
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Amqp;
2-
using Amqp.Types;
32
using System.Collections.Generic;
43
using System.Threading;
54
using System.Threading.Tasks;
@@ -9,6 +8,6 @@ namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
98
public interface IRequestHandler
109
{
1110
string RequestName { get; }
12-
Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken);
11+
Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken);
1312
}
1413
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/OpenHandler.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using Amqp;
2-
using Amqp.Types;
32
using FaasNet.EventMesh.Protocols.AMQP.Framing;
3+
using Microsoft.Extensions.Options;
44
using System.Collections.Generic;
55
using System.Threading;
66
using System.Threading.Tasks;
@@ -9,14 +9,23 @@ namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
99
{
1010
public class OpenHandler : IRequestHandler
1111
{
12+
private readonly EventMeshAMQPOptions _options;
13+
14+
public OpenHandler(IOptions<EventMeshAMQPOptions> options)
15+
{
16+
_options = options.Value;
17+
}
18+
1219
public string RequestName => "amqp:open:list";
1320

14-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
21+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
1522
{
16-
var openResult = new Frame { Channel = channel, Type = FrameTypes.Amqp };
23+
var openResult = new Frame { Channel = parameter.Channel, Type = FrameTypes.Amqp };
1724
var cmdResult = new Amqp.Framing.Open
1825
{
19-
HostName = "localhost"
26+
HostName = "localhost",
27+
MaxFrameSize = _options.MaxFrameSize,
28+
ChannelMax = _options.MaxChannel
2029
};
2130
IEnumerable<ByteBuffer> result = new[]
2231
{
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Amqp.Types;
2+
3+
namespace FaasNet.EventMesh.Protocols.AMQP.Handlers
4+
{
5+
public class RequestParameter
6+
{
7+
public RequestParameter(DescribedList cmd, byte[] payload, ushort channel)
8+
{
9+
Cmd = cmd;
10+
Payload = payload;
11+
Channel = channel;
12+
}
13+
14+
public DescribedList Cmd { get; set; }
15+
public byte[] Payload { get; set; }
16+
public ushort Channel { get; set; }
17+
}
18+
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Handlers/SASLInitHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ public class SASLInitHandler : IRequestHandler
1414
{
1515
public string RequestName => "amqp:sasl-init:list";
1616

17-
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, DescribedList cmd, byte[] payload, ushort channel, CancellationToken cancellationToken)
17+
public Task<IEnumerable<ByteBuffer>> Handle(StateObject state, RequestParameter parameter, CancellationToken cancellationToken)
1818
{
19-
var saslInit = cmd as SaslInit;
19+
var saslInit = parameter.Cmd as SaslInit;
2020
if (saslInit.Mechanism.ToString() != "PLAIN") throw new InvalidOperationException("Only PLAIN authentication mechanisme is supported");
2121
var saslInitResult = new Frame { Channel = 0, Type = FrameTypes.Sasl };
2222
var salsOutcome = new SaslOutcome { Code = SaslCode.Auth };

0 commit comments

Comments
 (0)