Skip to content

Commit b002552

Browse files
Ticket #9 : Start to manage scheduled job
1 parent 65f40ff commit b002552

20 files changed

Lines changed: 312 additions & 36 deletions

src/CaseManagement.CMMN/CaseInstance/CommandHandlers/ConfirmFormCommandHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public ConfirmFormCommandHandler(IQueueProvider queueProvider, IFormQueryReposit
2525

2626
public async Task<bool> Handle(ConfirmFormCommand confirmFormCommand)
2727
{
28-
var caseInstance = await _eventStoreRepository.GetLastAggregate<ProcessFlowInstance>(confirmFormCommand.CaseInstanceId, ProcessFlowInstance.GetStreamName(confirmFormCommand.CaseInstanceId));
28+
var caseInstance = await _eventStoreRepository.GetLastAggregate<CMMNProcessFlowInstance>(confirmFormCommand.CaseInstanceId, CMMNProcessFlowInstance.GetCMMNStreamName(confirmFormCommand.CaseInstanceId));
2929
if (caseInstance == null || string.IsNullOrWhiteSpace(caseInstance.Id))
3030
{
3131
throw new UnknownCaseInstanceException(confirmFormCommand.CaseInstanceId);

src/CaseManagement.CMMN/CaseInstance/Processors/CMMNTimerEventListenerProcessor.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
using CaseManagement.CMMN.Extensions;
33
using CaseManagement.Workflow.Domains;
44
using CaseManagement.Workflow.Engine;
5+
using CaseManagement.Workflow.Infrastructure.Scheduler;
56
using CaseManagement.Workflow.ISO8601;
67
using CaseManagement.Workflow.Persistence;
78
using Hangfire;
89
using System;
910
using System.Linq;
1011
using System.Threading;
1112
using System.Threading.Tasks;
13+
using CaseManagement.CMMN.Infrastructures.Scheduler;
1214

1315
namespace CaseManagement.CMMN.CaseInstance.Processors
1416
{
@@ -17,21 +19,22 @@ public class CMMNTimerEventListenerProcessor : IProcessFlowElementProcessor
1719
private readonly IServiceProvider _serviceProvider;
1820
private readonly IProcessFlowInstanceQueryRepository _processFlowInstanceQueryRepository;
1921
private readonly IProcessFlowInstanceCommandRepository _processFlowInstanceCommandRepository;
22+
private readonly IScheduleJobStore _scheduleJobStore;
2023
private readonly IBackgroundJobClient _backgroundJobClient;
2124

22-
public CMMNTimerEventListenerProcessor(IServiceProvider serviceProvider, IProcessFlowInstanceQueryRepository processFlowInstanceQueryRepository, IProcessFlowInstanceCommandRepository processFlowInstanceCommandRepository, IBackgroundJobClient backgroundJobClient)
25+
public CMMNTimerEventListenerProcessor(IServiceProvider serviceProvider, IProcessFlowInstanceQueryRepository processFlowInstanceQueryRepository, IProcessFlowInstanceCommandRepository processFlowInstanceCommandRepository, IScheduleJobStore schedulerJobStore, IBackgroundJobClient backgroundJobClient)
2326
{
2427
_serviceProvider = serviceProvider;
2528
_processFlowInstanceQueryRepository = processFlowInstanceQueryRepository;
2629
_processFlowInstanceCommandRepository = processFlowInstanceCommandRepository;
30+
_scheduleJobStore = schedulerJobStore;
2731
_backgroundJobClient = backgroundJobClient;
2832
}
2933

3034
public string ProcessFlowElementType => Enum.GetName(typeof(CMMNPlanItemDefinitionTypes), CMMNPlanItemDefinitionTypes.TimerEventListener).ToLowerInvariant();
3135

3236
public Task Handle(WorkflowHandlerContext context, CancellationToken token)
3337
{
34-
context.Start();
3538
var pf = context.ProcessFlowInstance;
3639
var planItem = context.GetCMMNPlanItem();
3740
var timerEventListener = planItem.PlanItemDefinitionTimerEventListener;
@@ -54,15 +57,16 @@ public Task Handle(WorkflowHandlerContext context, CancellationToken token)
5457
for(var i = 0; i < repeatingInterval.RecurringTimeInterval; i++)
5558
{
5659
currentDateTime = currentDateTime.Add(newTimespan);
57-
_backgroundJobClient.Schedule(() => HandleListener(pf.Id, planItem.Id), currentDateTime);
60+
_scheduleJobStore.ScheduleJob(new TimerEventMessage { ProcessId = pf.Id, ElementId = planItem.Id}, currentDateTime);
5861
}
5962
}
6063
}
6164

6265
if (time != null && currentDateTime < time.Value)
6366
{
6467
pf.CreatePlanItem(planItem);
65-
_backgroundJobClient.Schedule(() => HandleListener(pf.Id, planItem.Id), time.Value);
68+
_scheduleJobStore.ScheduleJob(new TimerEventMessage { ProcessId = pf.Id, ElementId = planItem.Id }, currentDateTime);
69+
// _backgroundJobClient.Schedule(() => HandleListener(pf.Id, planItem.Id), time.Value);
6670
}
6771

6872
return Task.FromResult(0);
Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,89 @@
1-
namespace CaseManagement.CMMN.Infrastructures.Scheduler
1+
using CaseManagement.CMMN.Domains;
2+
using CaseManagement.Workflow.Domains;
3+
using CaseManagement.Workflow.Engine;
4+
using CaseManagement.Workflow.Infrastructure;
5+
using CaseManagement.Workflow.Infrastructure.EvtStore;
6+
using CaseManagement.Workflow.Infrastructure.Scheduler;
7+
using CaseManagement.Workflow.ISO8601;
8+
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Threading;
11+
using System.Threading.Tasks;
12+
13+
namespace CaseManagement.CMMN.Infrastructures.Scheduler
214
{
3-
public class CMMNTimerEventHandler
15+
public class CMMNTimerEventHandler : IScheduleJobHandler<TimerEventMessage>
416
{
17+
private readonly IProcessFlowElementProcessorFactory _factory;
18+
private readonly ICommitAggregateHelper _commitAggregateHelper;
19+
private readonly IEventStoreRepository _eventStoreRepository;
20+
21+
public CMMNTimerEventHandler(IProcessFlowElementProcessorFactory factory, ICommitAggregateHelper commitAggregateHelper, IEventStoreRepository eventStoreRepository)
22+
{
23+
_factory = factory;
24+
_commitAggregateHelper = commitAggregateHelper;
25+
_eventStoreRepository = eventStoreRepository;
26+
}
27+
28+
public async Task Handle(TimerEventMessage message, CancellationToken token)
29+
{
30+
var flowInstance = await _eventStoreRepository.GetLastAggregate<ProcessFlowInstance>(message.ProcessId, ProcessFlowInstance.GetStreamName(message.ProcessId));
31+
if (flowInstance == null)
32+
{
33+
return;
34+
}
35+
36+
try
37+
{
38+
var tasks = new List<Task>();
39+
var element = flowInstance.Elements.First(e => e.Id == message.ElementId) as CMMNPlanItem;
40+
flowInstance.StartElement(element);
41+
flowInstance.OccurPlanItem(element);
42+
foreach(var nextElt in flowInstance.NextElements(element.Id))
43+
{
44+
var context = new WorkflowHandlerContext(flowInstance, nextElt, _factory);
45+
tasks.Add(Task.Run(() => Start(context, token)));
46+
}
47+
48+
await Task.WhenAll(tasks);
49+
flowInstance.CompleteElement(element);
50+
var nbOccures = element.TransitionHistories.Where(t => t.Transition == CMMNPlanItemTransitions.Occur).Count();
51+
var timerEventListener = element.PlanItemDefinitionTimerEventListener;
52+
var repeatingInterval = ISO8601Parser.ParseRepeatingTimeInterval(timerEventListener.TimerExpression.Body);
53+
var time = ISO8601Parser.ParseTime(timerEventListener.TimerExpression.Body);
54+
if ((repeatingInterval.RecurringTimeInterval == nbOccures || time != null) && flowInstance.IsFinished())
55+
{
56+
if (flowInstance.IsFinished())
57+
{
58+
flowInstance.Complete();
59+
}
60+
}
61+
}
62+
finally
63+
{
64+
await _commitAggregateHelper.Commit(flowInstance, flowInstance.GetStreamName());
65+
}
66+
}
67+
68+
private async Task Start(WorkflowHandlerContext context, CancellationToken token)
69+
{
70+
var processor = _factory.Build(context.CurrentElement);
71+
await processor.Handle(context, token);
72+
if (context.CurrentElement.Status != ProcessFlowInstanceElementStatus.Finished)
73+
{
74+
return;
75+
}
76+
77+
var nextElts = context.ProcessFlowInstance.NextElements(context.CurrentElement.Id);
78+
if (!nextElts.Any())
79+
{
80+
return;
81+
}
82+
83+
foreach (var nextElt in nextElts)
84+
{
85+
await Start(context, token);
86+
}
87+
}
588
}
689
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using CaseManagement.Workflow.Infrastructure.Scheduler;
2+
3+
namespace CaseManagement.CMMN.Infrastructures.Scheduler
4+
{
5+
public class TimerEventMessage : JobMessage
6+
{
7+
public string ProcessId { get; set; }
8+
public string ElementId { get; set; }
9+
}
10+
}

src/CaseManagement.CMMN/ServiceCollectionExtensions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
using CaseManagement.CMMN.Domains.CaseInstance.Events;
99
using CaseManagement.CMMN.Infrastructures;
1010
using CaseManagement.CMMN.Infrastructures.Bus.ConfirmForm;
11+
using CaseManagement.CMMN.Infrastructures.Scheduler;
1112
using CaseManagement.CMMN.Persistence;
1213
using CaseManagement.CMMN.Persistence.InMemory;
1314
using CaseManagement.Workflow.Domains.Events;
1415
using CaseManagement.Workflow.Engine;
1516
using CaseManagement.Workflow.Infrastructure;
1617
using CaseManagement.Workflow.Infrastructure.Bus;
18+
using CaseManagement.Workflow.Infrastructure.Scheduler;
1719
using Hangfire;
1820
using Hangfire.MemoryStorage;
1921
using System.Collections.Generic;
@@ -32,6 +34,7 @@ public static CMMNServerBuilder AddCMMN(this IServiceCollection services)
3234
.AddProcessHandlers()
3335
.AddInfrastructure()
3436
.AddProcessors()
37+
.AddScheduledJobHandlers()
3538
.AddBus();
3639
services.AddHangfire((act) =>
3740
{
@@ -104,5 +107,11 @@ private static IServiceCollection AddProcessHandlers(this IServiceCollection ser
104107
services.AddTransient<ICaseProcessHandler, CaseManagementCallbackProcessHandler>();
105108
return services;
106109
}
110+
111+
private static IServiceCollection AddScheduledJobHandlers(this IServiceCollection services)
112+
{
113+
services.AddTransient<IScheduleJobHandler<TimerEventMessage>, CMMNTimerEventHandler>();
114+
return services;
115+
}
107116
}
108117
}

src/CaseManagement.Workflow/CaseManagement.Workflow.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,7 @@
1212
<PackageReference Include="Microsoft.AspNetCore.Mvc" Version="2.1.0" />
1313
<PackageReference Include="NEventStore" Version="6.0.0" />
1414
</ItemGroup>
15+
<ItemGroup>
16+
<Folder Include="Domains\Scheduler\" />
17+
</ItemGroup>
1518
</Project>

src/CaseManagement.Workflow/Engine/WorkflowEngine.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using CaseManagement.Workflow.Domains;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
56

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace CaseManagement.Workflow.Infrastructure.Scheduler
5+
{
6+
public interface IScheduleJobHandler<T> where T : JobMessage
7+
{
8+
Task Handle(T message, CancellationToken token);
9+
}
10+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace CaseManagement.Workflow.Infrastructure.Scheduler
5+
{
6+
public interface IScheduleJobStore
7+
{
8+
Task<ScheduleJob> TakeNextJob();
9+
Task ScheduleJob(JobMessage message, DateTime dateTime);
10+
}
11+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using System.Threading.Tasks;
2+
3+
namespace CaseManagement.Workflow.Infrastructure.Scheduler
4+
{
5+
public interface ISchedulerHost
6+
{
7+
Task Start();
8+
Task Stop();
9+
}
10+
}

0 commit comments

Comments
 (0)