Skip to content

Commit a0219be

Browse files
Ticket #106 : Start to support AMQP
1 parent 6fa04d2 commit a0219be

28 files changed

Lines changed: 384 additions & 416 deletions

FaasNet.EventMesh.sln

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3-
# Visual Studio Version 17
4-
VisualStudioVersion = 17.1.32328.378
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.31624.102
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Core Layer", "02. Core Layer", "{6E495E0A-0DC8-4E42-8C58-3C48506D3D24}"
77
EndProject
@@ -13,8 +13,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Runtime.T
1313
EndProject
1414
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Runtime", "src\EventMesh\FaasNet.EventMesh.Runtime\FaasNet.EventMesh.Runtime.csproj", "{01F03240-2025-4F1B-B1F6-76A8FD694784}"
1515
EndProject
16-
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Runtime.Kafka", "src\EventMesh\FaasNet.EventMesh.Runtime.Kafka\FaasNet.EventMesh.Runtime.Kafka.csproj", "{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4}"
17-
EndProject
1816
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Runtime.Website", "src\EventMesh\FaasNet.EventMesh.Runtime.Website\FaasNet.EventMesh.Runtime.Website.csproj", "{0E50F0C2-5E8B-4F4F-90B5-CC9EE62805B1}"
1917
EndProject
2018
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Client", "src\EventMesh\FaasNet.EventMesh.Client\FaasNet.EventMesh.Client.csproj", "{3A26857C-8F60-4BC2-8B9C-56BEE26B59BF}"
@@ -45,6 +43,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "01. Seeds", "01. Seeds", "{
4543
EndProject
4644
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.Common", "src\Common\FaasNet.Common\FaasNet.Common.csproj", "{2D703121-6391-42AB-8D68-8AD85EB5BB4B}"
4745
EndProject
46+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Seed.Kafka", "src\EventMesh\FaasNet.EventMesh.Seed.Kafka\FaasNet.EventMesh.Seed.Kafka.csproj", "{07C9335C-511F-49A1-AD60-BB0C709971C4}"
47+
EndProject
48+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "02. Protocols", "02. Protocols", "{225E35C4-C3C2-4319-96BF-CBF86DCB5C46}"
49+
EndProject
50+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Protocols.AMQP", "src\EventMesh\FaasNet.EventMesh.Protocols.AMQP\FaasNet.EventMesh.Protocols.AMQP.csproj", "{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}"
51+
EndProject
4852
Global
4953
GlobalSection(SolutionConfigurationPlatforms) = preSolution
5054
Debug|Any CPU = Debug|Any CPU
@@ -59,10 +63,6 @@ Global
5963
{01F03240-2025-4F1B-B1F6-76A8FD694784}.Debug|Any CPU.Build.0 = Debug|Any CPU
6064
{01F03240-2025-4F1B-B1F6-76A8FD694784}.Release|Any CPU.ActiveCfg = Release|Any CPU
6165
{01F03240-2025-4F1B-B1F6-76A8FD694784}.Release|Any CPU.Build.0 = Release|Any CPU
62-
{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
63-
{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4}.Debug|Any CPU.Build.0 = Debug|Any CPU
64-
{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4}.Release|Any CPU.ActiveCfg = Release|Any CPU
65-
{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4}.Release|Any CPU.Build.0 = Release|Any CPU
6666
{0E50F0C2-5E8B-4F4F-90B5-CC9EE62805B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
6767
{0E50F0C2-5E8B-4F4F-90B5-CC9EE62805B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
6868
{0E50F0C2-5E8B-4F4F-90B5-CC9EE62805B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -111,14 +111,21 @@ Global
111111
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Debug|Any CPU.Build.0 = Debug|Any CPU
112112
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Release|Any CPU.ActiveCfg = Release|Any CPU
113113
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Release|Any CPU.Build.0 = Release|Any CPU
114+
{07C9335C-511F-49A1-AD60-BB0C709971C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
115+
{07C9335C-511F-49A1-AD60-BB0C709971C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
116+
{07C9335C-511F-49A1-AD60-BB0C709971C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
117+
{07C9335C-511F-49A1-AD60-BB0C709971C4}.Release|Any CPU.Build.0 = Release|Any CPU
118+
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
119+
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Debug|Any CPU.Build.0 = Debug|Any CPU
120+
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Release|Any CPU.ActiveCfg = Release|Any CPU
121+
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93}.Release|Any CPU.Build.0 = Release|Any CPU
114122
EndGlobalSection
115123
GlobalSection(SolutionProperties) = preSolution
116124
HideSolutionNode = FALSE
117125
EndGlobalSection
118126
GlobalSection(NestedProjects) = preSolution
119127
{AA3F0C66-7575-47D9-B45E-6B38F78E632D} = {2A14F697-FFC8-402F-82B4-8128FE897DC9}
120128
{01F03240-2025-4F1B-B1F6-76A8FD694784} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
121-
{E03361DA-DFB2-4DC0-A6D6-0F3306891EB4} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
122129
{0E50F0C2-5E8B-4F4F-90B5-CC9EE62805B1} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
123130
{3A26857C-8F60-4BC2-8B9C-56BEE26B59BF} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
124131
{33040232-E628-448D-B85F-BC68D757D24B} = {3A7C8C56-A93C-4722-9397-EF82EB5E4655}
@@ -132,6 +139,9 @@ Global
132139
{2DDF1EB1-B20F-419F-9A09-34A5550D4467} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
133140
{B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
134141
{2D703121-6391-42AB-8D68-8AD85EB5BB4B} = {7A13262A-D1E6-4210-BA95-03A0741CBFA4}
142+
{07C9335C-511F-49A1-AD60-BB0C709971C4} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
143+
{225E35C4-C3C2-4319-96BF-CBF86DCB5C46} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
144+
{3442A638-E7C0-45CC-8A7D-F2A40D65AB93} = {225E35C4-C3C2-4319-96BF-CBF86DCB5C46}
135145
EndGlobalSection
136146
GlobalSection(ExtensibilityGlobals) = postSolution
137147
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFramework>net6.0</TargetFramework>
4+
</PropertyGroup>
5+
<ItemGroup>
6+
<PackageReference Include="AMQPNetLite" Version="2.4.4" />
7+
</ItemGroup>
8+
</Project>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace FaasNet.EventMesh.Protocols.AMQP
2+
{
3+
public static class FixedWidth
4+
{
5+
public const int ULong = 8;
6+
public const int UInt = 4;
7+
}
8+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using Amqp;
2+
using Amqp.Framing;
3+
using System;
4+
5+
namespace FaasNet.EventMesh.Protocols.AMQP.Framing
6+
{
7+
public enum FrameTypes : byte
8+
{
9+
/// <summary>
10+
/// AMQP Frame.
11+
/// </summary>
12+
Amqp = 0,
13+
/// <summary>
14+
/// SASL Frame.
15+
/// </summary>
16+
Sasl = 1
17+
}
18+
19+
public class Frame
20+
{
21+
public const int CmdBufferSize = 128;
22+
23+
/// <summary>
24+
/// Byte 4 of the frame header is the data offset.
25+
/// This gives the position of the body within the frame.
26+
/// </summary>
27+
private const byte DOFF = 2;
28+
29+
public FrameTypes Type { get; set; }
30+
public ushort Channel { get; set; }
31+
32+
public static void Decode(ByteBuffer buffer, out ushort channel)
33+
{
34+
AmqpBitConverter.ReadUInt(buffer);
35+
AmqpBitConverter.ReadUByte(buffer);
36+
AmqpBitConverter.ReadUByte(buffer);
37+
channel = AmqpBitConverter.ReadUShort(buffer);
38+
}
39+
40+
public byte[] Serialize()
41+
{
42+
var buffer = new ByteBuffer(CmdBufferSize, false);
43+
buffer.Append(FixedWidth.UInt);
44+
AmqpBitConverter.WriteUByte(buffer, DOFF);
45+
AmqpBitConverter.WriteUByte(buffer, (byte)Type);
46+
switch(Type)
47+
{
48+
case FrameTypes.Amqp:
49+
AmqpBitConverter.WriteUShort(buffer, Channel);
50+
break;
51+
}
52+
53+
var accepted = new Accepted();
54+
accepted.Encode(buffer);
55+
AmqpBitConverter.WriteInt(buffer.Buffer, buffer.Offset, buffer.Length);
56+
return buffer.Buffer;
57+
}
58+
}
59+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
using System.Text;
3+
4+
namespace FaasNet.EventMesh.Protocols.AMQP.Framing
5+
{
6+
public struct ProtocolHeader
7+
{
8+
public byte Id;
9+
10+
public byte Major;
11+
12+
public byte Minor;
13+
14+
public byte Revision;
15+
16+
public static ProtocolHeader Create(byte[] buffer, int offset)
17+
{
18+
if (buffer[offset + 0] != (byte)'A' ||
19+
buffer[offset + 1] != (byte)'M' ||
20+
buffer[offset + 2] != (byte)'Q' ||
21+
buffer[offset + 3] != (byte)'P')
22+
{
23+
throw new InvalidOperationException("ProtocolName Expect:AMQP Actual:" + new string(Encoding.UTF8.GetChars(buffer, offset, 4)));
24+
}
25+
26+
return new ProtocolHeader()
27+
{
28+
Id = buffer[offset + 4],
29+
Major = buffer[offset + 5],
30+
Minor = buffer[offset + 6],
31+
Revision = buffer[offset + 7]
32+
};
33+
}
34+
35+
public byte[] Serialize()
36+
{
37+
return new byte[FixedWidth.ULong]
38+
{
39+
(byte)'A',
40+
(byte)'M',
41+
(byte)'Q',
42+
(byte)'P',
43+
Id,
44+
Major,
45+
Minor,
46+
Revision
47+
};
48+
}
49+
}
50+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
2+
using System.Net;
3+
using System.Net.Sockets;
4+
5+
namespace FaasNet.EventMesh.Protocols.AMQP
6+
{
7+
public class AMQPServer
8+
{
9+
private Socket _server;
10+
11+
public AMQPServer()
12+
{
13+
14+
}
15+
16+
public void Start()
17+
{
18+
Launch();
19+
Listen();
20+
}
21+
22+
private void Launch()
23+
{
24+
IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Loopback, 5672);
25+
_server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
26+
_server.Bind(localEndPoint);
27+
_server.Listen();
28+
}
29+
30+
private void Listen()
31+
{
32+
var handler = _server.Accept();
33+
// Header.
34+
var header = ReadHeader(handler);
35+
handler.Send(header.Serialize());
36+
// Frames
37+
var frame = new Frame { Channel = 0, Type = FrameTypes.Amqp };
38+
handler.Send(frame.Serialize());
39+
}
40+
41+
private ProtocolHeader ReadHeader(Socket socket)
42+
{
43+
var headerBuffer = new byte[FixedWidth.ULong];
44+
socket.Receive(headerBuffer, 0, FixedWidth.ULong, SocketFlags.None);
45+
var receivedHeader = ProtocolHeader.Create(headerBuffer, 0);
46+
return receivedHeader;
47+
}
48+
}
49+
}

src/EventMesh/FaasNet.EventMesh.Runtime.Console/FaasNet.EventMesh.Runtime.Console.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
1111
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
1212
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
13+
<PackageReference Include="RabbitMQ.Client" Version="6.2.2" />
1314
</ItemGroup>
1415

1516
<ItemGroup>
1617
<ProjectReference Include="..\..\RaftConsensus\FaasNet.RaftConsensus.RocksDB\FaasNet.RaftConsensus.RocksDB.csproj" />
18+
<ProjectReference Include="..\FaasNet.EventMesh.Protocols.AMQP\FaasNet.EventMesh.Protocols.AMQP.csproj" />
1719
<ProjectReference Include="..\FaasNet.EventMesh.Runtime\FaasNet.EventMesh.Runtime.csproj" />
1820
</ItemGroup>
1921
</Project>

src/EventMesh/FaasNet.EventMesh.Runtime.Console/Program.cs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,43 @@
1-
using FaasNet.Common;
1+
using Amqp;
2+
using FaasNet.Common;
23
using FaasNet.EventMesh.Client;
34
using FaasNet.EventMesh.Client.Messages;
5+
using FaasNet.EventMesh.Protocols.AMQP;
6+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
47
using FaasNet.RaftConsensus.Client;
58
using FaasNet.RaftConsensus.Core;
69
using FaasNet.RaftConsensus.Core.Models;
710
using FaasNet.RaftConsensus.Core.Stores;
811
using Microsoft.Extensions.DependencyInjection;
12+
using RabbitMQ.Client;
913

14+
var frame = new Frame { Channel = 4, Type = 0 };
15+
var payload = frame.Serialize();
16+
var buffer = new ByteBuffer(payload, 0, payload.Count(), 0);
17+
Frame.Decode(buffer, out ushort channel);
18+
19+
var amqpServer = new AMQPServer();
20+
amqpServer.Start();
21+
Console.WriteLine("Press enter");
22+
Console.ReadLine();
23+
24+
// Address address = new Address("amqp://guest:guest@localhost:5672");
25+
// Connection connection = new Connection(address);
26+
// Session session = new Session(connection);
27+
/*
28+
var factory = new ConnectionFactory() { HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" };
29+
using (var connection = factory.CreateConnection())
30+
using (var channel = connection.CreateModel())
31+
{
32+
channel.QueueDeclare(queue: "hello",
33+
durable: false,
34+
exclusive: false,
35+
autoDelete: false,
36+
arguments: null);
37+
}
38+
*/
39+
40+
/*
1041
const int seedPort = 4000;
1142
int nbNode = 1;
1243
var allNodes = new List<INodeHost> { BuildNodeHost(seedPort, true) };
@@ -182,4 +213,5 @@ INodeHost BuildNodeHost(int port, bool isSeed = false)
182213
}
183214
184215
return serviceProvider.GetRequiredService<INodeHost>();
185-
}
216+
}
217+
*/

src/EventMesh/FaasNet.EventMesh.Runtime.Kafka/BrokerConfigurationExtensions.cs

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/EventMesh/FaasNet.EventMesh.Runtime.Kafka/Constants.cs

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)