Skip to content

Commit ecef8f7

Browse files
authored
[To dev/1.3] Pipe: Rewrote the OPC UA subscription logic to avoid the bug of third-party subscription model (#17524)
* complete * Debounce
1 parent 9cadc71 commit ecef8f7

4 files changed

Lines changed: 173 additions & 17 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
6464
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
6565
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
66+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
67+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
6668
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
6769
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
6870
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
@@ -101,6 +103,7 @@
101103
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
102104
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
103105
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
106+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
104107
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
105108
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
106109
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
@@ -278,6 +281,10 @@ private void customizeServer(final PipeParameters parameters) {
278281
if (securityPolicies.isEmpty()) {
279282
throw new PipeException("The security policy cannot be empty.");
280283
}
284+
final long debounceTimeMs =
285+
parameters.getLongOrDefault(
286+
Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
287+
CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);
281288

282289
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
283290
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -297,7 +304,8 @@ private void customizeServer(final PipeParameters parameters) {
297304
.setPassword(password)
298305
.setSecurityDir(securityDir)
299306
.setEnableAnonymousAccess(enableAnonymousAccess)
300-
.setSecurityPolicies(securityPolicies);
307+
.setSecurityPolicies(securityPolicies)
308+
.setDebounceTimeMs(debounceTimeMs);
301309
final OpcUaServer newServer = builder.build();
302310
nameSpace = new OpcUaNameSpace(newServer, builder);
303311
nameSpace.startup();
@@ -311,7 +319,8 @@ private void customizeServer(final PipeParameters parameters) {
311319
password,
312320
securityDir,
313321
enableAnonymousAccess,
314-
securityPolicies);
322+
securityPolicies,
323+
debounceTimeMs);
315324
return oldValue;
316325
}
317326
} catch (final PipeException e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 144 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
4646
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
4747
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
48-
import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
48+
import org.eclipse.milo.opcua.stack.core.AttributeId;
4949
import org.eclipse.milo.opcua.stack.core.Identifiers;
5050
import org.eclipse.milo.opcua.stack.core.UaException;
5151
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
@@ -55,6 +55,7 @@
5555
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
5656
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
5757
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
58+
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
6061

@@ -68,19 +69,33 @@
6869
import java.util.Objects;
6970
import java.util.Set;
7071
import java.util.UUID;
72+
import java.util.concurrent.ConcurrentHashMap;
73+
import java.util.concurrent.ConcurrentMap;
74+
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.ScheduledFuture;
76+
import java.util.concurrent.TimeUnit;
7177

7278
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
7379
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
7480
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
75-
private final SubscriptionModel subscriptionModel;
7681
private final OpcUaServerBuilder builder;
7782

83+
// Do not use subscription model because the original subscription model has some bugs
84+
private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new ConcurrentHashMap<>();
85+
86+
// Debounce task cache: used to merge updates within a short period of time, avoiding unnecessary
87+
// duplicate pushes
88+
private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new ConcurrentHashMap<>();
89+
// Debounce interval: within 10ms, the same node is updated multiple times, and only the last one
90+
// will be pushed (can be adjusted according to your site delay requirements, the minimum can be
91+
// set to 1ms)
92+
private final long debounceIntervalMs;
93+
7894
public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
7995
super(server, NAMESPACE_URI);
8096
this.builder = builder;
97+
debounceIntervalMs = builder.getDebounceTimeMs();
8198

82-
subscriptionModel = new SubscriptionModel(server, this);
83-
getLifecycleManager().addLifecycle(subscriptionModel);
8499
getLifecycleManager()
85100
.addLifecycle(
86101
new Lifecycle() {
@@ -245,7 +260,7 @@ private void transferTabletRowForClientServerModel(
245260
measurementNode =
246261
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
247262
.setNodeId(nodeId)
248-
.setAccessLevel(AccessLevel.READ_WRITE)
263+
.setAccessLevel(AccessLevel.READ_ONLY)
249264
.setUserAccessLevel(AccessLevel.READ_ONLY)
250265
.setBrowseName(newQualifiedName(nodeName))
251266
.setDisplayName(LocalizedText.english(nodeName))
@@ -279,7 +294,7 @@ private void transferTabletRowForClientServerModel(
279294
if (Objects.isNull(measurementNode.getValue())
280295
|| Objects.isNull(measurementNode.getValue().getSourceTime())
281296
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
282-
measurementNode.setValue(dataValue);
297+
notifyNodeValueChange(nodeId, dataValue, measurementNode);
283298
}
284299
} else {
285300
valueNode = measurementNode;
@@ -291,9 +306,11 @@ private void transferTabletRowForClientServerModel(
291306
if (Objects.isNull(valueNode.getValue())
292307
|| Objects.isNull(valueNode.getValue().getSourceTime())
293308
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
294-
valueNode.setValue(
309+
notifyNodeValueChange(
310+
valueNode.getNodeId(),
295311
new DataValue(
296-
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()));
312+
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
313+
valueNode);
297314
}
298315
}
299316
}
@@ -451,24 +468,131 @@ public static NodeId convertToOpcDataType(final TSDataType type) {
451468
}
452469
}
453470

471+
/**
472+
* On point value changing, notify all subscribed clients proactively
473+
*
474+
* @param nodeId NodeId of the changing node
475+
* @param newValue New value of the node (DataValue object containing value, status code, and
476+
* timestamp)
477+
* @param variableNode Corresponding UaVariableNode instance, used to update the local cached
478+
* value of the node
479+
*/
480+
public void notifyNodeValueChange(
481+
NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
482+
// 1. Update the local cached value of the node
483+
variableNode.setValue(newValue);
484+
485+
// 2. If there are no subscribers, return directly without doing any extra operations
486+
List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
487+
if (subscribedItems == null || subscribedItems.isEmpty()) {
488+
return;
489+
}
490+
491+
// 2. Debounce+Async Push: Asynchronously push the expensive push operation, while merging
492+
// high-frequency repeated updates
493+
debounceTasks.compute(
494+
nodeId,
495+
(k, oldTask) -> {
496+
// If there is already a pending push task, cancel it, we only need the latest value
497+
if (oldTask != null && !oldTask.isDone()) {
498+
oldTask.cancel(false);
499+
}
500+
501+
// Submit the push task to the Milo's scheduled thread pool, delay DEBOUNCE_INTERVAL_MS
502+
// execution
503+
return getServer()
504+
.getScheduledExecutorService()
505+
.schedule(
506+
() -> {
507+
try {
508+
// Batch push changes to all subscribers, this time-consuming operation is put
509+
// into the thread pool, not blocking your data update thread
510+
for (DataItem item : subscribedItems) {
511+
try {
512+
item.setValue(newValue);
513+
} catch (Exception e) {
514+
// Single client push failure does not affect other clients
515+
LOGGER.warn(
516+
"Failed to push value change to client, nodeId={}", nodeId, e);
517+
}
518+
}
519+
} finally {
520+
// Task execution completed, clean up the debounce cache
521+
debounceTasks.remove(nodeId);
522+
}
523+
},
524+
debounceIntervalMs,
525+
TimeUnit.MILLISECONDS);
526+
});
527+
}
528+
454529
@Override
455530
public void onDataItemsCreated(final List<DataItem> dataItems) {
456-
subscriptionModel.onDataItemsCreated(dataItems);
531+
for (DataItem item : dataItems) {
532+
final ReadValueId readValueId = item.getReadValueId();
533+
// Only handle Value attribute subscription (align with the original SubscriptionModel logic,
534+
// ignore other attribute subscriptions)
535+
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
536+
continue;
537+
}
538+
final NodeId nodeId = readValueId.getNodeId();
539+
540+
// 1. Add the new subscription item to the subscription mapping
541+
nodeSubscriptions.compute(
542+
nodeId,
543+
(k, existingList) -> {
544+
List<DataItem> list =
545+
existingList != null ? existingList : new CopyOnWriteArrayList<>();
546+
list.add(item);
547+
return list;
548+
});
549+
550+
// 2. 【Key Optimization】Proactively push the current node's initial value when the new
551+
// subscription item is created
552+
// Eliminate Bad_WaitingForInitialData, no need to wait for any polling
553+
try {
554+
UaVariableNode node = (UaVariableNode) getNodeManager().getNode(nodeId).orElse(null);
555+
if (node != null && node.getValue() != null) {
556+
// Immediately push the current value to the new subscriber, the client will instantly be
557+
// able to get the initial data
558+
item.setValue(node.getValue());
559+
}
560+
} catch (Exception e) {
561+
LOGGER.warn("Failed to send initial value to new subscription, nodeId={}", nodeId, e);
562+
}
563+
}
457564
}
458565

459566
@Override
460567
public void onDataItemsModified(final List<DataItem> dataItems) {
461-
subscriptionModel.onDataItemsModified(dataItems);
568+
// Push mode, client modifies subscription parameters (e.g. sampling interval) has no effect on
569+
// our active push, no additional processing is needed
462570
}
463571

464572
@Override
465573
public void onDataItemsDeleted(final List<DataItem> dataItems) {
466-
subscriptionModel.onDataItemsDeleted(dataItems);
574+
for (DataItem item : dataItems) {
575+
final ReadValueId readValueId = item.getReadValueId();
576+
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
577+
continue;
578+
}
579+
final NodeId nodeId = readValueId.getNodeId();
580+
581+
// When the client cancels the subscription, remove this subscription item from the mapping
582+
nodeSubscriptions.computeIfPresent(
583+
nodeId,
584+
(k, existingList) -> {
585+
existingList.remove(item);
586+
// Automatically clean up the key when there are no subscribers, save memory
587+
return existingList.isEmpty() ? null : existingList;
588+
});
589+
}
467590
}
468591

469592
@Override
470593
public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
471-
subscriptionModel.onMonitoringModeChanged(monitoredItems);
594+
// Push mode, monitoring mode change has no effect on active push, no additional processing is
595+
// needed
472596
}
473597

474598
/////////////////////////////// Conflict detection ///////////////////////////////
@@ -478,8 +602,14 @@ public void checkEquals(
478602
final String password,
479603
final String securityDir,
480604
final boolean enableAnonymousAccess,
481-
final Set<SecurityPolicy> securityPolicies) {
605+
final Set<SecurityPolicy> securityPolicies,
606+
final long debounceTimeMs) {
482607
builder.checkEquals(
483-
user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies);
608+
user,
609+
password,
610+
Paths.get(securityDir),
611+
enableAnonymousAccess,
612+
securityPolicies,
613+
debounceTimeMs);
484614
}
485615
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
8686
private boolean enableAnonymousAccess;
8787
private Set<SecurityPolicy> securityPolicies;
8888
private DefaultTrustListManager trustListManager;
89+
private long debounceTimeMs;
8990

9091
public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
9192
this.tcpBindPort = tcpBindPort;
@@ -123,6 +124,15 @@ public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> security
123124
return this;
124125
}
125126

127+
public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
128+
this.debounceTimeMs = debounceTimeMs;
129+
return this;
130+
}
131+
132+
public long getDebounceTimeMs() {
133+
return debounceTimeMs;
134+
}
135+
126136
public OpcUaServer build() throws Exception {
127137
Files.createDirectories(securityDir);
128138
if (!Files.exists(securityDir)) {
@@ -314,7 +324,8 @@ void checkEquals(
314324
final String password,
315325
final Path securityDir,
316326
final boolean enableAnonymousAccess,
317-
final Set<SecurityPolicy> securityPolicies) {
327+
final Set<SecurityPolicy> securityPolicies,
328+
final long debounceTimeMs) {
318329
checkEquals("user", this.user, user);
319330
checkEquals("password", this.password, password);
320331
checkEquals(
@@ -323,6 +334,7 @@ void checkEquals(
323334
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
324335
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess);
325336
checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
337+
checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
326338
}
327339

328340
private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,11 @@ public class PipeSinkConstant {
231231
"connector.opcua.timeout-seconds";
232232
public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L;
233233

234+
public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
235+
"connector.opcua.debounce-time-ms";
236+
public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = "sink.opcua.debounce-time-ms";
237+
public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 50L;
238+
234239
public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable";
235240
public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable";
236241
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;

0 commit comments

Comments
 (0)