Skip to content

Commit 2fdcc1d

Browse files
authored
feat: add task subsystem (#5612)
* feat: add task subsystem * pr suggestions
1 parent 83b9245 commit 2fdcc1d

56 files changed

Lines changed: 3230 additions & 1 deletion

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
plugins {
16+
`java-library`
17+
}
18+
19+
dependencies {
20+
api(project(":spi:common:task-spi"))
21+
api(project(":spi:common:transaction-spi"))
22+
implementation(project(":core:common:lib:store-lib"))
23+
testImplementation(project(":core:common:lib:query-lib"))
24+
testImplementation(testFixtures(project(":spi:common:task-spi")))
25+
testImplementation(project(":core:common:junit-base"))
26+
27+
}
28+
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks;
16+
17+
import org.eclipse.edc.spi.observe.ObservableImpl;
18+
19+
public class TaskObservableImpl extends ObservableImpl<TaskListener> implements TaskObservable {
20+
21+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks;
16+
17+
import org.eclipse.edc.controlplane.tasks.store.TaskStore;
18+
import org.eclipse.edc.spi.query.QuerySpec;
19+
import org.eclipse.edc.spi.result.ServiceResult;
20+
import org.eclipse.edc.transaction.spi.TransactionContext;
21+
22+
import java.util.List;
23+
24+
public class TaskServiceImpl implements TaskService {
25+
26+
private final TaskStore taskStore;
27+
private final TaskObservable taskObservable;
28+
private final TransactionContext transactionContext;
29+
30+
31+
public TaskServiceImpl(TaskStore taskStore, TaskObservable taskObservable, TransactionContext transactionContext) {
32+
this.taskStore = taskStore;
33+
this.taskObservable = taskObservable;
34+
this.transactionContext = transactionContext;
35+
}
36+
37+
@Override
38+
public ServiceResult<Task> create(Task task) {
39+
return transactionContext.execute(() -> {
40+
var storeResult = taskStore.create(task).onSuccess(v -> {
41+
taskObservable.invokeForEach(l -> l.created(task));
42+
});
43+
return ServiceResult.from(storeResult).map(v -> task);
44+
});
45+
}
46+
47+
@Override
48+
public List<Task> fetchLatestTask(QuerySpec query) {
49+
return transactionContext.execute(() -> taskStore.fetchForUpdate(query));
50+
}
51+
52+
@Override
53+
public ServiceResult<Void> delete(String id) {
54+
return transactionContext.execute(() -> ServiceResult.from(taskStore.delete(id)));
55+
}
56+
57+
@Override
58+
public ServiceResult<Void> update(Task task) {
59+
return transactionContext.execute(() -> ServiceResult.from(taskStore.update(task)));
60+
}
61+
62+
@Override
63+
public Task findById(String id) {
64+
return transactionContext.execute(() -> taskStore.findById(id));
65+
}
66+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks;
16+
17+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.AcceptNegotiation;
18+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.AgreeNegotiation;
19+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.FinalizeNegotiation;
20+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.OfferNegotiation;
21+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.RequestNegotiation;
22+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendAccept;
23+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendAgreement;
24+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendFinalizeNegotiation;
25+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendOffer;
26+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendRequestNegotiation;
27+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendTerminateNegotiation;
28+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.SendVerificationNegotiation;
29+
import org.eclipse.edc.controlplane.contract.spi.negotiation.tasks.VerifyNegotiation;
30+
import org.eclipse.edc.controlplane.transfer.spi.tasks.CompleteDataFlow;
31+
import org.eclipse.edc.controlplane.transfer.spi.tasks.PrepareTransfer;
32+
import org.eclipse.edc.controlplane.transfer.spi.tasks.ResumeDataFlow;
33+
import org.eclipse.edc.controlplane.transfer.spi.tasks.SendTransferRequest;
34+
import org.eclipse.edc.controlplane.transfer.spi.tasks.SendTransferStart;
35+
import org.eclipse.edc.controlplane.transfer.spi.tasks.SignalDataflowStarted;
36+
import org.eclipse.edc.controlplane.transfer.spi.tasks.SuspendDataFlow;
37+
import org.eclipse.edc.controlplane.transfer.spi.tasks.TerminateDataFlow;
38+
39+
import java.util.List;
40+
41+
/**
42+
* Central registry of all task types. This class is used to register all task types in the system, so that they can be
43+
* discovered and executed by the TaskManager.
44+
*/
45+
public class TaskTypes {
46+
47+
public static final List<Class<?>> TYPES = List.of(
48+
RequestNegotiation.class,
49+
SendRequestNegotiation.class,
50+
AcceptNegotiation.class,
51+
SendAccept.class,
52+
AgreeNegotiation.class,
53+
OfferNegotiation.class,
54+
SendAgreement.class,
55+
SendOffer.class,
56+
SendTerminateNegotiation.class,
57+
VerifyNegotiation.class,
58+
SendVerificationNegotiation.class,
59+
FinalizeNegotiation.class,
60+
SendFinalizeNegotiation.class,
61+
PrepareTransfer.class,
62+
SendTransferRequest.class,
63+
SendTransferStart.class,
64+
SignalDataflowStarted.class,
65+
SuspendDataFlow.class,
66+
ResumeDataFlow.class,
67+
TerminateDataFlow.class,
68+
CompleteDataFlow.class);
69+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks;
16+
17+
import org.eclipse.edc.controlplane.tasks.store.TaskStore;
18+
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
19+
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
20+
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
21+
import org.eclipse.edc.spi.system.ServiceExtension;
22+
import org.eclipse.edc.spi.system.ServiceExtensionContext;
23+
import org.eclipse.edc.spi.types.TypeManager;
24+
import org.eclipse.edc.transaction.spi.TransactionContext;
25+
26+
import static org.eclipse.edc.controlplane.tasks.TaskTypes.TYPES;
27+
import static org.eclipse.edc.controlplane.tasks.TasksServicesExtension.NAME;
28+
29+
@Extension(NAME)
30+
public class TasksServicesExtension implements ServiceExtension {
31+
32+
public static final String NAME = "Tasks Core Services";
33+
34+
@Inject
35+
private TaskStore taskStore;
36+
37+
@Inject
38+
private TransactionContext transactionContext;
39+
40+
@Inject
41+
private TypeManager typeManager;
42+
43+
private TaskObservable observable;
44+
45+
46+
@Override
47+
public void initialize(ServiceExtensionContext context) {
48+
TYPES.forEach(typeManager::registerTypes);
49+
}
50+
51+
@Provider
52+
public TaskService taskService() {
53+
return new TaskServiceImpl(taskStore, taskObservable(), transactionContext);
54+
}
55+
56+
@Provider
57+
public TaskObservable taskObservable() {
58+
if (observable == null) {
59+
observable = new TaskObservableImpl();
60+
}
61+
return observable;
62+
}
63+
64+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks.defaults;
16+
17+
import org.eclipse.edc.controlplane.tasks.Task;
18+
import org.eclipse.edc.controlplane.tasks.store.TaskStore;
19+
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
20+
import org.eclipse.edc.spi.query.QueryResolver;
21+
import org.eclipse.edc.spi.query.QuerySpec;
22+
import org.eclipse.edc.spi.result.StoreResult;
23+
import org.eclipse.edc.store.ReflectionBasedQueryResolver;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.stream.Collectors;
29+
30+
import static java.lang.String.format;
31+
32+
public class InMemoryTaskStore implements TaskStore {
33+
34+
private final Map<String, Task> tasks = new ConcurrentHashMap<>();
35+
private final QueryResolver<Task> queryResolver;
36+
37+
public InMemoryTaskStore(CriterionOperatorRegistry criterionOperatorRegistry) {
38+
this.queryResolver = new ReflectionBasedQueryResolver<>(Task.class, criterionOperatorRegistry);
39+
}
40+
41+
@Override
42+
public StoreResult<Void> create(Task task) {
43+
tasks.put(task.getId(), task);
44+
return StoreResult.success();
45+
}
46+
47+
@Override
48+
public List<Task> fetchForUpdate(QuerySpec querySpec) {
49+
return queryResolver.query(tasks.values().stream(), querySpec)
50+
.collect(Collectors.toList());
51+
}
52+
53+
@Override
54+
public StoreResult<Void> delete(String id) {
55+
var prev = tasks.remove(id);
56+
if (prev != null) return StoreResult.success();
57+
return StoreResult.notFound(format("Task with id %s not found", id));
58+
}
59+
60+
@Override
61+
public StoreResult<Void> update(Task task) {
62+
var prev = tasks.computeIfPresent(task.getId(), (k, v) -> task);
63+
if (prev != null) return StoreResult.success();
64+
return StoreResult.notFound(format("Task with id %s not found", task.getId()));
65+
}
66+
67+
@Override
68+
public Task findById(String id) {
69+
return tasks.get(id);
70+
}
71+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.controlplane.tasks.defaults;
16+
17+
import org.eclipse.edc.controlplane.tasks.store.TaskStore;
18+
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
19+
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
20+
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
21+
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
22+
import org.eclipse.edc.spi.system.ServiceExtension;
23+
24+
import static org.eclipse.edc.controlplane.tasks.defaults.TasksDefaultServicesExtension.NAME;
25+
26+
27+
@Extension(NAME)
28+
public class TasksDefaultServicesExtension implements ServiceExtension {
29+
30+
public static final String NAME = "Tasks Default Services";
31+
@Inject
32+
private CriterionOperatorRegistry criterionOperatorRegistry;
33+
34+
@Provider(isDefault = true)
35+
public TaskStore taskStore() {
36+
return new InMemoryTaskStore(criterionOperatorRegistry);
37+
}
38+
39+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#
2+
# Copyright (c) 2026 Metaform Systems, Inc.
3+
#
4+
# This program and the accompanying materials are made available under the
5+
# terms of the Apache License, Version 2.0 which is available at
6+
# https://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# SPDX-License-Identifier: Apache-2.0
9+
#
10+
# Contributors:
11+
# Metaform Systems, Inc. - initial API and implementation
12+
#
13+
#
14+
org.eclipse.edc.controlplane.tasks.TasksServicesExtension
15+
org.eclipse.edc.controlplane.tasks.defaults.TasksDefaultServicesExtension

0 commit comments

Comments
 (0)