Skip to content

Commit 97b5dd5

Browse files
Ticket #113 : Add documentation on AMQP1.0 plugin
1 parent a585842 commit 97b5dd5

11 files changed

Lines changed: 233 additions & 11 deletions

File tree

FaasNet.Samples.sln

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.1.32328.378
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "01. EventMesh", "01. EventMesh", "{7A371086-78D9-4632-96F4-DCF31581F3AE}"
7+
EndProject
8+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FaasNet.EventMesh.AMQPClient", "src\Samples\FaasNet.EventMesh.AMQPClient\FaasNet.EventMesh.AMQPClient.csproj", "{6AE025D4-3488-4809-A370-5682A30E820D}"
9+
EndProject
10+
Global
11+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
12+
Debug|Any CPU = Debug|Any CPU
13+
Release|Any CPU = Release|Any CPU
14+
EndGlobalSection
15+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
16+
{6AE025D4-3488-4809-A370-5682A30E820D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
17+
{6AE025D4-3488-4809-A370-5682A30E820D}.Debug|Any CPU.Build.0 = Debug|Any CPU
18+
{6AE025D4-3488-4809-A370-5682A30E820D}.Release|Any CPU.ActiveCfg = Release|Any CPU
19+
{6AE025D4-3488-4809-A370-5682A30E820D}.Release|Any CPU.Build.0 = Release|Any CPU
20+
EndGlobalSection
21+
GlobalSection(SolutionProperties) = preSolution
22+
HideSolutionNode = FALSE
23+
EndGlobalSection
24+
GlobalSection(NestedProjects) = preSolution
25+
{6AE025D4-3488-4809-A370-5682A30E820D} = {7A371086-78D9-4632-96F4-DCF31581F3AE}
26+
EndGlobalSection
27+
GlobalSection(ExtensibilityGlobals) = postSolution
28+
SolutionGuid = {007D7A77-3286-48F7-974F-20075BCB6160}
29+
EndGlobalSection
30+
EndGlobal

docs/documentation/eventmesh/pluginamqp.md

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,110 @@ The ZIP file can be downloaded [here]().
2626

2727
## Quick start
2828

29-
TODO
29+
Once you have an up and running EventMesh server with `ProtocolAmqp` plugin enabled, you can start using any client compliant with the AMQP 1.0 protocol.
30+
31+
### Configure client and VPN
32+
33+
Before going further, a Virtual Private Network (VPN) and two clients must be configured.
34+
Those information will be used to publish message and subscribe to one or more topic.
35+
36+
Open a command prompt and create a topic named `default` :
37+
38+
```
39+
FaasNet.EventMeshCTL.CLI.exe add_vpn --name=default
40+
```
41+
42+
Add a client `publishClient`, as the name suggests, it will be used to publish message.
43+
44+
```
45+
FaasNet.EventMeshCTL.CLI.exe add_client --vpn=default --identifier=publishClient --publish_enabled=true --subscription_enabled=false
46+
```
47+
48+
Add a client `subscribeClient`, this client will be used for subscription.
49+
50+
```
51+
FaasNet.EventMeshCTL.CLI.exe add_client --vpn=default --identifier=subscribeClient --publish_enabled=false --subscription_enabled=true
52+
```
53+
54+
### Configure the plugin
55+
56+
If the plugin is not yet configured, it can be enabled like this
57+
58+
```
59+
FaasNet.EventMeshCTL.CLI.exe get_plugin_configuration --name=ProtocolAmqp
60+
```
61+
62+
Its configuration can be updated either by [using CLI](cli.md) or by updating the configuration file `appsettings.json`.
63+
64+
Don't forget that the EventMesh server must be restarted, otherwise the changes are not taken into account.
65+
66+
When the configuration is finished, a client can be created and can start publishing message.
67+
68+
## Source Code
69+
70+
The source code of this project can be found [here]().
71+
72+
### Create a client
73+
74+
In this tutorial, we will explain how to create a C# client to publish message and subscribe to one topic `q1`.
75+
76+
Create a new console application and install the Nuget package `AmqpNetLite`
77+
78+
```
79+
dotnet add package AmqpNetLite
80+
```
81+
82+
In the `Program.cs` file add a static method used to send message.
83+
84+
```
85+
static void SendMessage()
86+
{
87+
var address = new Address($"amqp://publishClient:publishClient@localhost:{port}");
88+
var connection = new Connection(address);
89+
var session = new Session(connection);
90+
var message = new Message("Hello AMQP!");
91+
var sender = new SenderLink(session, "sender-link", "q1");
92+
Console.WriteLine("Sent Hello AMQP!");
93+
sender.Send(message);
94+
sender.Close();
95+
session.Close();
96+
connection.Close();
97+
}
98+
```
99+
100+
Add a second static method used to subscribe to a topic :
101+
102+
```
103+
static void ReceiveMessage()
104+
{
105+
Task.Run(() =>
106+
{
107+
var address = new Address($"amqp://subscribeClient:subscribeClient@localhost:{port}");
108+
var connection = new Connection(address);
109+
var session = new Session(connection);
110+
var receiver = new ReceiverLink(session, "receiver-link", "q1");
111+
var message = receiver.Receive();
112+
Console.WriteLine("Received " + message.Body.ToString());
113+
});
114+
}
115+
```
116+
117+
Call both methods like this :
118+
119+
```
120+
const int port = 5672;
121+
Console.WriteLine("Hello, World!");
122+
ReceiveMessage();
123+
SendMessage();
124+
Console.WriteLine("Press enter to quit the application");
125+
Console.ReadLine();
126+
```
127+
128+
Build the project and run.
129+
130+
The console application should displayed two messages :
131+
132+
```
133+
Send Hello AMQP!
134+
Received Hello AMQP!
135+
```

docs/documentation/eventmesh/plugins.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ FaasNet.EventMeshCTL.CLI.exe update_plugin_configuration --name=<PLUGIN_NAME> --
4141
```
4242

4343
> [!IMPORTANT]
44-
> Once the plugin is properly configured. The EventMesh server must be restarted in order to take into consideration your modifications.
44+
> Once the plugin is properly configured. The EventMesh server must be restarted in order to take into account your modifications.
4545
4646
## Supported plugins
4747

src/EventMesh/FaasNet.EventMesh.Plugin/PluginEntryDiscovery.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,23 @@ namespace FaasNet.EventMesh.Plugin
1111
{
1212
public class PluginEntryDiscovery
1313
{
14-
public static bool TryExtract(string pluginDirectoryPath, IEnumerable<string> activePlugins, out IDiscoveredPlugin discoveryPlugin)
14+
public static bool TryExtract(string pluginDirectoryPath, IEnumerable<string> activePlugins, IEnumerable<Type> additionalSharedTypes, out IDiscoveredPlugin discoveryPlugin)
1515
{
1616
discoveryPlugin = null;
17+
var sharedTypes = new List<Type> { typeof(IPlugin<>), typeof(IServiceCollection) };
18+
if(additionalSharedTypes != null)
19+
{
20+
foreach(var additionalSharedType in additionalSharedTypes) sharedTypes.Add(additionalSharedType);
21+
}
22+
1723
var pluginEntry = PluginConfigurationFile.Read(pluginDirectoryPath);
1824
if(pluginEntry == null) return false;
1925
if (!activePlugins.Contains(pluginEntry.Name)) return false;
2026
var dllPath = Path.Combine(pluginDirectoryPath, pluginEntry.DllName);
2127
if (!File.Exists(dllPath)) return false;
2228
var loader = PluginLoader.CreateFromAssemblyFile(
2329
dllPath,
24-
sharedTypes: new[] { typeof(IPlugin<>), typeof(IServiceCollection) });
30+
sharedTypes: sharedTypes.ToArray());
2531
var assembly = loader.LoadDefaultAssembly();
2632
var types = assembly.GetTypes();
2733
var pluginType = types.FirstOrDefault(t => t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IPlugin<>)));

src/EventMesh/FaasNet.EventMesh.Runtime/Handlers/PublishMessageRequestHandler.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ private async Task SendMessage(PublishMessageRequest message, CancellationToken
8686
private async Task<ClusterNode> GetRandomClusterNode(CancellationToken cancellationToken)
8787
{
8888
var nodes = await _clusterStore.GetAllNodes(cancellationToken);
89+
if (nodes.Count() == 1) return nodes.First();
8990
nodes = nodes.Where(n => n.Port != _nodeOptions.Port || n.Url != _nodeOptions.Url);
9091
var rnd = new Random();
9192
var rndIndex = rnd.Next(0, nodes.Count() - 1);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using FaasNet.EventMesh.Protocols;
2+
3+
namespace FaasNet.EventMesh.Service
4+
{
5+
public class EventMeshProxyWorker : IHostedService
6+
{
7+
private readonly IEnumerable<IProxy> _proxyLst;
8+
9+
public EventMeshProxyWorker(IEnumerable<IProxy> proxyLst)
10+
{
11+
_proxyLst = proxyLst;
12+
}
13+
14+
public async Task StartAsync(CancellationToken cancellationToken)
15+
{
16+
foreach (var proxy in _proxyLst) await proxy.Start();
17+
}
18+
19+
public Task StopAsync(CancellationToken cancellationToken)
20+
{
21+
foreach (var proxy in _proxyLst) proxy.Stop();
22+
return Task.CompletedTask;
23+
}
24+
}
25+
}

src/EventMesh/FaasNet.EventMesh.Service/Program.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
using FaasNet.Common;
22
using FaasNet.EventMesh.Plugin;
3+
using FaasNet.EventMesh.Protocols;
34
using FaasNet.EventMesh.Runtime;
45
using FaasNet.EventMesh.Runtime.Stores;
56
using FaasNet.EventMesh.Service;
7+
using FaasNet.EventMesh.Sink;
68

79
using IHost host = Host.CreateDefaultBuilder(args)
810
.UseWindowsService(o =>
@@ -26,6 +28,7 @@
2628
var activePlugins = pluginStore.GetActivePlugins();
2729
LoadProtocolPlugins(serverBuilder.Services, activePlugins);
2830
LoadSinkPlugins(serverBuilder.Services, activePlugins);
31+
services.AddHostedService<EventMeshServerWorker>();
2932
}).Build();
3033
await host.RunAsync();
3134

@@ -36,11 +39,11 @@ static void LoadProtocolPlugins(IServiceCollection services, IEnumerable<string>
3639
if (Directory.Exists(pluginsDirectory)) pluginPaths = Directory.EnumerateDirectories(pluginsDirectory);
3740
var discoveredProtocolPlugins = pluginPaths.Select(p =>
3841
{
39-
if (PluginEntryDiscovery.TryExtract(p, activePlugins, out IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
42+
if (PluginEntryDiscovery.TryExtract(p, activePlugins, new[] { typeof(IProxy) }, out IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
4043
return null;
4144
}).Where(p => p != null);
4245
foreach (var discoveredPlugin in discoveredProtocolPlugins) discoveredPlugin.Load(services);
43-
services.AddHostedService<EventMeshServerWorker>();
46+
services.AddHostedService<EventMeshProxyWorker>();
4447
}
4548

4649
static void LoadSinkPlugins(IServiceCollection services, IEnumerable<string> activePlugins)
@@ -50,7 +53,7 @@ static void LoadSinkPlugins(IServiceCollection services, IEnumerable<string> act
5053
if (Directory.Exists(pluginsDirectory)) pluginPaths = Directory.EnumerateDirectories(pluginsDirectory);
5154
var discoveredSinkPlugins = pluginPaths.Select(p =>
5255
{
53-
if (PluginEntryDiscovery.TryExtract(p, activePlugins, out IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
56+
if (PluginEntryDiscovery.TryExtract(p, activePlugins, new[] { typeof(ISinkJob) }, out IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
5457
return null;
5558
}).Where(p => p != null);
5659
foreach (var discoveredPlugin in discoveredSinkPlugins) discoveredPlugin.Load(services);

src/EventMesh/FaasNet.EventMeshCTL.CLI/Program.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private static void AddVpnCommand(CommandLineApplication app)
8484
return;
8585
}
8686

87-
Console.WriteLine($"VPN {vpnName} has been added");
87+
DisplaySuccess($"VPN {vpnName} has been added");
8888
});
8989
});
9090
}
@@ -115,7 +115,7 @@ private static void AddClientCommand(CommandLineApplication app)
115115
idOption.DefaultValue = "clientId";
116116
var pubEnabled = addClientCmd.Option<bool>("-p|--publish_enabled <PUBLISH>", "Enable publish", CommandOptionType.SingleValue);
117117
pubEnabled.DefaultValue = true;
118-
var subEnabled = addClientCmd.Option<bool>("-s|--subription_enabled <SUBSCRIPTION>", "Enable subscription", CommandOptionType.SingleValue);
118+
var subEnabled = addClientCmd.Option<bool>("-s|--subscription_enabled <SUBSCRIPTION>", "Enable subscription", CommandOptionType.SingleValue);
119119
subEnabled.DefaultValue = true;
120120
addClientCmd.OnExecuteAsync(async (token) =>
121121
{
@@ -134,7 +134,7 @@ private static void AddClientCommand(CommandLineApplication app)
134134
return;
135135
}
136136

137-
Console.WriteLine($"Client {idOption.ParsedValue} has been added");
137+
DisplaySuccess($"Client {idOption.ParsedValue} has been added");
138138
});
139139
});
140140
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BasePeerHost.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,14 @@ private async Task StartElection()
183183
_logger.LogInformation("{Node}:{PeerId}:{TermId}, Start to vote", _nodeId, _peerId, Info.TermId);
184184
var nodes = await _clusterStore.GetAllNodes(TokenSource.Token);
185185
nodes = nodes.Where(n => n.Port != _nodeOptions.Port || n.Url != _nodeOptions.Url);
186-
_quorum = (nodes.Count() / 2) + 1;
186+
_quorum = !nodes.Any() ? 0 : (nodes.Count() / 2) + 1;
187+
/*
187188
if (_quorum == 0)
188189
{
189190
_logger.LogError("{Node}:{PeerId}:{TermId}, There is no enough nodes", _nodeId, _peerId, Info.TermId);
190191
return;
191192
}
193+
*/
192194

193195
_nbPositiveVote = 0;
194196
_expirationCheckElectionDateTime = DateTime.UtcNow.AddMilliseconds(_options.ElectionCheckDurationMS);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="AmqpNetLite" Version="2.4.4" />
12+
</ItemGroup>
13+
14+
</Project>

0 commit comments

Comments
 (0)