Skip to content

Commit f0db0a3

Browse files
Ticket #112 : Add command to enable/disable plugins
1 parent f08d57a commit f0db0a3

36 files changed

Lines changed: 620 additions & 24 deletions

FaasNet.EventMesh.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Sink.VpnB
6969
EndProject
7070
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Sink.RocksDB", "src\EventMesh\FaasNet.EventMesh.Sink.RocksDB\FaasNet.EventMesh.Sink.RocksDB.csproj", "{810659FD-5FAE-41B8-BC14-4499B6993CFB}"
7171
EndProject
72+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FaasNet.EventMesh.Plugin", "src\EventMesh\FaasNet.EventMesh.Plugin\FaasNet.EventMesh.Plugin.csproj", "{47847CD3-9CF4-4BF3-984C-94013E48D444}"
73+
EndProject
7274
Global
7375
GlobalSection(SolutionConfigurationPlatforms) = preSolution
7476
Debug|Any CPU = Debug|Any CPU
@@ -167,6 +169,10 @@ Global
167169
{810659FD-5FAE-41B8-BC14-4499B6993CFB}.Debug|Any CPU.Build.0 = Debug|Any CPU
168170
{810659FD-5FAE-41B8-BC14-4499B6993CFB}.Release|Any CPU.ActiveCfg = Release|Any CPU
169171
{810659FD-5FAE-41B8-BC14-4499B6993CFB}.Release|Any CPU.Build.0 = Release|Any CPU
172+
{47847CD3-9CF4-4BF3-984C-94013E48D444}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
173+
{47847CD3-9CF4-4BF3-984C-94013E48D444}.Debug|Any CPU.Build.0 = Debug|Any CPU
174+
{47847CD3-9CF4-4BF3-984C-94013E48D444}.Release|Any CPU.ActiveCfg = Release|Any CPU
175+
{47847CD3-9CF4-4BF3-984C-94013E48D444}.Release|Any CPU.Build.0 = Release|Any CPU
170176
EndGlobalSection
171177
GlobalSection(SolutionProperties) = preSolution
172178
HideSolutionNode = FALSE
@@ -198,6 +204,7 @@ Global
198204
{2DCA720C-ABB8-4917-B5FE-A2393E79A76F} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
199205
{01E4D8EF-A594-4497-B1CA-D17372656006} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
200206
{810659FD-5FAE-41B8-BC14-4499B6993CFB} = {B7CDEBEF-C508-4EFA-BEC9-9D9D6E5D80C5}
207+
{47847CD3-9CF4-4BF3-984C-94013E48D444} = {6E495E0A-0DC8-4E42-8C58-3C48506D3D24}
201208
EndGlobalSection
202209
GlobalSection(ExtensibilityGlobals) = postSolution
203210
SolutionGuid = {B9BD3B8C-B2C9-468F-BF54-66BFE9B565EC}

src/EventMesh/FaasNet.EventMesh.Client/Constants.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ public class Constants
2121
{ Commands.PUBLISH_MESSAGE_REQUEST, Commands.PUBLISH_MESSAGE_RESONSE },
2222
{ Commands.GET_ALL_VPNS_REQUEST, Commands.GET_ALL_VPNS_RESPONSE },
2323
{ Commands.ADD_VPN_REQUEST, Commands.ADD_VPN_RESPONSE },
24-
{ Commands.ADD_CLIENT_REQUEST, Commands.ADD_CLIENT_RESPONSE }
24+
{ Commands.ADD_CLIENT_REQUEST, Commands.ADD_CLIENT_RESPONSE },
25+
{ Commands.GET_ALL_PLUGINS_REQUEST, Commands.GET_ALL_PLUGINS_RESPONSE },
26+
{ Commands.ENABLE_PLUGIN_REQUEST, Commands.ENABLE_PLUGIN_RESPONSE },
27+
{ Commands.DISABLE_PLUGIN_REQUEST, Commands.DISABLE_PLUGIN_RESPONSE }
2528
};
2629
}
2730
}

src/EventMesh/FaasNet.EventMesh.Client/EventMeshClient.cs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,19 +159,61 @@ public EventMeshClient(string url = Constants.DefaultUrl, int port = Constants.D
159159
}
160160
}
161161

162-
internal static void EnsureSuccessStatus(Package packageRequest, Package packageResponse)
162+
public async Task<ICollection<PluginResponse>> GetAllPlugins(CancellationToken cancellationToken = default(CancellationToken))
163163
{
164-
if (packageResponse.Header.Status != HeaderStatus.SUCCESS)
164+
using (var udpClient = new UdpClient())
165165
{
166-
throw new RuntimeClientResponseException(packageResponse.Header.Status, packageResponse.Header.Error);
166+
var writeCtx = new WriteBufferContext();
167+
var package = PackageRequestBuilder.GetAllPlugins();
168+
package.Serialize(writeCtx);
169+
var payload = writeCtx.Buffer.ToArray();
170+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
171+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
172+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
173+
var packageResult = Package.Deserialize(readCtx);
174+
EnsureSuccessStatus(package, packageResult);
175+
return (packageResult as GetAllPluginsResponse).Plugins;
167176
}
177+
}
168178

169-
if (packageRequest.Header.Seq != packageRequest.Header.Seq)
179+
public async Task EnablePlugin(string pluginName, CancellationToken cancellationToken = default(CancellationToken))
180+
{
181+
using (var udpClient = new UdpClient())
170182
{
171-
throw new RuntimeClientResponseException(HeaderStatus.FAIL, Errors.INVALID_SEQ, "the seq in the request doesn't match the seq in the response");
183+
var writeCtx = new WriteBufferContext();
184+
var package = PackageRequestBuilder.EnablePlugin(pluginName);
185+
package.Serialize(writeCtx);
186+
var payload = writeCtx.Buffer.ToArray();
187+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
188+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
189+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
190+
var packageResult = Package.Deserialize(readCtx);
191+
EnsureSuccessStatus(package, packageResult);
172192
}
173193
}
174194

195+
public async Task DisablePlugin(string pluginName, CancellationToken cancellationToken = default(CancellationToken))
196+
{
197+
using (var udpClient = new UdpClient())
198+
{
199+
var writeCtx = new WriteBufferContext();
200+
var package = PackageRequestBuilder.DisablePlugin(pluginName);
201+
package.Serialize(writeCtx);
202+
var payload = writeCtx.Buffer.ToArray();
203+
await udpClient.SendAsync(payload, payload.Count(), new IPEndPoint(_ipAddr, _port)).WithCancellation(cancellationToken);
204+
var resultPayload = await udpClient.ReceiveAsync().WithCancellation(cancellationToken);
205+
var readCtx = new ReadBufferContext(resultPayload.Buffer);
206+
var packageResult = Package.Deserialize(readCtx);
207+
EnsureSuccessStatus(package, packageResult);
208+
}
209+
}
210+
211+
internal static void EnsureSuccessStatus(Package packageRequest, Package packageResponse)
212+
{
213+
if (packageResponse.Header.Status != HeaderStatus.SUCCESS) throw new RuntimeClientResponseException(packageResponse.Header.Status, packageResponse.Header.Error);
214+
if (packageRequest.Header.Seq != packageRequest.Header.Seq) throw new RuntimeClientResponseException(HeaderStatus.FAIL, Errors.INVALID_SEQ, "the seq in the request doesn't match the seq in the response");
215+
}
216+
175217
private async Task<HelloResponse> CreateSession(string vpn, string clientId, UserAgentPurpose purpose, TimeSpan? expirationTime = null, bool isInfinite = false, CancellationToken cancellationToken = default(CancellationToken))
176218
{
177219
var userAgent = new UserAgent { ClientId = clientId, Vpn = vpn, Purpose = purpose, Expiration = expirationTime, IsSessionInfinite = isInfinite };

src/EventMesh/FaasNet.EventMesh.Client/Messages/Commands.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,31 @@ public class Commands : IEquatable<Commands>
102102
/// Result returned when VPN are returned.
103103
/// </summary>
104104
public static Commands GET_ALL_BRIDGE_VPN_RESPONSE = new Commands(23, "GET_ALL_BRIDGE_VPN_RESPONSE");
105+
/// <summary>
106+
/// Request sent to get all the plugins.
107+
/// </summary>
108+
public static Commands GET_ALL_PLUGINS_REQUEST = new Commands(24, "GET_ALL_PLUGINS_REQUEST");
109+
/// <summary>
110+
/// Results returned when plugins are returned.
111+
/// </summary>
112+
public static Commands GET_ALL_PLUGINS_RESPONSE = new Commands(25, "GET_ALL_PLUGINS_RESPONSE");
113+
/// <summary>
114+
/// Request sent to enable a plugin.
115+
/// </summary>
116+
public static Commands ENABLE_PLUGIN_REQUEST = new Commands(26, "ENABLE_PLUGIN_REQUEST");
117+
/// <summary>
118+
/// Result returned when plugin is enabled.
119+
/// </summary>
120+
public static Commands ENABLE_PLUGIN_RESPONSE = new Commands(27, "ENABLE_PLUGIN_RESPONSE");
121+
/// <summary>
122+
/// Request sent to disable a plugin.
123+
/// </summary>
124+
public static Commands DISABLE_PLUGIN_REQUEST = new Commands(28, "DISABLE_PLUGIN_REQUEST");
125+
/// <summary>
126+
/// Result returned when a plugin is disabled.
127+
/// </summary>
128+
public static Commands DISABLE_PLUGIN_RESPONSE = new Commands(29, "DISABLE_PLUGIN_RESPONSE");
129+
105130

106131
private Commands(int code)
107132
{
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
3+
namespace FaasNet.EventMesh.Client.Messages
4+
{
5+
internal class DisablePluginRequest : Package
6+
{
7+
public string PluginName { get; set; }
8+
9+
public override void Serialize(WriteBufferContext context)
10+
{
11+
base.Serialize(context);
12+
context.WriteString(PluginName);
13+
}
14+
15+
public void Extract(ReadBufferContext context)
16+
{
17+
PluginName = context.NextString();
18+
}
19+
}
20+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
3+
namespace FaasNet.EventMesh.Client.Messages
4+
{
5+
public class EnablePluginRequest: Package
6+
{
7+
public string PluginName { get; set; }
8+
9+
public override void Serialize(WriteBufferContext context)
10+
{
11+
base.Serialize(context);
12+
context.WriteString(PluginName);
13+
}
14+
15+
public void Extract(ReadBufferContext context)
16+
{
17+
PluginName = context.NextString();
18+
}
19+
}
20+
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/Errors.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class Errors : IEquatable<Errors>
2020
public static Errors UNKNOWN_VPN = new Errors("UNKNOWN_VPN");
2121
public static Errors UNKNOWN_SOURCE_VPN = new Errors("UNKNOWN_SOURCE_VPN");
2222
public static Errors UNKNOWN_TARGET_VPN = new Errors("UNKNOWN_TARGET_VPN");
23+
public static Errors UNKNOWN_PLUGIN = new Errors("UNKNOWN_PLUGIN");
2324
public static Errors UNAUTHORIZED_PUBLISH = new Errors("UNAUTHORIZED_PUBLISH");
2425
public static Errors UNAUTHORIZED_SUBSCRIBE = new Errors("UNAUTHORIZED_SUBSCRIBE");
2526
public static Errors SESSION_LIFETIME_TOOLONG = new Errors("SESSION_LIFETIME_TOOLONG");
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace FaasNet.EventMesh.Client.Messages
2+
{
3+
public class GetAllPluginsRequest : Package
4+
{
5+
}
6+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using FaasNet.RaftConsensus.Client.Messages;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
5+
namespace FaasNet.EventMesh.Client.Messages
6+
{
7+
public class GetAllPluginsResponse : Package
8+
{
9+
public GetAllPluginsResponse()
10+
{
11+
Plugins = new List<PluginResponse>();
12+
}
13+
14+
public ICollection<PluginResponse> Plugins { get; set; }
15+
16+
public override void Serialize(WriteBufferContext context)
17+
{
18+
base.Serialize(context);
19+
context.WriteInteger(Plugins.Count());
20+
foreach(var plugin in Plugins) plugin.Serialize(context);
21+
}
22+
23+
public void Extract(ReadBufferContext context)
24+
{
25+
var plugins = new List<PluginResponse>();
26+
var nb = context.NextInt();
27+
for (int i = 0; i < nb; i++) PluginResponse.Extract(context);
28+
Plugins = plugins;
29+
}
30+
}
31+
32+
public class PluginResponse
33+
{
34+
public string Name { get; set; }
35+
public string Description { get; set; }
36+
public bool IsActive { get; set; }
37+
38+
public void Serialize(WriteBufferContext context)
39+
{
40+
context.WriteString(Name);
41+
context.WriteString(Description);
42+
context.WriteBoolean(IsActive);
43+
}
44+
45+
public static PluginResponse Extract(ReadBufferContext context)
46+
{
47+
return new PluginResponse
48+
{
49+
Name = context.NextString(),
50+
Description = context.NextString(),
51+
IsActive = context.NextBoolean()
52+
};
53+
}
54+
}
55+
}

src/EventMesh/FaasNet.EventMesh.Client/Messages/Package.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,27 @@ public static Package Deserialize(ReadBufferContext context)
155155
return result;
156156
}
157157

158+
if(Commands.GET_ALL_PLUGINS_RESPONSE == header.Command)
159+
{
160+
var result = new GetAllPluginsResponse { Header = header };
161+
result.Extract(context);
162+
return result;
163+
}
164+
165+
if(Commands.ENABLE_PLUGIN_REQUEST == header.Command)
166+
{
167+
var result = new EnablePluginRequest { Header = header };
168+
result.Extract(context);
169+
return result;
170+
}
171+
172+
if (Commands.DISABLE_PLUGIN_REQUEST == header.Command)
173+
{
174+
var result = new DisablePluginRequest { Header = header };
175+
result.Extract(context);
176+
return result;
177+
}
178+
158179
return new Package
159180
{
160181
Header = header

0 commit comments

Comments
 (0)