Skip to content

Commit 5ee71c0

Browse files
fix: adapt to Orchestration→Workflow proto renames (durabletask-protobuf#33) (#1704)
Update Java type references to match the Workflow terminology rename in dapr/durabletask-protobuf#33. Point proto base URL to JoshVanL's fix-named-reserved branch (durabletask-protobuf#36) which fixes the mixed reserved syntax incompatible with protoc 3.x, and remove the antrun workaround. Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 0c8e261 commit 5ee71c0

8 files changed

Lines changed: 94 additions & 94 deletions

File tree

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ public void startAndBlock() {
180180
OrchestratorService.WorkItem workItem = workItemStream.next();
181181
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
182182

183-
if (requestType == OrchestratorService.WorkItem.RequestCase.ORCHESTRATORREQUEST) {
184-
OrchestratorService.OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
183+
if (requestType == OrchestratorService.WorkItem.RequestCase.WORKFLOWREQUEST) {
184+
OrchestratorService.WorkflowRequest orchestratorRequest = workItem.getWorkflowRequest();
185185
logger.log(Level.FINEST,
186186
String.format("Processing orchestrator request for instance: {0}",
187187
orchestratorRequest.getInstanceId()));
@@ -193,7 +193,7 @@ public void startAndBlock() {
193193
logger.log(Level.INFO,
194194
String.format("Processing activity request: %s for instance: %s, gRPC thread context: %s",
195195
activityRequest.getName(),
196-
activityRequest.getOrchestrationInstance().getInstanceId(),
196+
activityRequest.getWorkflowInstance().getInstanceId(),
197197
Context.current()));
198198

199199
this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationMetadata.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.dapr.durabletask;
1515

16-
import io.dapr.durabletask.implementation.protobuf.Orchestration.OrchestrationState;
16+
import io.dapr.durabletask.implementation.protobuf.Orchestration.WorkflowState;
1717
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
1818

1919
import java.time.Instant;
@@ -45,19 +45,19 @@ public final class OrchestrationMetadata {
4545
OrchestratorService.GetInstanceResponse fetchResponse,
4646
DataConverter dataConverter,
4747
boolean requestedInputsAndOutputs) {
48-
this(fetchResponse.getOrchestrationState(), dataConverter, requestedInputsAndOutputs);
48+
this(fetchResponse.getWorkflowState(), dataConverter, requestedInputsAndOutputs);
4949
}
5050

5151
OrchestrationMetadata(
52-
OrchestrationState state,
52+
WorkflowState state,
5353
DataConverter dataConverter,
5454
boolean requestedInputsAndOutputs) {
5555
this.dataConverter = dataConverter;
5656
this.requestedInputsAndOutputs = requestedInputsAndOutputs;
5757

5858
this.name = state.getName();
5959
this.instanceId = state.getInstanceId();
60-
this.runtimeStatus = OrchestrationRuntimeStatus.fromProtobuf(state.getOrchestrationStatus());
60+
this.runtimeStatus = OrchestrationRuntimeStatus.fromProtobuf(state.getWorkflowStatus());
6161
this.createdAt = DataConverter.getInstantFromTimestamp(state.getCreatedTimestamp());
6262
this.lastUpdatedAt = DataConverter.getInstantFromTimestamp(state.getLastUpdatedTimestamp());
6363
this.serializedInput = state.getInput().getValue();

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,9 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati
127127
throw new IllegalArgumentException("orchestration must not be null");
128128
}
129129

130-
OrchestratorService.OrchestratorRequest orchestratorRequest;
130+
OrchestratorService.WorkflowRequest orchestratorRequest;
131131
try {
132-
orchestratorRequest = OrchestratorService.OrchestratorRequest.parseFrom(orchestratorRequestBytes);
132+
orchestratorRequest = OrchestratorService.WorkflowRequest.parseFrom(orchestratorRequestBytes);
133133
} catch (InvalidProtocolBufferException e) {
134134
throw new IllegalArgumentException("triggerStateProtoBytes was not valid protobuf", e);
135135
}
@@ -170,7 +170,7 @@ public Boolean isLatestVersion() {
170170
orchestratorRequest.getPastEventsList(),
171171
orchestratorRequest.getNewEventsList());
172172

173-
OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
173+
OrchestratorService.WorkflowResponse response = OrchestratorService.WorkflowResponse.newBuilder()
174174
.setInstanceId(orchestratorRequest.getInstanceId())
175175
.addAllActions(taskOrchestratorResult.getActions())
176176
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
142142
private String appId;
143143

144144
// LinkedHashMap to maintain insertion order when returning the list of pending actions
145-
private final Map<Integer, OrchestratorActions.OrchestratorAction> pendingActions = new LinkedHashMap<>();
145+
private final Map<Integer, OrchestratorActions.WorkflowAction> pendingActions = new LinkedHashMap<>();
146146
private final Map<Integer, TaskRecord<?>> openTasks = new HashMap<>();
147147
private final Map<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
148148
private final List<HistoryEvents.HistoryEvent> unprocessedEvents = new LinkedList<>();
@@ -368,7 +368,7 @@ public <V> Task<V> callActivity(
368368

369369
TaskFactory<V> taskFactory = () -> {
370370
int id = this.sequenceNumber++;
371-
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
371+
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
372372
.newBuilder()
373373
.setId(id)
374374
.setScheduleTask(scheduleTaskBuilder);
@@ -463,16 +463,16 @@ public void sendEvent(String instanceId, String eventName, Object eventData) {
463463

464464
int id = this.sequenceNumber++;
465465
String serializedEventData = this.dataConverter.serialize(eventData);
466-
Orchestration.OrchestrationInstance.Builder orchestrationInstanceBuilder =
467-
Orchestration.OrchestrationInstance.newBuilder()
466+
Orchestration.WorkflowInstance.Builder orchestrationInstanceBuilder =
467+
Orchestration.WorkflowInstance.newBuilder()
468468
.setInstanceId(instanceId);
469469
OrchestratorActions.SendEventAction.Builder builder = OrchestratorActions
470470
.SendEventAction.newBuilder().setInstance(orchestrationInstanceBuilder)
471471
.setName(eventName);
472472
if (serializedEventData != null) {
473473
builder.setData(StringValue.of(serializedEventData));
474474
}
475-
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction.newBuilder()
475+
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction.newBuilder()
476476
.setId(id)
477477
.setSendEvent(builder);
478478

@@ -505,8 +505,8 @@ public <V> Task<V> callSubOrchestrator(
505505
}
506506

507507
String serializedInput = this.dataConverter.serialize(input);
508-
OrchestratorActions.CreateSubOrchestrationAction.Builder createSubOrchestrationActionBuilder =
509-
OrchestratorActions.CreateSubOrchestrationAction
508+
OrchestratorActions.CreateChildWorkflowAction.Builder createSubOrchestrationActionBuilder =
509+
OrchestratorActions.CreateChildWorkflowAction
510510
.newBuilder().setName(name);
511511
if (serializedInput != null) {
512512
createSubOrchestrationActionBuilder.setInput(StringValue.of(serializedInput));
@@ -535,10 +535,10 @@ public <V> Task<V> callSubOrchestrator(
535535

536536
TaskFactory<V> taskFactory = () -> {
537537
int id = this.sequenceNumber++;
538-
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
538+
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
539539
.newBuilder()
540540
.setId(id)
541-
.setCreateSubOrchestration(createSubOrchestrationActionBuilder);
541+
.setCreateChildWorkflow(createSubOrchestrationActionBuilder);
542542

543543
// Set router on the OrchestratorAction for cross-app routing
544544
if (hasSourceAppId()) {
@@ -641,7 +641,7 @@ private void handleTaskScheduled(HistoryEvents.HistoryEvent e) {
641641
// The history shows that this orchestrator created a durable task in a previous execution.
642642
// We can therefore remove it from the map of pending actions. If we can't find the pending
643643
// action, then we assume a non-deterministic code violation in the orchestrator.
644-
OrchestratorActions.OrchestratorAction taskAction = this.pendingActions.remove(taskId);
644+
OrchestratorActions.WorkflowAction taskAction = this.pendingActions.remove(taskId);
645645
if (taskAction == null) {
646646
String message = String.format(
647647
"Non-deterministic orchestrator detected: a history event scheduling an activity task with sequence "
@@ -797,7 +797,7 @@ private Task<Void> createTimer(String name, Instant finalFireAt) {
797797

798798
private CompletableTask<Void> createInstantTimer(String name, int id, Instant fireAt) {
799799
Timestamp ts = DataConverter.getTimestampFromInstant(fireAt);
800-
this.pendingActions.put(id, OrchestratorActions.OrchestratorAction.newBuilder()
800+
this.pendingActions.put(id, OrchestratorActions.WorkflowAction.newBuilder()
801801
.setId(id)
802802
.setCreateTimer(OrchestratorActions.CreateTimerAction.newBuilder()
803803
.setName(name).setFireAt(ts))
@@ -825,7 +825,7 @@ private void handleTimerCreated(HistoryEvents.HistoryEvent e) {
825825
// The history shows that this orchestrator created a durable timer in a previous execution.
826826
// We can therefore remove it from the map of pending actions. If we can't find the pending
827827
// action, then we assume a non-deterministic code violation in the orchestrator.
828-
OrchestratorActions.OrchestratorAction timerAction = this.pendingActions.remove(timerEventId);
828+
OrchestratorActions.WorkflowAction timerAction = this.pendingActions.remove(timerEventId);
829829
if (timerAction == null) {
830830
String message = String.format(
831831
"Non-deterministic orchestrator detected: a history event creating a timer with ID %d and "
@@ -860,9 +860,9 @@ public void handleTimerFired(HistoryEvents.HistoryEvent e) {
860860

861861
private void handleSubOrchestrationCreated(HistoryEvents.HistoryEvent e) {
862862
int taskId = e.getEventId();
863-
HistoryEvents.SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated =
864-
e.getSubOrchestrationInstanceCreated();
865-
OrchestratorActions.OrchestratorAction taskAction = this.pendingActions.remove(taskId);
863+
HistoryEvents.ChildWorkflowInstanceCreatedEvent subOrchestrationInstanceCreated =
864+
e.getChildWorkflowInstanceCreated();
865+
OrchestratorActions.WorkflowAction taskAction = this.pendingActions.remove(taskId);
866866
if (taskAction == null) {
867867
String message = String.format(
868868
"Non-deterministic orchestrator detected: a history event scheduling an sub-orchestration task "
@@ -876,8 +876,8 @@ private void handleSubOrchestrationCreated(HistoryEvents.HistoryEvent e) {
876876
}
877877

878878
private void handleSubOrchestrationCompleted(HistoryEvents.HistoryEvent e) {
879-
HistoryEvents.SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompletedEvent =
880-
e.getSubOrchestrationInstanceCompleted();
879+
HistoryEvents.ChildWorkflowInstanceCompletedEvent subOrchestrationInstanceCompletedEvent =
880+
e.getChildWorkflowInstanceCompleted();
881881
int taskId = subOrchestrationInstanceCompletedEvent.getTaskScheduledId();
882882
TaskRecord<?> record = this.openTasks.remove(taskId);
883883
if (record == null) {
@@ -908,8 +908,8 @@ private void handleSubOrchestrationCompleted(HistoryEvents.HistoryEvent e) {
908908
}
909909

910910
private void handleSubOrchestrationFailed(HistoryEvents.HistoryEvent e) {
911-
HistoryEvents.SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailedEvent =
912-
e.getSubOrchestrationInstanceFailed();
911+
HistoryEvents.ChildWorkflowInstanceFailedEvent subOrchestrationInstanceFailedEvent =
912+
e.getChildWorkflowInstanceFailed();
913913
int taskId = subOrchestrationInstanceFailedEvent.getTaskScheduledId();
914914
TaskRecord<?> record = this.openTasks.remove(taskId);
915915
if (record == null) {
@@ -965,9 +965,9 @@ private void completeInternal(
965965
Helpers.throwIfOrchestratorComplete(this.isComplete);
966966

967967

968-
OrchestratorActions.CompleteOrchestrationAction.Builder builder = OrchestratorActions.CompleteOrchestrationAction
968+
OrchestratorActions.CompleteWorkflowAction.Builder builder = OrchestratorActions.CompleteWorkflowAction
969969
.newBuilder();
970-
builder.setOrchestrationStatus(runtimeStatus);
970+
builder.setWorkflowStatus(runtimeStatus);
971971

972972
if (rawOutput != null) {
973973
builder.setResult(StringValue.of(rawOutput));
@@ -986,10 +986,10 @@ private void completeInternal(
986986
}
987987

988988
int id = this.sequenceNumber++;
989-
OrchestratorActions.OrchestratorAction.Builder actionBuilder = OrchestratorActions.OrchestratorAction
989+
OrchestratorActions.WorkflowAction.Builder actionBuilder = OrchestratorActions.WorkflowAction
990990
.newBuilder()
991991
.setId(id)
992-
.setCompleteOrchestration(builder.build());
992+
.setCompleteWorkflow(builder.build());
993993

994994
// Add router to completion action for cross-app routing back to parent
995995
if (hasSourceAppId()) {
@@ -1003,7 +1003,7 @@ private void completeInternal(
10031003
this.isComplete = true;
10041004
}
10051005

1006-
private void addCarryoverEvents(OrchestratorActions.CompleteOrchestrationAction.Builder builder) {
1006+
private void addCarryoverEvents(OrchestratorActions.CompleteWorkflowAction.Builder builder) {
10071007
// Add historyEvent in the unprocessedEvents buffer
10081008
// Add historyEvent in the new event list that haven't been added to the buffer.
10091009
// We don't check the event in the pass event list to avoid duplicated events.
@@ -1040,28 +1040,28 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
10401040
} else {
10411041
this.logger.fine(() -> this.instanceId + ": Processing event: " + e.getEventTypeCase());
10421042
switch (e.getEventTypeCase()) {
1043-
case ORCHESTRATORSTARTED:
1043+
case WORKFLOWSTARTED:
10441044
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
10451045
this.setCurrentInstant(instant);
10461046

1047-
if (StringUtils.isNotEmpty(e.getOrchestratorStarted().getVersion().getName())) {
1048-
this.orchestratorVersionName = e.getOrchestratorStarted().getVersion().getName();
1047+
if (StringUtils.isNotEmpty(e.getWorkflowStarted().getVersion().getName())) {
1048+
this.orchestratorVersionName = e.getWorkflowStarted().getVersion().getName();
10491049
}
1050-
for (var patch : e.getOrchestratorStarted().getVersion().getPatchesList()) {
1050+
for (var patch : e.getWorkflowStarted().getVersion().getPatchesList()) {
10511051
this.historyPatches.put(patch, true);
10521052
}
10531053

10541054
this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started");
10551055
break;
1056-
case ORCHESTRATORCOMPLETED:
1056+
case WORKFLOWCOMPLETED:
10571057
// No action needed
10581058
this.logger.fine(() -> this.instanceId + ": Workflow orchestrator completed");
10591059
break;
10601060
case EXECUTIONSTARTED:
10611061
HistoryEvents.ExecutionStartedEvent executionStarted = e.getExecutionStarted();
10621062
this.setName(executionStarted.getName());
10631063
this.setInput(executionStarted.getInput().getValue());
1064-
this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId());
1064+
this.setInstanceId(executionStarted.getWorkflowInstance().getInstanceId());
10651065
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
10661066
// For cross-app suborchestrations, if the router has a target, use that as our appID
10671067
// since that's where we're actually executing
@@ -1122,13 +1122,13 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
11221122
case TIMERFIRED:
11231123
this.handleTimerFired(e);
11241124
break;
1125-
case SUBORCHESTRATIONINSTANCECREATED:
1125+
case CHILDWORKFLOWINSTANCECREATED:
11261126
this.handleSubOrchestrationCreated(e);
11271127
break;
1128-
case SUBORCHESTRATIONINSTANCECOMPLETED:
1128+
case CHILDWORKFLOWINSTANCECOMPLETED:
11291129
this.handleSubOrchestrationCompleted(e);
11301130
break;
1131-
case SUBORCHESTRATIONINSTANCEFAILED:
1131+
case CHILDWORKFLOWINSTANCEFAILED:
11321132
this.handleSubOrchestrationFailed(e);
11331133
break;
11341134
case EVENTRAISED:
@@ -1149,14 +1149,14 @@ private void processEvent(HistoryEvents.HistoryEvent e) {
11491149
public void setVersionNotRegistered() {
11501150
this.pendingActions.clear();
11511151

1152-
OrchestratorActions.CompleteOrchestrationAction.Builder builder = OrchestratorActions.CompleteOrchestrationAction
1152+
OrchestratorActions.CompleteWorkflowAction.Builder builder = OrchestratorActions.CompleteWorkflowAction
11531153
.newBuilder();
1154-
builder.setOrchestrationStatus(Orchestration.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED);
1154+
builder.setWorkflowStatus(Orchestration.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED);
11551155

11561156
int id = this.sequenceNumber++;
1157-
OrchestratorActions.OrchestratorAction action = OrchestratorActions.OrchestratorAction.newBuilder()
1157+
OrchestratorActions.WorkflowAction action = OrchestratorActions.WorkflowAction.newBuilder()
11581158
.setId(id)
1159-
.setCompleteOrchestration(builder.build())
1159+
.setCompleteWorkflow(builder.build())
11601160
.build();
11611161
this.pendingActions.put(id, action);
11621162

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public final class TaskOrchestratorResult {
2323

24-
private final Collection<OrchestratorActions.OrchestratorAction> actions;
24+
private final Collection<OrchestratorActions.WorkflowAction> actions;
2525

2626
private final String customStatus;
2727

@@ -37,15 +37,15 @@ public final class TaskOrchestratorResult {
3737
* @param version the orchestrator version
3838
* @param patches the patches to apply
3939
*/
40-
public TaskOrchestratorResult(Collection<OrchestratorActions.OrchestratorAction> actions,
40+
public TaskOrchestratorResult(Collection<OrchestratorActions.WorkflowAction> actions,
4141
String customStatus, String version, List<String> patches) {
4242
this.actions = Collections.unmodifiableCollection(actions);
4343
this.customStatus = customStatus;
4444
this.version = version;
4545
this.patches = patches;
4646
}
4747

48-
public Collection<OrchestratorActions.OrchestratorAction> getActions() {
48+
public Collection<OrchestratorActions.WorkflowAction> getActions() {
4949
return this.actions;
5050
}
5151

durabletask-client/src/main/java/io/dapr/durabletask/runner/ActivityRunner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private void runWithTracing() {
7777
.setParent(parentContext)
7878
.setSpanKind(SpanKind.INTERNAL)
7979
.setAttribute("durabletask.task.instance_id",
80-
activityRequest.getOrchestrationInstance().getInstanceId())
80+
activityRequest.getWorkflowInstance().getInstanceId())
8181
.setAttribute("durabletask.task.id", activityRequest.getTaskId())
8282
.setAttribute("durabletask.activity.name", activityRequest.getName())
8383
.startSpan();
@@ -123,7 +123,7 @@ private void executeActivity() throws Throwable {
123123

124124
OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
125125
.newBuilder()
126-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
126+
.setInstanceId(activityRequest.getWorkflowInstance().getInstanceId())
127127
.setTaskId(activityRequest.getTaskId())
128128
.setCompletionToken(workItem.getCompletionToken());
129129

0 commit comments

Comments
 (0)