Skip to content

Commit 2d2c7bf

Browse files
Ticket #106 : Can accept AMQP connection
1 parent a0219be commit 2d2c7bf

5 files changed

Lines changed: 93 additions & 89 deletions

File tree

kubernetes/localrabbitmq.yml

Lines changed: 10 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Cluster operator must be installed : kubectl apply -f "https://github.com/rabbitmq/cluster-operator/releases/latest/download/cluster-operator.yml"
2+
# default user : hello-world-default-user
3+
# default password: hello-world-default-user
14
apiVersion: v1
25
kind: Service
36
metadata:
@@ -14,72 +17,14 @@ spec:
1417
protocol: TCP
1518
nodePort: 30007
1619
selector:
17-
app: rabbitmq
20+
app.kubernetes.io/name: hello-world
1821
type: NodePort
1922
---
20-
apiVersion: v1
21-
kind: Service
22-
metadata:
23-
name: rabbitmq-entry
24-
spec:
25-
type: ClusterIP
26-
selector:
27-
app: rabbitmq
28-
ports:
29-
- port: 5672
30-
protocol: TCP
31-
targetPort: 5672
32-
---
33-
apiVersion: apps/v1
34-
kind: StatefulSet
23+
apiVersion: rabbitmq.com/v1beta1
24+
kind: RabbitmqCluster
3525
metadata:
36-
name: rabbitmq
26+
name: hello-world
3727
spec:
38-
serviceName: "rabbitmq"
39-
selector:
40-
matchLabels:
41-
app: rabbitmq
42-
replicas: 1
43-
template:
44-
metadata:
45-
labels:
46-
app: rabbitmq
47-
spec:
48-
containers:
49-
- name: rabbitmq
50-
image: rabbitmq:3.6.6-management-alpine
51-
lifecycle:
52-
postStart:
53-
exec:
54-
command:
55-
- /bin/sh
56-
- -c
57-
- >
58-
if [ -z "$(grep rabbitmq /etc/resolv.conf)" ]; then
59-
sed "s/^search \([^ ]\+\)/search rabbitmq.\1 \1/" /etc/resolv.conf > /etc/resolv.conf.new;
60-
cat /etc/resolv.conf.new > /etc/resolv.conf;
61-
rm /etc/resolv.conf.new;
62-
fi;
63-
until rabbitmqctl node_health_check; do sleep 1; done;
64-
if [[ "$HOSTNAME" != "rabbitmq-0" && -z "$(rabbitmqctl cluster_status | grep rabbitmq-0)" ]]; then
65-
rabbitmqctl stop_app;
66-
rabbitmqctl join_cluster rabbit@rabbitmq-0;
67-
rabbitmqctl start_app;
68-
fi;
69-
rabbitmqctl set_policy ha-all "." '{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}'
70-
ports:
71-
- containerPort: 5672
72-
name: amqp
73-
volumeMounts:
74-
- name: rabbitmq
75-
mountPath: /var/lib/rabbitmq
76-
volumeClaimTemplates:
77-
- metadata:
78-
name: rabbitmq
79-
annotations:
80-
volume.alpha.kubernetes.io/storage-class: anything
81-
spec:
82-
accessModes: [ "ReadWriteOnce" ]
83-
resources:
84-
requests:
85-
storage: 1Gi
28+
rabbitmq:
29+
additionalPlugins:
30+
- rabbitmq_amqp1_0

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Framing/Frame.cs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Amqp;
22
using Amqp.Framing;
3+
using Amqp.Types;
34
using System;
45

56
namespace FaasNet.EventMesh.Protocols.AMQP.Framing
@@ -29,31 +30,33 @@ public class Frame
2930
public FrameTypes Type { get; set; }
3031
public ushort Channel { get; set; }
3132

32-
public static void Decode(ByteBuffer buffer, out ushort channel)
33+
public static void Decode(ByteBuffer buffer, out ushort channel, out DescribedList command)
3334
{
3435
AmqpBitConverter.ReadUInt(buffer);
3536
AmqpBitConverter.ReadUByte(buffer);
3637
AmqpBitConverter.ReadUByte(buffer);
3738
channel = AmqpBitConverter.ReadUShort(buffer);
39+
if (buffer.Length > 0)
40+
{
41+
var tt = Encoder.ReadDescribed(buffer, Encoder.ReadFormatCode(buffer));
42+
command = (DescribedList)tt;
43+
}
44+
else
45+
{
46+
command = null;
47+
}
3848
}
3949

40-
public byte[] Serialize()
50+
public ByteBuffer Serialize(DescribedList cmd)
4151
{
4252
var buffer = new ByteBuffer(CmdBufferSize, false);
4353
buffer.Append(FixedWidth.UInt);
4454
AmqpBitConverter.WriteUByte(buffer, DOFF);
4555
AmqpBitConverter.WriteUByte(buffer, (byte)Type);
46-
switch(Type)
47-
{
48-
case FrameTypes.Amqp:
49-
AmqpBitConverter.WriteUShort(buffer, Channel);
50-
break;
51-
}
52-
53-
var accepted = new Accepted();
54-
accepted.Encode(buffer);
56+
AmqpBitConverter.WriteUShort(buffer, Channel);
57+
cmd.Encode(buffer);
5558
AmqpBitConverter.WriteInt(buffer.Buffer, buffer.Offset, buffer.Length);
56-
return buffer.Buffer;
59+
return buffer;
5760
}
5861
}
5962
}

src/EventMesh/FaasNet.EventMesh.Protocols.AMQP/Receiver.cs

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
using FaasNet.EventMesh.Protocols.AMQP.Framing;
1+
using Amqp;
2+
using Amqp.Sasl;
3+
using Amqp.Types;
4+
using FaasNet.EventMesh.Protocols.AMQP.Framing;
25
using System.Net;
36
using System.Net.Sockets;
47

@@ -29,13 +32,35 @@ private void Launch()
2932

3033
private void Listen()
3134
{
35+
var saslHeaderNegotiation = new ProtocolHeader { Id = 3, Major = 1, Minor = 0, Revision = 0 };
3236
var handler = _server.Accept();
33-
// Header.
34-
var header = ReadHeader(handler);
35-
handler.Send(header.Serialize());
36-
// Frames
37-
var frame = new Frame { Channel = 0, Type = FrameTypes.Amqp };
38-
handler.Send(frame.Serialize());
37+
// Check header & protocol.
38+
ReadHeader(handler);
39+
handler.Send(saslHeaderNegotiation.Serialize(), SocketFlags.None);
40+
// Return supported authentication mechanisms.
41+
var saslFrame = new Frame { Channel = 0, Type = FrameTypes.Sasl };
42+
var cmd = new SaslMechanisms
43+
{
44+
SaslServerMechanisms = new Amqp.Types.Symbol[]
45+
{
46+
new Amqp.Types.Symbol("PLAIN")
47+
}
48+
};
49+
Send(handler, saslFrame.Serialize(cmd));
50+
// Authenticate the user.
51+
ReadFrame(handler);
52+
var sCmd = new SaslOutcome { Code = SaslCode.Ok };
53+
Send(handler, saslFrame.Serialize(sCmd));
54+
// Open
55+
var saslSecuredConnection = new ProtocolHeader { Id = 0, Major = 1, Minor = 0, Revision = 0 };
56+
handler.Send(saslSecuredConnection.Serialize(), SocketFlags.None);
57+
var amqpFrame = new Frame { Channel = 0, Type = FrameTypes.Amqp };
58+
var tCmd = new Amqp.Framing.Open
59+
{
60+
HostName = "localhost"
61+
};
62+
Send(handler, amqpFrame.Serialize(tCmd));
63+
string s = "";
3964
}
4065

4166
private ProtocolHeader ReadHeader(Socket socket)
@@ -45,5 +70,30 @@ private ProtocolHeader ReadHeader(Socket socket)
4570
var receivedHeader = ProtocolHeader.Create(headerBuffer, 0);
4671
return receivedHeader;
4772
}
73+
74+
private Sasl.SaslPlainProfile ReadSaslPlainProfile()
75+
{
76+
return null;
77+
}
78+
79+
private static void Send(Socket socket, ByteBuffer buffer)
80+
{
81+
socket.Send(buffer.Buffer, buffer.Offset, buffer.Length, SocketFlags.None);
82+
}
83+
84+
private static DescribedList ReadFrame(Socket socket)
85+
{
86+
var sizeFrameBuffer = new byte[FixedWidth.UInt];
87+
socket.Receive(sizeFrameBuffer, 0, FixedWidth.UInt, SocketFlags.None);
88+
int size = AmqpBitConverter.ReadInt(sizeFrameBuffer, 0);
89+
var frameBuffer = new ByteBuffer(size, false);
90+
AmqpBitConverter.WriteInt(frameBuffer, size);
91+
socket.Receive(frameBuffer.Buffer, frameBuffer.Length, frameBuffer.Size, SocketFlags.None);
92+
frameBuffer.Append(frameBuffer.Size);
93+
ushort channel;
94+
DescribedList command;
95+
Frame.Decode(frameBuffer, out channel, out command);
96+
return command;
97+
}
4898
}
4999
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace FaasNet.EventMesh.Protocols.AMQP.Sasl
2+
{
3+
public class SaslPlainProfile
4+
{
5+
public string User { get; set; }
6+
public string Password { get; set; }
7+
8+
9+
}
10+
}

src/EventMesh/FaasNet.EventMesh.Runtime.Console/Program.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,7 @@
1010
using FaasNet.RaftConsensus.Core.Stores;
1111
using Microsoft.Extensions.DependencyInjection;
1212
using RabbitMQ.Client;
13-
14-
var frame = new Frame { Channel = 4, Type = 0 };
15-
var payload = frame.Serialize();
16-
var buffer = new ByteBuffer(payload, 0, payload.Count(), 0);
17-
Frame.Decode(buffer, out ushort channel);
13+
using ConnectionFactory = RabbitMQ.Client.ConnectionFactory;
1814

1915
var amqpServer = new AMQPServer();
2016
amqpServer.Start();

0 commit comments

Comments
 (0)