Skip to content

Commit a48faf3

Browse files
CaideyipiHTHou
authored andcommitted
[To dev/1.3] Pipe: Banned the illegal names in pipe and pipe plugins (#17145) (#17156)
* Pipe: Banned the illegal names in pipe and pipe plugins (#17145) * fix * fix * fix * sp-linux * fix * fix
1 parent d2545fb commit a48faf3

5 files changed

Lines changed: 179 additions & 15 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeSyntaxIT.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
3030
import org.apache.iotdb.rpc.TSStatusCode;
3131

32+
import org.apache.commons.lang3.SystemUtils;
3233
import org.junit.Assert;
3334
import org.junit.Test;
3435
import org.junit.experimental.categories.Category;
@@ -38,6 +39,7 @@
3839
import java.sql.Connection;
3940
import java.sql.SQLException;
4041
import java.sql.Statement;
42+
import java.util.ArrayList;
4143
import java.util.Arrays;
4244
import java.util.HashMap;
4345
import java.util.List;
@@ -316,6 +318,68 @@ public void testInvalidParameter() throws Exception {
316318
}
317319
}
318320

321+
@Test
322+
public void testDirectoryErrors() throws SQLException {
323+
try (final Connection connection = senderEnv.getConnection();
324+
final Statement statement = connection.createStatement()) {
325+
List<String> wrongDirs = Arrays.asList(".", "..", "./hackYou", ".\\hackYouTwice");
326+
if (SystemUtils.IS_OS_WINDOWS) {
327+
wrongDirs = new ArrayList<>(wrongDirs);
328+
wrongDirs.add("BombWindows/:*?");
329+
wrongDirs.add("AUX");
330+
}
331+
for (final String name : wrongDirs) {
332+
testDirectoryError(name, statement);
333+
}
334+
}
335+
}
336+
337+
private void testDirectoryError(final String wrongDir, final Statement statement) {
338+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
339+
340+
final String receiverIp = receiverDataNode.getIp();
341+
final int receiverPort = receiverDataNode.getPort();
342+
343+
try {
344+
statement.execute(
345+
String.format(
346+
"create pipe `"
347+
+ wrongDir
348+
+ "` with source ()"
349+
+ " with processor ()"
350+
+ " with sink ("
351+
+ "'sink'='invalid-param',"
352+
+ "'sink.ip'='%s',"
353+
+ "'sink.port'='%s',"
354+
+ "'sink.batch.enable'='false')",
355+
receiverIp,
356+
receiverPort));
357+
fail();
358+
} catch (final Exception ignore) {
359+
// Expected
360+
}
361+
362+
try {
363+
statement.execute(
364+
String.format(
365+
"create pipePlugin `"
366+
+ wrongDir
367+
+ "` as 'org.apache.iotdb.db.pipe.example.TestProcessor' USING URI '%s'",
368+
new File(
369+
System.getProperty("user.dir")
370+
+ File.separator
371+
+ "target"
372+
+ File.separator
373+
+ "test-classes"
374+
+ File.separator)
375+
.toURI()
376+
+ "PipePlugin.jar"));
377+
fail();
378+
} catch (final SQLException e) {
379+
Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe plugin"));
380+
}
381+
}
382+
319383
@Test
320384
public void testBrackets() throws Exception {
321385
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.conf.CommonDescriptor;
2525
import org.apache.iotdb.commons.exception.IllegalPathException;
2626
import org.apache.iotdb.commons.path.PartialPath;
27+
import org.apache.iotdb.commons.utils.FileUtils;
2728
import org.apache.iotdb.db.auth.AuthorityChecker;
2829
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2930
import org.apache.iotdb.db.pipe.sink.payload.legacy.PipeData;
@@ -52,6 +53,7 @@
5253
import java.nio.ByteBuffer;
5354
import java.time.ZoneId;
5455
import java.util.Map;
56+
import java.util.Objects;
5557
import java.util.concurrent.ConcurrentHashMap;
5658
import java.util.concurrent.atomic.AtomicLong;
5759

@@ -124,7 +126,8 @@ public TSStatus handshake(
124126
}
125127

126128
private boolean validatePipeName(final TSyncIdentityInfo info) {
127-
return info.isSetPipeName() && !info.getPipeName().contains(File.separator);
129+
return info.isSetPipeName()
130+
&& Objects.isNull(FileUtils.getIllegalError4Directory(info.getPipeName()));
128131
}
129132

130133
private void createConnection(final SyncIdentityInfo identityInfo) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
6363
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
6464
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
65+
import org.apache.iotdb.commons.utils.FileUtils;
6566
import org.apache.iotdb.commons.utils.TimePartitionUtils;
6667
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
6768
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -290,6 +291,7 @@
290291
import java.util.HashSet;
291292
import java.util.List;
292293
import java.util.Map;
294+
import java.util.Objects;
293295
import java.util.Set;
294296
import java.util.TreeMap;
295297
import java.util.stream.Collectors;
@@ -813,6 +815,15 @@ public SettableFuture<ConfigTaskResult> createPipePlugin(
813815
final String className = createPipePluginStatement.getClassName();
814816
final String uriString = createPipePluginStatement.getUriString();
815817

818+
final String pathError = FileUtils.getIllegalError4Directory(pluginName);
819+
if (Objects.nonNull(pathError)) {
820+
future.setException(
821+
new IoTDBException(
822+
String.format("Failed to create pipe plugin %s. " + pathError, pluginName),
823+
TSStatusCode.CREATE_PIPE_PLUGIN_ERROR.getStatusCode()));
824+
return future;
825+
}
826+
816827
if (uriString == null || uriString.isEmpty()) {
817828
future.setException(
818829
new IoTDBException(
@@ -1774,26 +1785,36 @@ public SettableFuture<ConfigTaskResult> unsetSchemaTemplate(
17741785
}
17751786

17761787
@Override
1777-
public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPipeStatement) {
1778-
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
1788+
public SettableFuture<ConfigTaskResult> createPipe(
1789+
final CreatePipeStatement createPipeStatement) {
1790+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
17791791

1780-
// Validate pipe name
1781-
if (createPipeStatement.getPipeName().startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
1782-
String exceptionMessage =
1783-
String.format(
1784-
"Failed to create pipe %s, pipe name starting with \"%s\" are not allowed to be created.",
1785-
createPipeStatement.getPipeName(), PipeStaticMeta.SYSTEM_PIPE_PREFIX);
1786-
LOGGER.warn(exceptionMessage);
1792+
// Verify that Pipe is disabled if TSFile encryption is enabled
1793+
final String pipeName = createPipeStatement.getPipeName();
1794+
if (pipeName.startsWith(PipeStaticMeta.SYSTEM_PIPE_PREFIX)) {
17871795
future.setException(
1788-
new IoTDBException(exceptionMessage, TSStatusCode.PIPE_ERROR.getStatusCode()));
1796+
new IoTDBException(
1797+
String.format(
1798+
"Failed to create pipe %s, pipe name starting with \"%s\" are not allowed to be created.",
1799+
pipeName, PipeStaticMeta.SYSTEM_PIPE_PREFIX),
1800+
TSStatusCode.PIPE_ERROR.getStatusCode()));
1801+
return future;
1802+
}
1803+
1804+
final String pathError = FileUtils.getIllegalError4Directory(pipeName);
1805+
if (Objects.nonNull(pathError)) {
1806+
future.setException(
1807+
new IoTDBException(
1808+
String.format("Failed to create pipe %s, " + pathError, pipeName),
1809+
TSStatusCode.PIPE_ERROR.getStatusCode()));
17891810
return future;
17901811
}
17911812

17921813
// Validate pipe plugin before creation
17931814
try {
17941815
PipeDataNodeAgent.plugin()
17951816
.validate(
1796-
createPipeStatement.getPipeName(),
1817+
pipeName,
17971818
createPipeStatement.getExtractorAttributes(),
17981819
createPipeStatement.getProcessorAttributes(),
17991820
createPipeStatement.getConnectorAttributes());
@@ -1816,7 +1837,7 @@ public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPip
18161837
final TCreatePipeReq realtimeReq =
18171838
new TCreatePipeReq()
18181839
// Append suffix to the pipeline name for real-time data
1819-
.setPipeName(createPipeStatement.getPipeName() + "_realtime")
1840+
.setPipeName(pipeName + "_realtime")
18201841
// NOTE: set if not exists always to true to handle partial failure
18211842
.setIfNotExistsCondition(true)
18221843
// Use extractor parameters for real-time data
@@ -1844,7 +1865,7 @@ public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPip
18441865
final TCreatePipeReq historyReq =
18451866
new TCreatePipeReq()
18461867
// Append suffix to the pipeline name for historical data
1847-
.setPipeName(createPipeStatement.getPipeName() + "_history")
1868+
.setPipeName(pipeName + "_history")
18481869
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
18491870
// Use source parameters for historical data
18501871
.setExtractorAttributes(
@@ -1887,7 +1908,7 @@ public SettableFuture<ConfigTaskResult> createPipe(CreatePipeStatement createPip
18871908
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
18881909
TCreatePipeReq req =
18891910
new TCreatePipeReq()
1890-
.setPipeName(createPipeStatement.getPipeName())
1911+
.setPipeName(pipeName)
18911912
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
18921913
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
18931914
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public class FileUtils {
5454
"Renamed file {} to {} because it already exists in the target directory: {}";
5555
private static final String COPY_FILE_MESSAGE =
5656
"Copy file {} to {} because it already exists in the target directory: {}";
57+
private static final String ILLEGAL_PATH_MESSAGE =
58+
"The path cannot be '.', '..', './' or '.\\'. ";
5759

5860
private FileUtils() {}
5961

@@ -552,4 +554,14 @@ private static void copyFileRename(final File sourceFile, final File targetFile)
552554
targetFile,
553555
targetFile.getParentFile().getAbsolutePath());
554556
}
557+
558+
public static String getIllegalError4Directory(final String path) {
559+
if (path.equals(".") || path.equals("..") || path.contains("./") || path.contains(".\\")) {
560+
return ILLEGAL_PATH_MESSAGE;
561+
}
562+
if (!WindowsOSUtils.isLegalPathSegment4Windows(path)) {
563+
return WindowsOSUtils.OS_SEGMENT_ERROR;
564+
}
565+
return null;
566+
}
555567
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.commons.utils;
21+
22+
import org.apache.commons.lang3.SystemUtils;
23+
24+
import java.util.Arrays;
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
28+
public class WindowsOSUtils {
29+
private static final String ILLEGAL_WINDOWS_CHARS = "\\/:*?\"<>|";
30+
private static final Set<String> ILLEGAL_WINDOWS_NAMES =
31+
new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL", "COM1-COM9, LPT1-LPT9"));
32+
33+
static {
34+
for (int i = 0; i < 10; ++i) {
35+
ILLEGAL_WINDOWS_NAMES.add("COM" + i);
36+
ILLEGAL_WINDOWS_NAMES.add("LPT" + i);
37+
}
38+
}
39+
40+
public static final String OS_SEGMENT_ERROR =
41+
String.format(
42+
"In Windows System, the path shall not contains %s, equals one of %s, or ends with '.' or ' '.",
43+
ILLEGAL_WINDOWS_CHARS, ILLEGAL_WINDOWS_NAMES);
44+
45+
public static boolean isLegalPathSegment4Windows(final String pathSegment) {
46+
if (!SystemUtils.IS_OS_WINDOWS) {
47+
return true;
48+
}
49+
for (final char illegalChar : ILLEGAL_WINDOWS_CHARS.toCharArray()) {
50+
if (pathSegment.indexOf(illegalChar) != -1) {
51+
return false;
52+
}
53+
}
54+
if (pathSegment.endsWith(".") || pathSegment.endsWith(" ")) {
55+
return false;
56+
}
57+
for (final String illegalName : ILLEGAL_WINDOWS_NAMES) {
58+
if (pathSegment.equalsIgnoreCase(illegalName)) {
59+
return false;
60+
}
61+
}
62+
return true;
63+
}
64+
}

0 commit comments

Comments
 (0)