Skip to content
This repository was archived by the owner on Nov 8, 2020. It is now read-only.

Commit 77ba4c8

Browse files
committed
Add reset method of grain.
1 parent 4cfdedb commit 77ba4c8

6 files changed

Lines changed: 34 additions & 1 deletion

File tree

src/Ray.Core.Abstractions/Observer/IObserver.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,10 @@ public interface IObserver : IVersion
88
{
99
Task OnNext(Immutable<byte[]> bytes);
1010
Task OnNext(Immutable<List<byte[]>> items);
11+
/// <summary>
12+
/// 重置状态
13+
/// </summary>
14+
/// <returns></returns>
15+
Task Reset();
1116
}
1217
}

src/Ray.Core/Core/Abstractions/IObserverUnit.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ public interface IObserverUnit<PrimaryKey> : IGrainID
1212
/// <returns></returns>
1313
List<string> GetGroups();
1414
Task<long[]> GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion);
15+
/// <summary>
16+
/// 重置Grain
17+
/// </summary>
18+
/// <param name="primaryKey">重置Grain</param>
19+
/// <returns></returns>
20+
Task Reset(PrimaryKey primaryKey);
1521
List<Func<byte[], Task>> GetEventHandlers(string observerGroup);
1622
List<Func<byte[], Task>> GetAllEventHandlers();
1723
List<Func<List<byte[]>, Task>> GetBatchEventHandlers(string observerGroup);

src/Ray.Core/Core/Grains/ObserverGrain.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,5 +667,11 @@ protected virtual async ValueTask SaveSnapshotAsync(bool force = false)
667667
}
668668
}
669669
}
670+
public virtual async Task Reset()
671+
{
672+
await ObserverSnapshotStorage.Delete(GrainId);
673+
UnprocessedEventList.Clear();
674+
await ReadSnapshotAsync();
675+
}
670676
}
671677
}

src/Ray.Core/Core/Grains/RayGrain.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,6 @@ protected async Task Over(OverType overType)
395395
{
396396
await ArchiveStorage.Over(Snapshot.Base.StateId, true);
397397
}
398-
399398
}
400399
private async Task DeleteArchive(string briefId)
401400
{
@@ -443,6 +442,12 @@ protected async ValueTask DeleteSnapshot()
443442
SnapshotEventVersion = 0;
444443
}
445444
}
445+
protected async Task Reset()
446+
{
447+
await Over(OverType.DeleteAll);
448+
await ReadSnapshotAsync();
449+
await ObserverUnit.Reset(Snapshot.Base.StateId);
450+
}
446451
protected virtual async Task<bool> RaiseEvent(IEvent @event, EventUID eUID = null)
447452
{
448453
if (Snapshot.Base.IsOver)

src/Ray.Core/Core/Grains/ShadowGrain.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,5 +343,9 @@ protected virtual ValueTask OnEventDelivered(FullyEvent<PrimaryKey> @event)
343343
}
344344
return Consts.ValueTaskDone;
345345
}
346+
public virtual Task Reset()
347+
{
348+
return ReadSnapshotAsync();
349+
}
346350
}
347351
}

src/Ray.Core/Core/Observer/ObserverUnit.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
2626
readonly List<Func<byte[], Task>> eventHandlers = new List<Func<byte[], Task>>();
2727
readonly List<Func<List<byte[]>, Task>> batchEventHandlers = new List<Func<List<byte[]>, Task>>();
2828
readonly List<Func<PrimaryKey, long, Task<long>>> observerVersionHandlers = new List<Func<PrimaryKey, long, Task<long>>>();
29+
readonly List<Func<PrimaryKey, Task>> observerResetHandlers = new List<Func<PrimaryKey, Task>>();
2930
protected ILogger Logger { get; private set; }
3031
public Type GrainType { get; }
3132

@@ -46,6 +47,11 @@ public Task<long[]> GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion)
4647
{
4748
return Task.WhenAll(observerVersionHandlers.Select(func => func(primaryKey, srcVersion)));
4849
}
50+
51+
public Task Reset(PrimaryKey primaryKey)
52+
{
53+
return Task.WhenAll(observerResetHandlers.Select(func => func(primaryKey)));
54+
}
4955
public List<string> GetGroups() => eventHandlerGroups.Keys.ToList();
5056
public List<Func<byte[], Task>> GetAllEventHandlers()
5157
{
@@ -143,6 +149,7 @@ public void Observer(string group, Type observerType)
143149
eventHandlers.Add(EventHandler);
144150
batchEventHandlers.Add(BatchEventHandler);
145151
observerVersionHandlers.Add((actorId, version) => GetObserver(observerType, actorId).GetAndSaveVersion(version));
152+
observerResetHandlers.Add((actorId) => GetObserver(observerType, actorId).Reset());
146153
//内部函数
147154
Task EventHandler(byte[] bytes)
148155
{

0 commit comments

Comments
 (0)