Skip to content

Commit f08d57a

Browse files
Ticket #111 : Add plugin mechanism
1 parent d56cd24 commit f08d57a

24 files changed

Lines changed: 250 additions & 49 deletions

default.ps1

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ task publishDocker {
6363
task publishEventMeshPlugins {
6464
exec { dotnet publish $source_dir\EventMesh\FaasNet.EventMesh.Protocols.AMQP\FaasNet.EventMesh.Protocols.AMQP.csproj -c $config -o $result_dir\protocolPlugins\FaasNet.EventMesh.Protocols.AMQP }
6565
exec { dotnet publish $source_dir\EventMesh\FaasNet.EventMesh.Protocols.WebSocket\FaasNet.EventMesh.Protocols.WebSocket.csproj -c $config -o $result_dir\protocolPlugins\FaasNet.EventMesh.Protocols.WebSocket }
66+
exec { dotnet publish $source_dir\EventMesh\FaasNet.EventMesh.Sink.AMQP\FaasNet.EventMesh.Sink.AMQP.csproj -c $config -o $result_dir\sinkPlugins\FaasNet.EventMesh.Sink.AMQP }
67+
exec { dotnet publish $source_dir\EventMesh\FaasNet.EventMesh.Sink.Kafka\FaasNet.EventMesh.Sink.Kafka.csproj -c $config -o $result_dir\sinkPlugins\FaasNet.EventMesh.Sink.Kafka }
68+
exec { dotnet publish $source_dir\EventMesh\FaasNet.EventMesh.Sink.VpnBridge\FaasNet.EventMesh.Sink.VpnBridge.csproj -c $config -o $result_dir\sinkPlugins\FaasNet.EventMesh.Sink.VpnBridgea }
6669
}
6770

6871
task publishEventMeshService {

src/EventMesh/FaasNet.EventMesh.Common/ConsoleHelper.cs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,10 @@ private static async Task StartAMQPSeed()
261261
seed.EventMeshUrl = "localhost";
262262
}, amqp =>
263263
{
264-
amqp.ConnectionFactory = (c) =>
265-
{
266-
c.UserName = "default_user_TBRVU8sYJmyVBXKeiTD";
267-
c.Password = "d67v-U305u8Sh0dOwn02pTIuo2jsnLwY";
268-
c.Port = 30007;
269-
};
270-
}).UseSeedRocksDB();
264+
amqp.AMQPUserName = "default_user_TBRVU8sYJmyVBXKeiTD";
265+
amqp.AMQPPassword = "d67v-U305u8Sh0dOwn02pTIuo2jsnLwY";
266+
amqp.AMQPPort = 30007;
267+
}).UseSinkRocksDB();
271268
var serviceProvider = serviceCollection.BuildServiceProvider();
272269
var seedJob = serviceProvider.GetRequiredService<ISinkJob>();
273270
await seedJob.Start(CancellationToken.None);
@@ -279,7 +276,7 @@ private static async Task StartVpnBridgeSeed()
279276
serviceCollection.AddVpnBridgeSeed(o =>
280277
{
281278
o.EventMeshPort = _seedPort;
282-
}).UseSeedRocksDB(); ;
279+
}).UseSinkRocksDB(); ;
283280
var serviceProvider = serviceCollection.BuildServiceProvider();
284281
var seedJob = serviceProvider.GetRequiredService<ISinkJob>();
285282
await seedJob.Start(CancellationToken.None);
@@ -297,7 +294,7 @@ private static async Task StartKafkaSeed()
297294
}, kafka =>
298295
{
299296
kafka.BootstrapServers = "localhost:29092";
300-
}).UseSeedRocksDB();
297+
}).UseSinkRocksDB();
301298
var serviceProvider = serviceCollection.BuildServiceProvider();
302299
var seedJob = serviceProvider.GetRequiredService<ISinkJob>();
303300
await seedJob.Start(CancellationToken.None);
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
{
22
"DllName": "FaasNet.EventMesh.Protocols.AMQP.dll",
33
"Configuration": {
4-
"Port": 5672
54
}
65
}

src/EventMesh/FaasNet.EventMesh.Protocols.WebSocket/FaasNet.EventMesh.Protocols.WebSocket.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<TargetFramework>net6.0</TargetFramework>
4+
<EnableDynamicLoading>true</EnableDynamicLoading>
45
</PropertyGroup>
56
<ItemGroup>
67
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
{
22
"DllName": "FaasNet.EventMesh.Protocols.WebSocket.dll",
33
"Configuration": {
4-
"Port": 2803
54
}
65
}
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
1-
namespace FaasNet.EventMesh.Service
1+
using FaasNet.EventMesh.Sink;
2+
3+
namespace FaasNet.EventMesh.Service
24
{
3-
public class EventMeshServerSinkWorker
5+
public class EventMeshServerSinkWorker : IHostedService
46
{
7+
private readonly IEnumerable<ISinkJob> _sinkJobs;
8+
9+
public EventMeshServerSinkWorker(IEnumerable<ISinkJob> sinkJobs)
10+
{
11+
_sinkJobs = sinkJobs;
12+
}
13+
14+
public async Task StartAsync(CancellationToken cancellationToken)
15+
{
16+
foreach (var job in _sinkJobs) await job.Start(cancellationToken);
17+
}
18+
19+
public async Task StopAsync(CancellationToken cancellationToken)
20+
{
21+
foreach (var job in _sinkJobs) await job.Stop();
22+
}
523
}
624
}

src/EventMesh/FaasNet.EventMesh.Service/EventMeshServerWorker.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
11
using FaasNet.RaftConsensus.Core;
2-
using Microsoft.Extensions.Hosting;
3-
using System.Threading;
4-
using System.Threading.Tasks;
52

63
namespace FaasNet.EventMesh.Service
74
{
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
<Project Sdk="Microsoft.NET.Sdk.Worker">
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
22
<PropertyGroup>
33
<OutputType>Exe</OutputType>
44
<TargetFramework>net6.0</TargetFramework>
55
<ImplicitUsings>true</ImplicitUsings>
6-
<PublishSingleFile>true</PublishSingleFile>
7-
<RuntimeIdentifier>win-x64</RuntimeIdentifier>
8-
<PlatformTarget>x64</PlatformTarget>
96
</PropertyGroup>
107
<ItemGroup>
118
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.0" />
@@ -15,5 +12,7 @@
1512
<ProjectReference Include="..\..\RaftConsensus\FaasNet.RaftConsensus.RocksDB\FaasNet.RaftConsensus.RocksDB.csproj" />
1613
<ProjectReference Include="..\FaasNet.EventMesh.Protocols\FaasNet.EventMesh.Protocols.csproj" />
1714
<ProjectReference Include="..\FaasNet.EventMesh.Runtime\FaasNet.EventMesh.Runtime.csproj" />
15+
<ProjectReference Include="..\FaasNet.EventMesh.Sink.RocksDB\FaasNet.EventMesh.Sink.RocksDB.csproj" />
16+
<ProjectReference Include="..\FaasNet.EventMesh.Sink\FaasNet.EventMesh.Sink.csproj" />
1817
</ItemGroup>
1918
</Project>
Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using FaasNet.Common;
22
using FaasNet.EventMesh.Protocols;
33
using FaasNet.EventMesh.Service;
4+
using FaasNet.EventMesh.Sink;
45

56
using IHost host = Host.CreateDefaultBuilder(args)
67
.UseWindowsService(o =>
@@ -13,21 +14,42 @@
1314
})
1415
.ConfigureServices((hostContext, services) =>
1516
{
16-
var pluginsDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "plugins");
17-
IEnumerable<string> pluginPaths = new string[0];
18-
if(Directory.Exists(pluginsDirectory)) pluginPaths = Directory.EnumerateDirectories(pluginsDirectory);
19-
var discoveredProtocolPlugins = pluginPaths.Select(p =>
20-
{
21-
if (ProtocolPluginEntryDiscovery.TryExtract(p, out IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
22-
return null;
23-
}).Where(p => p != null);
2417
var options = hostContext.Configuration.Get<EventMeshServerOptions>();
2518
var serverBuilder = services.AddEventMeshServer(consensusNodeCallback: o => o.Port = options.Port)
26-
.UseRocksDB(o =>
27-
{
28-
o.SubPath = $"node{options.Port}";
29-
});
30-
foreach (var discoveredPlugin in discoveredProtocolPlugins) discoveredPlugin.Load(serverBuilder.Services);
31-
services.AddHostedService<EventMeshServerWorker>();
19+
.UseRocksDB(o =>
20+
{
21+
o.SubPath = $"node{options.Port}";
22+
})
23+
.UseSinkRocksDB();
24+
LoadProtocolPlugins(serverBuilder.Services);
25+
LoadSinkPlugins(serverBuilder.Services);
3226
}).Build();
33-
await host.RunAsync();
27+
await host.RunAsync();
28+
29+
static void LoadProtocolPlugins(IServiceCollection services)
30+
{
31+
var pluginsDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "protocolPlugins");
32+
IEnumerable<string> pluginPaths = new string[0];
33+
if (Directory.Exists(pluginsDirectory)) pluginPaths = Directory.EnumerateDirectories(pluginsDirectory);
34+
var discoveredProtocolPlugins = pluginPaths.Select(p =>
35+
{
36+
if (ProtocolPluginEntryDiscovery.TryExtract(p, out FaasNet.EventMesh.Protocols.IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
37+
return null;
38+
}).Where(p => p != null);
39+
foreach (var discoveredPlugin in discoveredProtocolPlugins) discoveredPlugin.Load(services);
40+
services.AddHostedService<EventMeshServerWorker>();
41+
}
42+
43+
static void LoadSinkPlugins(IServiceCollection services)
44+
{
45+
var pluginsDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "sinkPlugins");
46+
IEnumerable<string> pluginPaths = new string[0];
47+
if (Directory.Exists(pluginsDirectory)) pluginPaths = Directory.EnumerateDirectories(pluginsDirectory);
48+
var discoveredSinkPlugins = pluginPaths.Select(p =>
49+
{
50+
if (SinkPluginEntryDiscovery.TryExtract(p, out FaasNet.EventMesh.Sink.IDiscoveredPlugin discoveredPlugin)) return discoveredPlugin;
51+
return null;
52+
}).Where(p => p != null);
53+
foreach (var discoveredPlugin in discoveredSinkPlugins) discoveredPlugin.Load(services);
54+
services.AddHostedService<EventMeshServerSinkWorker>();
55+
}

src/EventMesh/FaasNet.EventMesh.Sink.AMQP/AMQPSinkJob.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ private IConnection BuildConnection()
5454
{
5555
if (_connection != null) return _connection;
5656
var connectionFactory = new ConnectionFactory();
57-
_options.ConnectionFactory(connectionFactory);
57+
connectionFactory.HostName = _options.AMQPHostName;
58+
connectionFactory.Port = _options.AMQPPort;
59+
connectionFactory.UserName = _options.AMQPUserName;
60+
connectionFactory.Password = _options.AMQPPassword;
5861
_connection = connectionFactory.CreateConnection();
5962
return _connection;
6063
}

0 commit comments

Comments
 (0)