Skip to content

Commit 9f790e1

Browse files
authored
Pipe: Fixed the initial value bug for opc value + quality (#17345)
* add * o * fix
1 parent 5a83067 commit 9f790e1

2 files changed

Lines changed: 60 additions & 44 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 59 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ private void transferTabletRowForClientServerModel(
250250
final String currentFolder = currentStr.toString();
251251

252252
StatusCode currentQuality = sink.getDefaultQuality();
253-
UaVariableNode valueNode = null;
254253
Object value = null;
254+
TSDataType dataType = null;
255255
long timestamp = 0;
256256

257257
for (int i = 0; i < measurementSchemas.size(); ++i) {
@@ -274,9 +274,6 @@ private void transferTabletRowForClientServerModel(
274274
"When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\"");
275275
continue;
276276
}
277-
final String nodeName =
278-
Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1];
279-
final NodeId nodeId = newNodeId(currentFolder + nodeName);
280277
final UaVariableNode measurementNode;
281278
final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
282279
final DataValue dataValue =
@@ -286,53 +283,28 @@ private void transferTabletRowForClientServerModel(
286283
new DateTime(utcTimestamp),
287284
new DateTime());
288285

289-
if (!getNodeManager().containsNode(nodeId)) {
290-
measurementNode =
291-
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
292-
.setNodeId(nodeId)
293-
.setAccessLevel(AccessLevel.READ_WRITE)
294-
.setUserAccessLevel(AccessLevel.READ_ONLY)
295-
.setBrowseName(newQualifiedName(nodeName))
296-
.setDisplayName(LocalizedText.english(nodeName))
297-
.setDataType(convertToOpcDataType(type))
298-
.setTypeDefinition(Identifiers.BaseDataVariableType)
299-
.setValue(dataValue)
300-
.build();
301-
getNodeManager().addNode(measurementNode);
302-
if (Objects.nonNull(folderNode)) {
303-
folderNode.addReference(
304-
new Reference(
305-
folderNode.getNodeId(), Identifiers.Organizes, nodeId.expanded(), true));
306-
} else {
307-
measurementNode.addReference(
308-
new Reference(
309-
nodeId, Identifiers.Organizes, Identifiers.ObjectsFolder.expanded(), false));
310-
}
311-
} else {
312-
// This must exist
313-
measurementNode =
314-
(UaVariableNode)
315-
getNodeManager()
316-
.getNode(nodeId)
317-
.orElseThrow(
318-
() ->
319-
new PipeRuntimeCriticalException(
320-
String.format("The Node %s does not exist.", nodeId)));
321-
}
322-
323286
if (Objects.isNull(sink.getValueName())) {
287+
measurementNode = addNode(name, currentFolder, folderNode, dataValue, type);
324288
if (Objects.isNull(measurementNode.getValue())
325289
|| Objects.isNull(measurementNode.getValue().getSourceTime())
326290
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
327291
measurementNode.setValue(dataValue);
328292
}
329293
} else {
330-
valueNode = measurementNode;
331294
value = values.get(i);
332295
timestamp = utcTimestamp;
296+
dataType = type;
333297
}
334298
}
335-
if (Objects.nonNull(valueNode)) {
299+
if (Objects.nonNull(value)) {
300+
final UaVariableNode valueNode =
301+
addNode(
302+
segments[segments.length - 1],
303+
currentFolder,
304+
folderNode,
305+
new DataValue(
306+
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
307+
dataType);
336308
if (Objects.isNull(valueNode.getValue())
337309
|| Objects.isNull(valueNode.getValue().getSourceTime())
338310
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
@@ -343,6 +315,50 @@ private void transferTabletRowForClientServerModel(
343315
}
344316
}
345317

318+
private UaVariableNode addNode(
319+
final String nodeName,
320+
final String currentFolder,
321+
final UaNode folderNode,
322+
final DataValue dataValue,
323+
final TSDataType type) {
324+
final NodeId nodeId = newNodeId(currentFolder + nodeName);
325+
final UaVariableNode measurementNode;
326+
327+
if (!getNodeManager().containsNode(nodeId)) {
328+
measurementNode =
329+
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
330+
.setNodeId(nodeId)
331+
.setAccessLevel(AccessLevel.READ_WRITE)
332+
.setUserAccessLevel(AccessLevel.READ_ONLY)
333+
.setBrowseName(newQualifiedName(nodeName))
334+
.setDisplayName(LocalizedText.english(nodeName))
335+
.setDataType(convertToOpcDataType(type))
336+
.setTypeDefinition(Identifiers.BaseDataVariableType)
337+
.setValue(dataValue)
338+
.build();
339+
getNodeManager().addNode(measurementNode);
340+
if (Objects.nonNull(folderNode)) {
341+
folderNode.addReference(
342+
new Reference(folderNode.getNodeId(), Identifiers.Organizes, nodeId.expanded(), true));
343+
} else {
344+
measurementNode.addReference(
345+
new Reference(
346+
nodeId, Identifiers.Organizes, Identifiers.ObjectsFolder.expanded(), false));
347+
}
348+
} else {
349+
// This must exist
350+
measurementNode =
351+
(UaVariableNode)
352+
getNodeManager()
353+
.getNode(nodeId)
354+
.orElseThrow(
355+
() ->
356+
new PipeRuntimeCriticalException(
357+
String.format("The Node %s does not exist.", nodeId)));
358+
}
359+
return measurementNode;
360+
}
361+
346362
private static Object getTabletObjectValue4Opc(
347363
final Object column, final int rowIndex, final TSDataType type) {
348364
switch (type) {
@@ -392,13 +408,13 @@ private void transferTabletForPubSubModel(
392408
if (isTableModel) {
393409
sourceNameList = new ArrayList<>(tablet.getRowSize());
394410
for (int i = 0; i < tablet.getRowSize(); ++i) {
395-
final StringBuilder idBuilder = new StringBuilder(sink.getDatabaseName());
411+
final StringBuilder tagBuilder = new StringBuilder(sink.getDatabaseName());
396412
for (final Object segment : tablet.getDeviceID(i).getSegments()) {
397-
idBuilder
413+
tagBuilder
398414
.append(TsFileConstant.PATH_SEPARATOR)
399415
.append(Objects.isNull(segment) ? sink.getPlaceHolder4NullTag() : segment);
400416
}
401-
sourceNameList.add(idBuilder.toString());
417+
sourceNameList.add(tagBuilder.toString());
402418
}
403419
}
404420

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public boolean isClosed() {
109109
private void mayPrintExceedingLog() {
110110
final long remainingCapacity = ringBuffer.remainingCapacity();
111111
final long bufferSize = ringBuffer.getBufferSize();
112-
if ((double) remainingCapacity / bufferSize >= 0.5
112+
if ((double) remainingCapacity / bufferSize <= 0.5
113113
&& System.currentTimeMillis()
114114
- PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds()
115115
>= lastLogTime) {

0 commit comments

Comments
 (0)