Skip to content
This repository was archived by the owner on Nov 8, 2020. It is now read-only.

Commit 8236ddc

Browse files
committed
Support sync all observers on activate.
1 parent bbf33ce commit 8236ddc

8 files changed

Lines changed: 86 additions & 17 deletions

File tree

src/Ray.Core.Abstractions/Observer/IVersion.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ public interface IVersion
66
{
77
Task<long> GetVersion();
88
Task<long> GetAndSaveVersion(long compareVersion);
9+
Task<bool> SyncFromObservable(long compareVersion);
910
}
1011
}

src/Ray.Core/Configuration/CoreOptions.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,9 @@ public class CoreOptions
3131
/// 事务超时时间,单位为ms(默认为30s)
3232
/// </summary>
3333
public int TransactionTimeout { get; set; } = 30 * 1000;
34+
/// <summary>
35+
/// 在激活的时候同步所有观察者
36+
/// </summary>
37+
public bool SyncAllObserversOnActivate { get; set; } = false;
3438
}
3539
}

src/Ray.Core/Core/Abstractions/IObserverUnit.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ namespace Ray.Core.Abstractions
66
{
77
public interface IObserverUnit<PrimaryKey> : IGrainID
88
{
9+
/// <summary>
10+
/// 同步所有观察者
11+
/// </summary>
12+
/// <param name="primaryKey">GrainId</param>
13+
/// <param name="srcVersion">Observable的Version</param>
14+
/// <returns></returns>
15+
Task<bool[]> SyncAllObservers(PrimaryKey primaryKey, long srcVersion);
916
/// <summary>
1017
/// 获取所有监听者分组
1118
/// </summary>

src/Ray.Core/Core/Grains/ObserverGrain.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,7 @@ public override async Task OnActivateAsync()
314314
await ReadSnapshotAsync();
315315
if (FullyActive)
316316
{
317-
while (true)
318-
{
319-
var eventList = await EventStorage.GetList(GrainId, Snapshot.StartTimestamp, Snapshot.Version + 1, Snapshot.Version + ConfigOptions.NumberOfEventsPerRead);
320-
await UnsafeTell(eventList);
321-
if (eventList.Count < ConfigOptions.NumberOfEventsPerRead) break;
322-
};
317+
await RecoveryFromStorage();
323318
}
324319
if (Logger.IsEnabled(LogLevel.Trace))
325320
Logger.LogTrace("Activation completed: {0}->{1}", GrainType.FullName, Serializer.Serialize(Snapshot));
@@ -330,6 +325,19 @@ public override async Task OnActivateAsync()
330325
throw;
331326
}
332327
}
328+
/// <summary>
329+
/// 从库里恢复
330+
/// </summary>
331+
/// <returns></returns>
332+
private async Task RecoveryFromStorage()
333+
{
334+
while (true)
335+
{
336+
var eventList = await EventStorage.GetList(GrainId, Snapshot.StartTimestamp, Snapshot.Version + 1, Snapshot.Version + ConfigOptions.NumberOfEventsPerRead);
337+
await UnsafeTell(eventList);
338+
if (eventList.Count < ConfigOptions.NumberOfEventsPerRead) break;
339+
};
340+
}
333341
public override Task OnDeactivateAsync()
334342
{
335343
var needSaveSnap = Snapshot.Version - SnapshotEventVersion >= 1;
@@ -515,6 +523,14 @@ public async Task<long> GetAndSaveVersion(long compareVersion)
515523
}
516524
return Snapshot.Version;
517525
}
526+
public async Task<bool> SyncFromObservable(long compareVersion)
527+
{
528+
if (Snapshot.Version < compareVersion)
529+
{
530+
await RecoveryFromStorage();
531+
}
532+
return Snapshot.Version == compareVersion;
533+
}
518534
protected async ValueTask Tell(FullyEvent<PrimaryKey> fullyEvent)
519535
{
520536
try

src/Ray.Core/Core/Grains/RayGrain.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,15 @@ public override async Task OnActivateAsync()
194194
};
195195
}
196196
}
197-
var onActivatedTask = OnBaseActivated();
197+
if (CoreOptions.SyncAllObserversOnActivate)
198+
{
199+
var syncResults = await ObserverUnit.SyncAllObservers(GrainId, Snapshot.Base.Version);
200+
if (syncResults.Any(r => !r))
201+
{
202+
throw new SyncAllObserversException(GrainId.ToString(), GrainType);
203+
}
204+
}
205+
var onActivatedTask = OnActivationCompleted();
198206
if (!onActivatedTask.IsCompletedSuccessfully)
199207
await onActivatedTask;
200208
if (Logger.IsEnabled(LogLevel.Trace))
@@ -207,7 +215,7 @@ public override async Task OnActivateAsync()
207215
}
208216
}
209217
[MethodImpl(MethodImplOptions.AggressiveInlining)]
210-
protected virtual ValueTask OnBaseActivated() => Consts.ValueTaskDone;
218+
protected virtual ValueTask OnActivationCompleted() => Consts.ValueTaskDone;
211219
protected virtual async Task RecoverySnapshot()
212220
{
213221
try

src/Ray.Core/Core/Grains/ShadowGrain.cs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,7 @@ public override async Task OnActivateAsync()
138138
await ReadSnapshotAsync();
139139
if (FullyActive)
140140
{
141-
while (true)
142-
{
143-
var eventList = await EventStorage.GetList(GrainId, Snapshot.Base.StartTimestamp, Snapshot.Base.Version + 1, Snapshot.Base.Version + NumberOfEventsPerRead);
144-
var task = Tell(eventList);
145-
if (!task.IsCompletedSuccessfully)
146-
await task;
147-
if (eventList.Count < NumberOfEventsPerRead) break;
148-
};
141+
await RecoveryFromStorage();
149142
}
150143
if (Logger.IsEnabled(LogLevel.Trace))
151144
Logger.LogTrace("Activation completed: {0}->{1}", GrainType.FullName, Serializer.Serialize(Snapshot));
@@ -156,6 +149,21 @@ public override async Task OnActivateAsync()
156149
throw;
157150
}
158151
}
152+
/// <summary>
153+
/// 从库里恢复
154+
/// </summary>
155+
/// <returns></returns>
156+
private async Task RecoveryFromStorage()
157+
{
158+
while (true)
159+
{
160+
var eventList = await EventStorage.GetList(GrainId, Snapshot.Base.StartTimestamp, Snapshot.Base.Version + 1, Snapshot.Base.Version + NumberOfEventsPerRead);
161+
var task = Tell(eventList);
162+
if (!task.IsCompletedSuccessfully)
163+
await task;
164+
if (eventList.Count < NumberOfEventsPerRead) break;
165+
};
166+
}
159167
protected virtual async ValueTask Tell(IEnumerable<FullyEvent<PrimaryKey>> eventList)
160168
{
161169
foreach (var @event in eventList)
@@ -298,6 +306,14 @@ public Task<long> GetAndSaveVersion(long compareVersion)
298306
{
299307
return Task.FromResult(Snapshot.Base.Version);
300308
}
309+
public async Task<bool> SyncFromObservable(long compareVersion)
310+
{
311+
if (Snapshot.Base.Version < compareVersion)
312+
{
313+
await RecoveryFromStorage();
314+
}
315+
return Snapshot.Base.Version == compareVersion;
316+
}
301317
protected async ValueTask Tell(FullyEvent<PrimaryKey> @event)
302318
{
303319
if (@event.Base.Version == Snapshot.Base.Version + 1)

src/Ray.Core/Core/Observer/ObserverUnit.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
2727
readonly List<Func<List<byte[]>, Task>> batchEventHandlers = new List<Func<List<byte[]>, Task>>();
2828
readonly List<Func<PrimaryKey, long, Task<long>>> observerVersionHandlers = new List<Func<PrimaryKey, long, Task<long>>>();
2929
readonly List<Func<PrimaryKey, Task>> observerResetHandlers = new List<Func<PrimaryKey, Task>>();
30+
readonly List<Func<PrimaryKey, long, Task<bool>>> observerSyncHandlers = new List<Func<PrimaryKey, long, Task<bool>>>();
3031
protected ILogger Logger { get; private set; }
3132
public Type GrainType { get; }
3233

@@ -47,7 +48,10 @@ public Task<long[]> GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion)
4748
{
4849
return Task.WhenAll(observerVersionHandlers.Select(func => func(primaryKey, srcVersion)));
4950
}
50-
51+
public Task<bool[]> SyncAllObservers(PrimaryKey primaryKey, long srcVersion)
52+
{
53+
return Task.WhenAll(observerSyncHandlers.Select(func => func(primaryKey, srcVersion)));
54+
}
5155
public Task Reset(PrimaryKey primaryKey)
5256
{
5357
return Task.WhenAll(observerResetHandlers.Select(func => func(primaryKey)));
@@ -149,6 +153,7 @@ public void Observer(string group, Type observerType)
149153
eventHandlers.Add(EventHandler);
150154
batchEventHandlers.Add(BatchEventHandler);
151155
observerVersionHandlers.Add((actorId, version) => GetObserver(observerType, actorId).GetAndSaveVersion(version));
156+
observerSyncHandlers.Add((actorId, version) => GetObserver(observerType, actorId).SyncFromObservable(version));
152157
observerResetHandlers.Add((actorId) => GetObserver(observerType, actorId).Reset());
153158
//内部函数
154159
Task EventHandler(byte[] bytes)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
3+
namespace Ray.Core.Exceptions
4+
{
5+
public class SyncAllObserversException : Exception
6+
{
7+
public SyncAllObserversException(string id, Type grainType) :
8+
base($"Sync all observers failed of Grain type {grainType.FullName} and Id {id}")
9+
{
10+
}
11+
}
12+
}

0 commit comments

Comments
 (0)