Skip to content

Commit 081eeb8

Browse files
Ticket #104 : support rocksdb
1 parent bf755bf commit 081eeb8

4 files changed

Lines changed: 67 additions & 63 deletions

File tree

src/EventMesh/FaasNet.EventMesh.Runtime.Console/Program.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
using FaasNet.RaftConsensus.Client;
55
using FaasNet.RaftConsensus.Core;
66
using FaasNet.RaftConsensus.Core.Models;
7+
using FaasNet.RaftConsensus.Core.Stores;
78
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Logging;
810
using System.Collections.Concurrent;
911

1012
int seedPort = 4000;
@@ -172,19 +174,17 @@ await session.PersistedSubscribe("person.created", groupId, (ce) =>
172174
INodeHost BuildNodeHost(int port, bool isSeed = false)
173175
{
174176
var serverBuilder = new ServiceCollection()
175-
.AddEventMeshServer(o => o.Port = port);
176-
// .UseRocksDB(o => { o.SubPath = $"node{port}"; });
177-
if (isSeed) serverBuilder.SetNodeStates(new ConcurrentBag<NodeState> { new ClusterNode { Port = 4000, Url = "localhost" }.ToNodeState() });
177+
.AddEventMeshServer(o => o.Port = port)
178+
.UseRocksDB(o => { o.SubPath = $"node{port}"; });
179+
// if (isSeed) serverBuilder.SetNodeStates(new ConcurrentBag<NodeState> { new ClusterNode { Port = 4000, Url = "localhost" }.ToNodeState() });
178180
var serviceProvider = serverBuilder.Services
179181
// .AddLogging(l => l.AddConsole())
180182
.BuildServiceProvider();
181-
/*
182183
if(isSeed)
183184
{
184185
var nodeStateStore = serviceProvider.GetRequiredService<INodeStateStore>();
185186
nodeStateStore.Add(new ClusterNode { Port = 4000, Url = "localhost" }.ToNodeState());
186187
}
187-
*/
188188

189189
return serviceProvider.GetRequiredService<INodeHost>();
190190
}

src/RaftConsensus/FaasNet.RaftConsensus.Core/BaseNodeHost.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ private async Task<bool> TryHandleGossipRequest(UdpReceiveResult udpResult)
181181
if (gossipPackage == null) return false;
182182
var packageResult = await HandleGossipRequest(gossipPackage);
183183
if(packageResult == null) return true;
184-
using(var client = new GossipClient(gossipPackage.Header.SourceUrl, gossipPackage.Header.SourcePort))
184+
using (var client = new GossipClient(gossipPackage.Header.SourceUrl, gossipPackage.Header.SourcePort))
185185
{
186186
await client.Send(packageResult, cancellationToken: TokenSource.Token);
187187
}
@@ -230,7 +230,6 @@ private async Task BroadcastGossipHeartbeat(object source, ElapsedEventArgs e)
230230
else clusterNode.ReactivationDateTime = DateTime.UtcNow.AddMilliseconds(_options.GossipClusterNodeDeactivationDurationMS);
231231
}
232232
}
233-
234233
StartGossipTimer();
235234
}
236235

src/RaftConsensus/FaasNet.RaftConsensus.RocksDB/RockDBNodeStateStore.cs

Lines changed: 39 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ public class RockDBNodeStateStore : INodeStateStore
1717
{
1818
private static SemaphoreSlim _lock = new SemaphoreSlim(1);
1919
private readonly RaftConsensusRocksDBOptions _options;
20+
private readonly RocksDBConnectionPool _connectionPool;
2021

2122
public RockDBNodeStateStore(IOptions<RaftConsensusRocksDBOptions> options)
2223
{
2324
_options = options.Value;
25+
_connectionPool = new RocksDBConnectionPool();
2426
}
2527

2628
public void Add(NodeState nodeState)
@@ -39,11 +41,11 @@ public void Update(NodeState nodeState)
3941

4042
public async Task<IEnumerable<NodeState>> GetAllLastEntityTypes(CancellationToken cancellationToken)
4143
{
42-
_lock.Wait();
44+
await _lock.WaitAsync(cancellationToken);
4345
var result = new List<NodeState>();
44-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByTypeValue()))
46+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByTypeValue());
47+
using (var iterator = db.NewIterator())
4548
{
46-
var iterator = db.NewIterator();
4749
iterator.SeekToFirst();
4850
while (iterator.Valid())
4951
{
@@ -59,11 +61,11 @@ public async Task<IEnumerable<NodeState>> GetAllLastEntityTypes(CancellationToke
5961

6062
public async Task<IEnumerable<NodeState>> GetAllLastEntityTypes(string entityType, CancellationToken cancellationToken)
6163
{
62-
_lock.Wait();
64+
await _lock.WaitAsync(cancellationToken);
6365
var result = new List<NodeState>();
64-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByTypeVersion(entityType)))
66+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByTypeVersion(entityType));
67+
using (var iterator = db.NewIterator())
6568
{
66-
var iterator = db.NewIterator();
6769
iterator.SeekToFirst();
6870
while (iterator.Valid())
6971
{
@@ -83,47 +85,41 @@ public async Task<NodeState> GetSpecificEntityType(string entityType, int entity
8385
return entityTypes.FirstOrDefault();
8486
}
8587

86-
public Task<IEnumerable<NodeState>> GetAllSpecificEntityTypes(List<(string EntityType, int EntityVersion)> parameter, CancellationToken cancellationToken)
88+
public async Task<IEnumerable<NodeState>> GetAllSpecificEntityTypes(List<(string EntityType, int EntityVersion)> parameter, CancellationToken cancellationToken)
8789
{
88-
_lock.Wait();
90+
await _lock.WaitAsync(cancellationToken);
8991
var result = new List<NodeState>();
90-
foreach(var grp in parameter.GroupBy(k => k.EntityType))
92+
foreach (var grp in parameter.GroupBy(k => k.EntityType))
9193
{
92-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByHistoryTypeVersionValue(grp.Key)))
94+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByHistoryTypeVersionValue(grp.Key));
95+
foreach (var record in grp)
9396
{
94-
foreach (var record in grp)
95-
{
96-
var value = db.Get(record.EntityVersion.ToString());
97-
if (string.IsNullOrWhiteSpace(value)) continue;
98-
result.Add(JsonSerializer.Deserialize<NodeState>(value, new JsonSerializerOptions { PropertyNameCaseInsensitive = true }));
99-
}
97+
var value = db.Get(record.EntityVersion.ToString());
98+
if (string.IsNullOrWhiteSpace(value)) continue;
99+
result.Add(JsonSerializer.Deserialize<NodeState>(value, new JsonSerializerOptions { PropertyNameCaseInsensitive = true }));
100100
}
101101
}
102102

103103
_lock.Release();
104-
return Task.FromResult((IEnumerable<NodeState>)result);
104+
return result;
105105
}
106106

107-
public Task<NodeState> GetLastEntityId(string entityId, CancellationToken cancellationToken)
107+
public async Task<NodeState> GetLastEntityId(string entityId, CancellationToken cancellationToken)
108108
{
109-
_lock.Wait();
110-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByLastIdValue()))
111-
{
112-
var value = db.Get(entityId);
113-
_lock.Release();
114-
return Parse(value);
115-
}
109+
await _lock.WaitAsync(cancellationToken);
110+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByLastIdValue());
111+
var value = db.Get(entityId);
112+
_lock.Release();
113+
return await Parse(value);
116114
}
117115

118-
public Task<NodeState> GetLastEntityType(string entityType, CancellationToken cancellationToken)
116+
public async Task<NodeState> GetLastEntityType(string entityType, CancellationToken cancellationToken)
119117
{
120-
_lock.Wait();
121-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByTypeValue()))
122-
{
123-
var value = db.Get(entityType);
124-
_lock.Release();
125-
return Parse(value);
126-
}
118+
await _lock.WaitAsync(cancellationToken);
119+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByTypeValue());
120+
var value = db.Get(entityType);
121+
_lock.Release();
122+
return await Parse(value);
127123
}
128124

129125
private DbOptions BuildOptions()
@@ -141,33 +137,20 @@ private static Task<NodeState> Parse(string value)
141137

142138
private void StoreEntityId(NodeState nodeState, string json)
143139
{
144-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByLastIdValue()))
145-
{
146-
db.Put(nodeState.EntityId, json);
147-
}
148-
149-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByLastVersionValue()))
150-
{
151-
db.Put(nodeState.EntityId, nodeState.EntityVersion.ToString());
152-
}
140+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByLastIdValue());
141+
db.Put(nodeState.EntityId, json);
142+
db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByLastVersionValue());
143+
db.Put(nodeState.EntityId, nodeState.EntityVersion.ToString());
153144
}
154145

155146
private void StoreEntityType(NodeState nodeState, string json)
156147
{
157-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByTypeValue()))
158-
{
159-
db.Put(nodeState.EntityType, json);
160-
}
161-
162-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByHistoryTypeVersionValue(nodeState.EntityType)))
163-
{
164-
db.Put(nodeState.EntityVersion.ToString(), json);
165-
}
166-
167-
using (var db = RocksDb.Open(BuildOptions(), GetFolderPathByTypeVersion(nodeState.EntityType)))
168-
{
169-
db.Put(nodeState.EntityId, json);
170-
}
148+
var db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByTypeValue());
149+
db.Put(nodeState.EntityType, json);
150+
db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByHistoryTypeVersionValue(nodeState.EntityType));
151+
db.Put(nodeState.EntityVersion.ToString(), json);
152+
db = _connectionPool.GetConnection(BuildOptions(), GetFolderPathByTypeVersion(nodeState.EntityType));
153+
db.Put(nodeState.EntityId, json);
171154
}
172155

173156
private string GetFolderPathByLastIdValue()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using RocksDbSharp;
2+
using System.Collections.Generic;
3+
4+
namespace FaasNet.RaftConsensus.RocksDB
5+
{
6+
public class RocksDBConnectionPool
7+
{
8+
private static object _lock = new object();
9+
private static Dictionary<string, RocksDb> _dic = new Dictionary<string, RocksDb>();
10+
11+
public RocksDb GetConnection(DbOptions options, string path)
12+
{
13+
lock(_lock)
14+
{
15+
if (_dic.ContainsKey(path)) return _dic[path];
16+
var db = RocksDb.Open(options, path);
17+
_dic.Add(path, db);
18+
return db;
19+
}
20+
}
21+
}
22+
}

0 commit comments

Comments
 (0)