Skip to content

Commit 4e06946

Browse files
authored
Pipe: Rewrote the OPC UA subscription logic to avoid the bug of third-party subscription model (#17525)
* complete * Debounce * if
1 parent d142b75 commit 4e06946

5 files changed

Lines changed: 174 additions & 18 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ public void testSingleEnv() throws Exception {
111111
TestUtils.executeNonQueries(
112112
senderEnv,
113113
Arrays.asList(
114-
"drop pipe a2b_history",
115-
"drop pipe a2b_realtime",
114+
"drop pipe if exists a2b_history",
115+
"drop pipe if exists a2b_realtime",
116116
String.format(
117117
"create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')",
118118
receiverDataNode.getIpAndPortString()),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
7070
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
7171
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
72+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
73+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
7274
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
7375
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
7476
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -109,6 +111,7 @@
109111
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
110112
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
111113
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
114+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
112115
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
113116
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
114117
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -308,6 +311,10 @@ private void customizeServer(final PipeParameters parameters) {
308311
if (securityPolicies.isEmpty()) {
309312
throw new PipeException("The security policy cannot be empty.");
310313
}
314+
final long debounceTimeMs =
315+
parameters.getLongOrDefault(
316+
Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
317+
CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
311318

312319
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
313320
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -327,7 +334,8 @@ private void customizeServer(final PipeParameters parameters) {
327334
.setPassword(password)
328335
.setSecurityDir(securityDir)
329336
.setEnableAnonymousAccess(enableAnonymousAccess)
330-
.setSecurityPolicies(securityPolicies);
337+
.setSecurityPolicies(securityPolicies)
338+
.setDebounceTimeMs(debounceTimeMs);
331339
final OpcUaServer newServer = builder.build();
332340
nameSpace = new OpcUaNameSpace(newServer, builder);
333341
nameSpace.startup();
@@ -341,7 +349,8 @@ private void customizeServer(final PipeParameters parameters) {
341349
password,
342350
securityDir,
343351
enableAnonymousAccess,
344-
securityPolicies);
352+
securityPolicies,
353+
debounceTimeMs);
345354
return oldValue;
346355
}
347356
} catch (final PipeException e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 143 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
4848
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
4949
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
50-
import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
50+
import org.eclipse.milo.opcua.stack.core.AttributeId;
5151
import org.eclipse.milo.opcua.stack.core.Identifiers;
5252
import org.eclipse.milo.opcua.stack.core.UaException;
5353
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -57,6 +57,7 @@
5757
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
5858
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
5959
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
60+
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
6061
import org.slf4j.Logger;
6162
import org.slf4j.LoggerFactory;
6263

@@ -71,20 +72,34 @@
7172
import java.util.Objects;
7273
import java.util.Set;
7374
import java.util.UUID;
75+
import java.util.concurrent.ConcurrentHashMap;
76+
import java.util.concurrent.ConcurrentMap;
77+
import java.util.concurrent.CopyOnWriteArrayList;
78+
import java.util.concurrent.ScheduledFuture;
79+
import java.util.concurrent.TimeUnit;
7480
import java.util.stream.Collectors;
7581

7682
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7783
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
7884
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
79-
private final SubscriptionModel subscriptionModel;
8085
private final OpcUaServerBuilder builder;
8186

87+
// Do not use subscription model because the original subscription model has some bugs
88+
private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new ConcurrentHashMap<>();
89+
90+
// Debounce task cache: used to merge updates within a short period of time, avoiding unnecessary
91+
// duplicate pushes
92+
private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new ConcurrentHashMap<>();
93+
// Debounce interval: within 10ms, the same node is updated multiple times, and only the last one
94+
// will be pushed (can be adjusted according to your site delay requirements, the minimum can be
95+
// set to 1ms)
96+
private final long debounceIntervalMs;
97+
8298
public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
8399
super(server, NAMESPACE_URI);
84100
this.builder = builder;
101+
debounceIntervalMs = builder.getDebounceTimeMs();
85102

86-
subscriptionModel = new SubscriptionModel(server, this);
87-
getLifecycleManager().addLifecycle(subscriptionModel);
88103
getLifecycleManager()
89104
.addLifecycle(
90105
new Lifecycle() {
@@ -291,7 +306,7 @@ private void transferTabletRowForClientServerModel(
291306
if (Objects.isNull(measurementNode.getValue())
292307
|| Objects.isNull(measurementNode.getValue().getSourceTime())
293308
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
294-
measurementNode.setValue(dataValue);
309+
notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode);
295310
}
296311
} else {
297312
value = values.get(i);
@@ -311,9 +326,11 @@ private void transferTabletRowForClientServerModel(
311326
if (Objects.isNull(valueNode.getValue())
312327
|| Objects.isNull(valueNode.getValue().getSourceTime())
313328
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
314-
valueNode.setValue(
329+
notifyNodeValueChange(
330+
valueNode.getNodeId(),
315331
new DataValue(
316-
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()));
332+
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
333+
valueNode);
317334
}
318335
}
319336
}
@@ -546,24 +563,131 @@ public static NodeId convertToOpcDataType(final TSDataType type) {
546563
}
547564
}
548565

566+
/**
567+
* On point value changing, notify all subscribed clients proactively
568+
*
569+
* @param nodeId NodeId of the changing node
570+
* @param newValue New value of the node (DataValue object containing value, status code, and
571+
* timestamp)
572+
* @param variableNode Corresponding UaVariableNode instance, used to update the local cached
573+
* value of the node
574+
*/
575+
public void notifyNodeValueChange(
576+
NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
577+
// 1. Update the local cached value of the node
578+
variableNode.setValue(newValue);
579+
580+
// 2. If there are no subscribers, return directly without doing any extra operations
581+
List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
582+
if (subscribedItems == null || subscribedItems.isEmpty()) {
583+
return;
584+
}
585+
586+
// 2. Debounce+Async Push: Asynchronously push the expensive push operation, while merging
587+
// high-frequency repeated updates
588+
debounceTasks.compute(
589+
nodeId,
590+
(k, oldTask) -> {
591+
// If there is already a pending push task, cancel it, we only need the latest value
592+
if (oldTask != null && !oldTask.isDone()) {
593+
oldTask.cancel(false);
594+
}
595+
596+
// Submit the push task to the Milo's scheduled thread pool, delay DEBOUNCE_INTERVAL_MS
597+
// execution
598+
return getServer()
599+
.getScheduledExecutorService()
600+
.schedule(
601+
() -> {
602+
try {
603+
// Batch push changes to all subscribers, this time-consuming operation is put
604+
// into the thread pool, not blocking your data update thread
605+
for (DataItem item : subscribedItems) {
606+
try {
607+
item.setValue(newValue);
608+
} catch (Exception e) {
609+
// Single client push failure does not affect other clients
610+
LOGGER.warn(
611+
"Failed to push value change to client, nodeId={}", nodeId, e);
612+
}
613+
}
614+
} finally {
615+
// Task execution completed, clean up the debounce cache
616+
debounceTasks.remove(nodeId);
617+
}
618+
},
619+
debounceIntervalMs,
620+
TimeUnit.MILLISECONDS);
621+
});
622+
}
623+
549624
@Override
550625
public void onDataItemsCreated(final List<DataItem> dataItems) {
551-
subscriptionModel.onDataItemsCreated(dataItems);
626+
for (DataItem item : dataItems) {
627+
final ReadValueId readValueId = item.getReadValueId();
628+
// Only handle Value attribute subscription (align with the original SubscriptionModel logic,
629+
// ignore other attribute subscriptions)
630+
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
631+
continue;
632+
}
633+
final NodeId nodeId = readValueId.getNodeId();
634+
635+
// 1. Add the new subscription item to the subscription mapping
636+
nodeSubscriptions.compute(
637+
nodeId,
638+
(k, existingList) -> {
639+
List<DataItem> list =
640+
existingList != null ? existingList : new CopyOnWriteArrayList<>();
641+
list.add(item);
642+
return list;
643+
});
644+
645+
// 2. 【Key Optimization】Proactively push the current node's initial value when the new
646+
// subscription item is created
647+
// Eliminate Bad_WaitingForInitialData, no need to wait for any polling
648+
try {
649+
UaVariableNode node = (UaVariableNode) getNodeManager().getNode(nodeId).orElse(null);
650+
if (node != null && node.getValue() != null) {
651+
// Immediately push the current value to the new subscriber, the client will instantly be
652+
// able to get the initial data
653+
item.setValue(node.getValue());
654+
}
655+
} catch (Exception e) {
656+
LOGGER.warn("Failed to send initial value to new subscription, nodeId={}", nodeId, e);
657+
}
658+
}
552659
}
553660

554661
@Override
555662
public void onDataItemsModified(final List<DataItem> dataItems) {
556-
subscriptionModel.onDataItemsModified(dataItems);
663+
// Push mode, client modifies subscription parameters (e.g. sampling interval) has no effect on
664+
// our active push, no additional processing is needed
557665
}
558666

559667
@Override
560668
public void onDataItemsDeleted(final List<DataItem> dataItems) {
561-
subscriptionModel.onDataItemsDeleted(dataItems);
669+
for (DataItem item : dataItems) {
670+
final ReadValueId readValueId = item.getReadValueId();
671+
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
672+
continue;
673+
}
674+
final NodeId nodeId = readValueId.getNodeId();
675+
676+
// When the client cancels the subscription, remove this subscription item from the mapping
677+
nodeSubscriptions.computeIfPresent(
678+
nodeId,
679+
(k, existingList) -> {
680+
existingList.remove(item);
681+
// Automatically clean up the key when there are no subscribers, save memory
682+
return existingList.isEmpty() ? null : existingList;
683+
});
684+
}
562685
}
563686

564687
@Override
565688
public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
566-
subscriptionModel.onMonitoringModeChanged(monitoredItems);
689+
// Push mode, monitoring mode change has no effect on active push, no additional processing is
690+
// needed
567691
}
568692

569693
/////////////////////////////// Conflict detection ///////////////////////////////
@@ -573,8 +697,14 @@ public void checkEquals(
573697
final String password,
574698
final String securityDir,
575699
final boolean enableAnonymousAccess,
576-
final Set<SecurityPolicy> securityPolicies) {
700+
final Set<SecurityPolicy> securityPolicies,
701+
final long debounceTimeMs) {
577702
builder.checkEquals(
578-
user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies);
703+
user,
704+
password,
705+
Paths.get(securityDir),
706+
enableAnonymousAccess,
707+
securityPolicies,
708+
debounceTimeMs);
579709
}
580710
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
8686
private boolean enableAnonymousAccess;
8787
private Set<SecurityPolicy> securityPolicies;
8888
private DefaultTrustListManager trustListManager;
89+
private long debounceTimeMs;
8990

9091
public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
9192
this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> security
123124
return this;
124125
}
125126

127+
public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
128+
this.debounceTimeMs = debounceTimeMs;
129+
return this;
130+
}
131+
132+
public long getDebounceTimeMs() {
133+
return debounceTimeMs;
134+
}
135+
126136
public OpcUaServer build() throws Exception {
127137
Files.createDirectories(securityDir);
128138
if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ void checkEquals(
314324
final String password,
315325
final Path securityDir,
316326
final boolean enableAnonymousAccess,
317-
final Set<SecurityPolicy> securityPolicies) {
327+
final Set<SecurityPolicy> securityPolicies,
328+
final long debounceTimeMs) {
318329
checkEquals("user", this.user, user);
319330
checkEquals("password", this.password, password);
320331
checkEquals(
@@ -323,6 +334,7 @@ void checkEquals(
323334
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
324335
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess);
325336
checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
337+
checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
326338
}
327339

328340
private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ public class PipeSinkConstant {
242242
"connector.opcua.timeout-seconds";
243243
public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L;
244244

245+
public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
246+
"connector.opcua.debounce-time-ms";
247+
public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = "sink.opcua.debounce-time-ms";
248+
public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 50L;
249+
245250
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable";
246251
public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable";
247252
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;

0 commit comments

Comments
 (0)