Skip to content

Commit 8bae81a

Browse files
authored
feat: port nats tasks executors (#5624)
1 parent f01fc4e commit 8bae81a

25 files changed

Lines changed: 1958 additions & 1 deletion

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2025 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
plugins {
16+
`java-library`
17+
`java-test-fixtures`
18+
}
19+
20+
dependencies {
21+
api(project(":spi:common:core-spi"))
22+
api(project(":spi:common:task-spi"))
23+
api(project(":spi:common:transaction-spi"))
24+
25+
implementation(libs.nats.client)
26+
27+
testFixturesImplementation(libs.nats.client)
28+
testFixturesImplementation(libs.testcontainers.junit)
29+
30+
}
31+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.nats;
16+
17+
import io.nats.client.JetStreamApiException;
18+
import io.nats.client.JetStreamManagement;
19+
import io.nats.client.api.ConsumerConfiguration;
20+
import io.nats.client.api.StorageType;
21+
import io.nats.client.api.StreamConfiguration;
22+
23+
public class NatsFunctions {
24+
25+
public static void createStream(JetStreamManagement jsm, String streamName, StorageType storageType, String... subject) {
26+
try {
27+
if (!streamExists(jsm, streamName)) {
28+
var streamConfig = StreamConfiguration.builder()
29+
.name(streamName)
30+
.subjects(subject)
31+
.storageType(storageType)
32+
.build();
33+
jsm.addStream(streamConfig);
34+
}
35+
} catch (Exception e) {
36+
throw new RuntimeException(e);
37+
}
38+
}
39+
40+
public static void createConsumer(JetStreamManagement jsm, String streamName, String consumerName) {
41+
createConsumer(jsm, streamName, consumerName, null);
42+
}
43+
44+
public static void createConsumer(JetStreamManagement jsm, String streamName, String consumerName, String filterSubject) {
45+
try {
46+
jsm.addOrUpdateConsumer(streamName, ConsumerConfiguration.builder()
47+
.durable(consumerName)
48+
.name(consumerName)
49+
.filterSubject(filterSubject)
50+
.build());
51+
} catch (Exception e) {
52+
throw new RuntimeException(e);
53+
}
54+
}
55+
56+
public static boolean streamExists(JetStreamManagement jsm, String streamName) {
57+
try {
58+
var si = jsm.getStreamInfo(streamName);
59+
return si != null;
60+
} catch (JetStreamApiException e) {
61+
// This means the stream doesn't exist
62+
if (e.getErrorCode() == 404) {
63+
return false;
64+
}
65+
throw new RuntimeException(e);
66+
} catch (Exception e) {
67+
throw new RuntimeException(e);
68+
}
69+
}
70+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.nats.subscriber;
16+
17+
import io.nats.client.Connection;
18+
import io.nats.client.JetStreamSubscription;
19+
import io.nats.client.Message;
20+
import io.nats.client.Nats;
21+
import io.nats.client.PullSubscribeOptions;
22+
import io.nats.client.api.StorageType;
23+
import org.eclipse.edc.spi.monitor.Monitor;
24+
import org.eclipse.edc.spi.response.StatusResult;
25+
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
26+
27+
import java.util.Objects;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
32+
import static org.eclipse.edc.nats.NatsFunctions.createConsumer;
33+
import static org.eclipse.edc.nats.NatsFunctions.createStream;
34+
35+
public abstract class NatsSubscriber {
36+
37+
private final AtomicBoolean active = new AtomicBoolean(false);
38+
protected String url;
39+
protected String stream;
40+
protected String name;
41+
protected String subject;
42+
protected ExecutorService executorService;
43+
protected Monitor monitor;
44+
protected boolean autoCreate = false;
45+
protected Integer batchSize = 100;
46+
protected Integer maxWait = 100;
47+
private Connection connection;
48+
49+
50+
public void prepare() {
51+
if (!autoCreate) {
52+
return;
53+
}
54+
try {
55+
var conn = getOrCreateConnection();
56+
conn.jetStream();
57+
var jsm = conn.jetStreamManagement();
58+
createStream(jsm, stream, StorageType.Memory, subject);
59+
createConsumer(jsm, stream, name, subject);
60+
} catch (Exception e) {
61+
throw new RuntimeException(e);
62+
}
63+
}
64+
65+
public void start() {
66+
try {
67+
var connection = getOrCreateConnection();
68+
var js = connection.jetStream();
69+
var pullOptions = PullSubscribeOptions.builder()
70+
.stream(stream)
71+
.durable(name)
72+
.build();
73+
74+
var sub = js.subscribe(subject, pullOptions);
75+
active.set(true);
76+
77+
executorService.submit(() -> {
78+
run(sub);
79+
});
80+
} catch (Exception e) {
81+
throw new RuntimeException("Failed to start NATS subscriber", e);
82+
}
83+
84+
}
85+
86+
private Connection getOrCreateConnection() {
87+
if (connection == null) {
88+
try {
89+
connection = Nats.connect(url);
90+
} catch (Exception e) {
91+
throw new RuntimeException("Failed to create NATS connection", e);
92+
}
93+
}
94+
return connection;
95+
}
96+
97+
protected abstract StatusResult<Void> handleMessage(Message message);
98+
99+
private void run(JetStreamSubscription sub) {
100+
while (active.get()) {
101+
var messages = sub.fetch(batchSize, maxWait);
102+
for (var message : messages) {
103+
try {
104+
var result = handleMessage(message);
105+
if (result.failed()) {
106+
if (result.fatalError()) {
107+
monitor.severe("Failed to handle Nats message, received a fatal error: " + result.getFailureMessages());
108+
message.term();
109+
} else {
110+
monitor.warning("Failed to handle Nats message: " + result.getFailureMessages());
111+
message.nak();
112+
}
113+
continue;
114+
}
115+
message.ack();
116+
} catch (Exception e) {
117+
monitor.severe("Failed to process transfer message: " + e.getMessage(), e);
118+
message.nak();
119+
}
120+
}
121+
}
122+
}
123+
124+
public void stop() {
125+
active.set(false);
126+
executorService.shutdown();
127+
try {
128+
if (connection != null) {
129+
connection.close();
130+
}
131+
} catch (InterruptedException e) {
132+
throw new RuntimeException(e);
133+
}
134+
}
135+
136+
public abstract static class Builder<T extends NatsSubscriber, B extends Builder<T, B>> {
137+
138+
protected final T subscriber;
139+
private ExecutorInstrumentation executorInstrumentation = ExecutorInstrumentation.noop();
140+
141+
protected Builder(T subscriber) {
142+
this.subscriber = subscriber;
143+
}
144+
145+
public abstract B self();
146+
147+
public B url(String url) {
148+
subscriber.url = url;
149+
return self();
150+
}
151+
152+
public B stream(String stream) {
153+
subscriber.stream = stream;
154+
return self();
155+
}
156+
157+
public B name(String name) {
158+
subscriber.name = name;
159+
return self();
160+
}
161+
162+
public B subject(String subject) {
163+
subscriber.subject = subject;
164+
return self();
165+
}
166+
167+
public B executorInstrumentation(ExecutorInstrumentation executorInstrumentation) {
168+
this.executorInstrumentation = executorInstrumentation;
169+
return self();
170+
}
171+
172+
public B monitor(Monitor monitor) {
173+
subscriber.monitor = monitor;
174+
return self();
175+
}
176+
177+
public B autoCreate(boolean autoCreate) {
178+
subscriber.autoCreate = autoCreate;
179+
return self();
180+
}
181+
182+
public B batchSize(int batchSize) {
183+
subscriber.batchSize = batchSize;
184+
return self();
185+
}
186+
187+
public B maxWait(int maxWait) {
188+
subscriber.maxWait = maxWait;
189+
return self();
190+
}
191+
192+
public T build() {
193+
194+
Objects.requireNonNull(subscriber.url, "url");
195+
Objects.requireNonNull(subscriber.stream, "stream");
196+
Objects.requireNonNull(subscriber.name, "name");
197+
Objects.requireNonNull(subscriber.subject, "subject");
198+
Objects.requireNonNull(executorInstrumentation, "executorInstrumentation");
199+
Objects.requireNonNull(subscriber.monitor, "monitor");
200+
201+
var name = "NatsSubscriber-" + subscriber.name;
202+
subscriber.executorService = executorInstrumentation.instrument(Executors.newSingleThreadScheduledExecutor(r -> {
203+
var thread = Executors.defaultThreadFactory().newThread(r);
204+
thread.setName(name);
205+
return thread;
206+
}), name);
207+
208+
return subscriber;
209+
}
210+
}
211+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2026 Metaform Systems, Inc.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Metaform Systems, Inc. - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.nats.tasks.publisher;
16+
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import io.nats.client.JetStream;
19+
import org.eclipse.edc.controlplane.tasks.ProcessTaskPayload;
20+
import org.eclipse.edc.controlplane.tasks.Task;
21+
import org.eclipse.edc.controlplane.tasks.TaskListener;
22+
import org.eclipse.edc.spi.EdcException;
23+
import org.eclipse.edc.spi.monitor.Monitor;
24+
25+
import java.util.function.Supplier;
26+
27+
import static java.lang.String.format;
28+
29+
public class NatsTaskPublisher implements TaskListener {
30+
31+
private final String subjectPrefix;
32+
private final Class<? extends ProcessTaskPayload> target;
33+
private final JetStream js;
34+
private final Monitor monitor;
35+
private final Supplier<ObjectMapper> objectMapper;
36+
37+
38+
public NatsTaskPublisher(String subjectPrefix, Class<? extends ProcessTaskPayload> target, JetStream js, Monitor monitor, Supplier<ObjectMapper> objectMapper) {
39+
this.subjectPrefix = subjectPrefix;
40+
this.target = target;
41+
this.js = js;
42+
this.monitor = monitor;
43+
this.objectMapper = objectMapper;
44+
}
45+
46+
47+
private String formatSubject(ProcessTaskPayload t) {
48+
return format("%s.%s.%s", subjectPrefix, t.getProcessType().toLowerCase(), t.name());
49+
}
50+
51+
@Override
52+
public void created(Task task) {
53+
try {
54+
if (target.isAssignableFrom(task.getPayload().getClass())) {
55+
var message = objectMapper.get().writeValueAsString(task);
56+
js.publish(formatSubject((ProcessTaskPayload) task.getPayload()), message.getBytes());
57+
}
58+
} catch (Exception e) {
59+
monitor.severe("Failed to publish task created event for task id " + task.getId(), e);
60+
throw new EdcException(e);
61+
}
62+
63+
}
64+
}

0 commit comments

Comments
 (0)