Skip to content

Commit 6fa04d2

Browse files
Ticket #105 : Add seed AMQP
1 parent 828c8a9 commit 6fa04d2

10 files changed

Lines changed: 110 additions & 5 deletions

File tree

FaasNet.EventMesh.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Seed.AMQP
4343
EndProject
4444
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "01. Seeds", "01. Seeds", "{B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}"
4545
EndProject
46+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.Common", "src\Common\FaasNet.Common\FaasNet.Common.csproj", "{2D703121-6391-42AB-8D68-8AD85EB5BB4B}"
47+
EndProject
4648
Global
4749
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4850
Debug|Any CPU = Debug|Any CPU
@@ -105,6 +107,10 @@ Global
105107
{2DDF1EB1-B20F-419F-9A09-34A5550D4467}.Debug|Any CPU.Build.0 = Debug|Any CPU
106108
{2DDF1EB1-B20F-419F-9A09-34A5550D4467}.Release|Any CPU.ActiveCfg = Release|Any CPU
107109
{2DDF1EB1-B20F-419F-9A09-34A5550D4467}.Release|Any CPU.Build.0 = Release|Any CPU
110+
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
111+
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Debug|Any CPU.Build.0 = Debug|Any CPU
112+
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Release|Any CPU.ActiveCfg = Release|Any CPU
113+
{2D703121-6391-42AB-8D68-8AD85EB5BB4B}.Release|Any CPU.Build.0 = Release|Any CPU
108114
EndGlobalSection
109115
GlobalSection(SolutionProperties) = preSolution
110116
HideSolutionNode = FALSE
@@ -125,6 +131,7 @@ Global
125131
{19A0FABD-C40F-4E83-ADB6-DF285BAE4C99} = {20A0BD99-A4F3-4FD5-A6FA-1935D7464DB8}
126132
{2DDF1EB1-B20F-419F-9A09-34A5550D4467} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
127133
{B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
134+
{2D703121-6391-42AB-8D68-8AD85EB5BB4B} = {7A13262A-D1E6-4210-BA95-03A0741CBFA4}
128135
EndGlobalSection
129136
GlobalSection(ExtensibilityGlobals) = postSolution
130137
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}

src/EventMesh/FaasNet.EventMesh.Seed.AMQP/BasicDeliverEventArgsExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public static CloudEvent ToCloudEvent(this BasicDeliverEventArgs message, CloudE
2222
if (HasCloudEventsContentType(message, out var contentType)) return cloudEventFormatter.DecodeStructuredModeMessage(new MemoryStream(message.Body.ToArray()), new ContentType(contentType), null);
2323

2424
var cloudEvent = new CloudEvent();
25-
if (properties.Headers.ContainsKey(SpecVersionAmqpHeader))
25+
if (properties.Headers != null && properties.Headers.ContainsKey(SpecVersionAmqpHeader))
2626
{
2727
var version = Encoding.ASCII.GetString(properties.Headers[SpecVersionAmqpHeader] as byte[]);
2828
var specVersion = CloudEventsSpecVersion.FromVersionId(version);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using FaasNet.EventMesh.Seed;
2+
using FaasNet.EventMesh.Seed.AMQP;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using System;
5+
6+
namespace FaasNet.Common
7+
{
8+
public static class ServerBuilderExtensions
9+
{
10+
public static ServerBuilder AddAMQPSeed(this ServerBuilder serverBuilder, Action<SeedOptions> seedOptionsCallback = null, Action<EventMeshSeedAMQPOptions> amqpOptionsCallback = null)
11+
{
12+
serverBuilder.AddAMQPSeed(seedOptionsCallback, amqpOptionsCallback);
13+
return serverBuilder;
14+
}
15+
}
16+
}
Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
1-
namespace FaasNet.EventMesh.Seed.AMQP
1+
using FaasNet.EventMesh.Seed;
2+
using FaasNet.EventMesh.Seed.AMQP;
3+
using System;
4+
5+
namespace Microsoft.Extensions.DependencyInjection
26
{
37
public static class ServiceCollectionExtensions
48
{
9+
public static IServiceCollection AddAMQPSeed(this IServiceCollection services, Action<SeedOptions> seedOptionsCallback = null, Action<EventMeshSeedAMQPOptions> amqpOptionsCallback = null)
10+
{
11+
services.AddSeed(seedOptionsCallback);
12+
if (amqpOptionsCallback == null) services.Configure<EventMeshSeedAMQPOptions>((o) => { });
13+
else services.Configure(amqpOptionsCallback);
14+
services.AddTransient<ISeedJob, AMQPSeedJob>();
15+
return services;
16+
}
517
}
618
}
Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,34 @@
1-

1+

2+
using FaasNet.EventMesh.Seed;
3+
using Microsoft.Extensions.DependencyInjection;
4+
5+
var seedJobs = BuildJobs();
6+
Console.WriteLine("Press Enter to launch the jobs");
7+
Console.ReadLine();
8+
foreach (var seedJob in seedJobs) await seedJob.Start(CancellationToken.None);
9+
10+
Console.WriteLine("Press Enter to stop the jobs");
11+
Console.ReadLine();
12+
foreach (var seedJob in seedJobs) await seedJob.Stop();
13+
14+
15+
static IEnumerable<ISeedJob> BuildJobs()
16+
{
17+
var serviceProvider = new ServiceCollection()
18+
.AddAMQPSeed(o =>
19+
{
20+
o.EventMeshPort = 4000;
21+
o.EventMeshUrl = "localhost";
22+
o.Vpn = "default";
23+
o.ClientId = "publishClientId";
24+
}, o =>
25+
{
26+
o.JobId = "RabbitMQ";
27+
o.ConnectionFactory = (o) =>
28+
{
29+
o.Port = 30007;
30+
};
31+
})
32+
.BuildServiceProvider();
33+
return serviceProvider.GetRequiredService<IEnumerable<ISeedJob>>();
34+
}

src/EventMesh/FaasNet.EventMesh.Seed/BaseSeedJob.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace FaasNet.EventMesh.Seed
1010
{
11-
public abstract class BaseSeedJob
11+
public abstract class BaseSeedJob : ISeedJob
1212
{
1313
private CancellationTokenSource _tokenSource;
1414
private SeedOptions _seedOptions;
@@ -31,7 +31,7 @@ public async Task Start(CancellationToken cancellationToken)
3131
IsRunning = true;
3232
}
3333

34-
public async void Stop()
34+
public async Task Stop()
3535
{
3636
if (!IsRunning) throw new InvalidOperationException("Seed is not running");
3737
_tokenSource.Cancel();

src/EventMesh/FaasNet.EventMesh.Seed/FaasNet.EventMesh.Seed.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
77
</ItemGroup>
88
<ItemGroup>
9+
<ProjectReference Include="..\..\Common\FaasNet.Common\FaasNet.Common.csproj" />
910
<ProjectReference Include="..\FaasNet.EventMesh.Client\FaasNet.EventMesh.Client.csproj" />
1011
</ItemGroup>
1112
</Project>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace FaasNet.EventMesh.Seed
5+
{
6+
public interface ISeedJob
7+
{
8+
Task Start(CancellationToken cancellationToken);
9+
Task Stop();
10+
}
11+
}

src/EventMesh/FaasNet.EventMesh.Seed/SeedOptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22
{
33
public class SeedOptions
44
{
5+
public SeedOptions()
6+
{
7+
EventMeshUrl = "localhost";
8+
EventMeshPort = 4000;
9+
Vpn = "default";
10+
ClientId = "publishClientId";
11+
}
12+
513
public string EventMeshUrl { get; set; }
614
public int EventMeshPort { get; set; }
715
public string Vpn { get; set; }
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using FaasNet.EventMesh.Seed;
2+
using FaasNet.EventMesh.Seed.Stores;
3+
using System;
4+
5+
namespace Microsoft.Extensions.DependencyInjection
6+
{
7+
public static class ServiceCollectionExtensions
8+
{
9+
public static IServiceCollection AddSeed(this IServiceCollection services, Action<SeedOptions> seedOptionsCallback)
10+
{
11+
if (seedOptionsCallback == null) services.Configure<SeedOptions>(o => { });
12+
else services.Configure(seedOptionsCallback);
13+
services.AddSingleton<ISubscriptionStore, InMemorySubscriptionStore>();
14+
return services;
15+
}
16+
}
17+
}

0 commit comments

Comments
 (0)