Skip to content

Commit 68fc992

Browse files
authored
Pipe: Fixed some potential OPC UA problems & Added IT for `` in opc (#17393)
* fix * Update OpcUaKeyStoreLoader.java * fix * remove-side * opti
1 parent b85ac14 commit 68fc992

4 files changed

Lines changed: 40 additions & 22 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void testOPCUAServerSink() throws Exception {
8989
try (final SyncConfigNodeIServiceClient client =
9090
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
9191

92-
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
92+
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, `1`) values (1, 1)", null);
9393

9494
final Map<String, String> sinkAttributes = new HashMap<>();
9595

@@ -127,7 +127,9 @@ public void testOPCUAServerSink() throws Exception {
127127
}
128128
}
129129
value =
130-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
130+
opcUaClient
131+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/`1`"))
132+
.get();
131133
Assert.assertEquals(new Variant(1.0), value.getValue());
132134
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
133135
opcUaClient.disconnect().get();
@@ -138,12 +140,12 @@ public void testOPCUAServerSink() throws Exception {
138140
TestUtils.executeNonQueries(
139141
env,
140142
Arrays.asList(
141-
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
142-
"create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)",
143-
"create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)",
144-
"insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)",
145-
"insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)",
146-
"insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"),
143+
"create aligned timeSeries root.db.`123`(value double, quality boolean, other int32)",
144+
"create aligned timeSeries root.db.`1231`(value double, quality boolean, other int32)",
145+
"create aligned timeSeries root.db.`1232`(value double, quality boolean, other int32)",
146+
"insert into root.db.`123`(time, value, quality, other) values (0, 0, true, 1)",
147+
"insert into root.db.`1231`(time, value, quality, other) values (0, 0, true, 1)",
148+
"insert into root.db.`1232`(time, value, quality, other) values (0, 0, true, 1)"),
147149
null);
148150

149151
while (true) {
@@ -183,31 +185,33 @@ public void testOPCUAServerSink() throws Exception {
183185
TestUtils.executeNonQueries(
184186
env,
185187
Arrays.asList(
186-
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
187-
"insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)",
188-
"insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"),
188+
"insert into root.db.`123`(time, value, quality, other) values (1, 1, false, 1)",
189+
"insert into root.db.`1231`(time, value, quality, other) values (1, 1, false, 1)",
190+
"insert into root.db.`1232`(time, value, quality, other) values (1, 1, false, 1)"),
189191
null);
190192

191193
long startTime = System.currentTimeMillis();
192194
while (true) {
193195
try {
194196
value =
195-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
197+
opcUaClient
198+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`123`"))
199+
.get();
196200
Assert.assertEquals(new Variant(1.0), value.getValue());
197201
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
198202
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
199203

200204
value =
201205
opcUaClient
202-
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1"))
206+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`1231`"))
203207
.get();
204208
Assert.assertEquals(new Variant(1.0), value.getValue());
205209
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
206210
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
207211

208212
value =
209213
opcUaClient
210-
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2"))
214+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`1232`"))
211215
.get();
212216
Assert.assertEquals(new Variant(1.0), value.getValue());
213217
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
@@ -221,14 +225,16 @@ public void testOPCUAServerSink() throws Exception {
221225
}
222226

223227
TestUtils.executeNonQuery(
224-
env, "insert into root.db.opc(time, quality) values (2, true)", null);
225-
TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
228+
env, "insert into root.db.`123`(time, quality) values (2, true)", null);
229+
TestUtils.executeNonQuery(env, "insert into root.db.`123`(time, value) values (2, 2)", null);
226230

227231
startTime = System.currentTimeMillis();
228232
while (true) {
229233
try {
230234
value =
231-
opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
235+
opcUaClient
236+
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/`123`"))
237+
.get();
232238
Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
233239
Assert.assertEquals(new Variant(2.0), value.getValue());
234240
Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
import java.io.File;
3232
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.OutputStream;
3335
import java.nio.file.Files;
3436
import java.nio.file.Path;
3537
import java.security.Key;
@@ -61,8 +63,8 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
6163
LOGGER.info("Loading KeyStore at {}", serverKeyStore);
6264

6365
if (serverKeyStore.exists()) {
64-
try {
65-
keyStore.load(Files.newInputStream(serverKeyStore.toPath()), password);
66+
try (InputStream is = Files.newInputStream(serverKeyStore.toPath())) {
67+
keyStore.load(is, password);
6668
} catch (final IOException e) {
6769
LOGGER.warn("Load keyStore failed, the existing keyStore may be stale, re-constructing...");
6870
FileUtils.deleteFileOrDirectory(serverKeyStore);
@@ -105,7 +107,9 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
105107

106108
keyStore.setKeyEntry(
107109
SERVER_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
108-
keyStore.store(Files.newOutputStream(serverKeyStore.toPath()), password);
110+
try (final OutputStream os = Files.newOutputStream(serverKeyStore.toPath())) {
111+
keyStore.store(os, password);
112+
}
109113
}
110114

111115
final Key serverPrivateKey = keyStore.getKey(SERVER_ALIAS, password);
@@ -114,6 +118,10 @@ OpcUaKeyStoreLoader load(final Path baseDir, final char[] password) throws Excep
114118

115119
final PublicKey serverPublicKey = serverCertificate.getPublicKey();
116120
serverKeyPair = new KeyPair(serverPublicKey, (PrivateKey) serverPrivateKey);
121+
} else {
122+
throw new Exception(
123+
"Invalid keyStore, the serverPrivateKey is "
124+
+ (serverPrivateKey != null ? serverPrivateKey.getClass().getSimpleName() : "null"));
117125
}
118126

119127
return this;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ public void startup() {
9595

9696
@Override
9797
public void shutdown() {
98-
getServer().shutdown();
99-
builder.close();
98+
try {
99+
getServer().shutdown();
100+
} finally {
101+
builder.close();
102+
}
100103
}
101104
});
102105
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymous
117117
return this;
118118
}
119119

120+
// Must be a modifiable set.
120121
public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> securityPolicies) {
121122
this.securityPolicies = securityPolicies;
122123
return this;

0 commit comments

Comments
 (0)