Skip to content

Commit 85ecbd6

Browse files
authored
Pipe: Implemented the login check in customize method & Added mosaic to subtask string in logger & Optimized the error code of illegal pipe / pipe plugin name string (#17197)
* wb * expr * login * login * fix * fix * semantic * mosaic * fix * fix * fix * fix * fix * lo * if * fix * fix * fix * fix * fix * may-fix * fix * part * fix * pwce * fix * initial * partial * bug-fix * fix * .. * st@ong * fix * fb * 701 * bug-fix * bug-fix * further * fix * fix
1 parent 8782662 commit 85ecbd6

50 files changed

Lines changed: 643 additions & 253 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ private void testDirectoryError(final String wrongDir, final Statement statement
396396
+ "PipePlugin.jar"));
397397
fail();
398398
} catch (final SQLException e) {
399-
Assert.assertTrue(e.getMessage().contains("1600: Failed to create pipe plugin"));
399+
Assert.assertTrue(e.getMessage().contains("701: Failed to create pipe plugin"));
400400
}
401401
}
402402

@@ -860,7 +860,7 @@ public void testPipePluginValidation() {
860860
fail();
861861
} catch (final SQLException e) {
862862
Assert.assertEquals(
863-
"1603: Failed to get executable for PipePlugin TestProcessor, please check the URI.",
863+
"701: Failed to get executable for PipePlugin TestProcessor, please check the URI.",
864864
e.getMessage());
865865
}
866866
try {

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,7 @@ public void testSourcePermission() {
311311
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
312312
fail("Shall fail if password is wrong.");
313313
} catch (final SQLException e) {
314-
Assert.assertTrue(
315-
e.getMessage().contains("Fail to CREATE_PIPE because Authentication failed."));
314+
Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage());
316315
}
317316

318317
// Use current session, user is root
@@ -506,4 +505,129 @@ public void testSourcePermission() {
506505
fail(e.getMessage());
507506
}
508507
}
508+
509+
@Test
510+
public void testIllegalPassword() throws Exception {
511+
TestUtils.executeNonQueries(
512+
senderEnv,
513+
Arrays.asList(
514+
"create user `thulab` 'ST@ongpasswd123456'",
515+
"create role `admin`",
516+
"grant role `admin` to `thulab`",
517+
"grant WRITE, READ, SYSTEM, SECURITY on root.** to role `admin`"),
518+
null);
519+
520+
TestUtils.executeNonQuery(
521+
senderEnv,
522+
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)");
523+
TestUtils.executeNonQuery(
524+
receiverEnv,
525+
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)");
526+
527+
Connection connection = senderEnv.getConnection();
528+
Statement statement = connection.createStatement();
529+
try {
530+
statement.execute(
531+
String.format(
532+
"create pipe a2b"
533+
+ " with source ("
534+
+ "'user'='thulab'"
535+
+ ", 'password'='passwd')"
536+
+ " with sink ("
537+
+ "'node-urls'='%s')",
538+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
539+
fail();
540+
} catch (final Exception e) {
541+
Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage());
542+
}
543+
544+
try {
545+
statement.execute(
546+
"create pipe a2b ('sink'='write-back-sink', 'user'='thulab', 'password'='passwd')");
547+
fail();
548+
} catch (final Exception e) {
549+
Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage());
550+
}
551+
552+
statement.execute(
553+
String.format(
554+
"create pipe a2b"
555+
+ " with source ("
556+
+ "'user'='thulab'"
557+
+ ", 'password'='ST@ongpasswd123456')"
558+
+ " with sink ("
559+
+ "'node-urls'='%s')",
560+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
561+
562+
TestUtils.executeNonQuery(
563+
senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)");
564+
565+
TestUtils.assertDataEventuallyOnEnv(
566+
receiverEnv,
567+
"select count(pressure) from root.vehicle.plane",
568+
"count(root.vehicle.plane.pressure),",
569+
Collections.singleton("1,"));
570+
571+
statement.execute("alter user thulab set password 'newST@ongPassword'");
572+
573+
try {
574+
TestUtils.restartCluster(senderEnv);
575+
} catch (final Throwable e) {
576+
e.printStackTrace();
577+
return;
578+
}
579+
580+
connection = senderEnv.getConnection();
581+
statement = connection.createStatement();
582+
TestUtils.executeNonQuery(
583+
senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)");
584+
585+
TestUtils.assertDataAlwaysOnEnv(
586+
receiverEnv,
587+
"select count(pressure) from root.vehicle.plane",
588+
"count(root.vehicle.plane.pressure),",
589+
Collections.singleton("1,"));
590+
591+
try {
592+
statement.execute("alter pipe a2b modify source ('password'='fake')");
593+
} catch (final SQLException e) {
594+
Assert.assertEquals("801: Failed to check password for pipe a2b.", e.getMessage());
595+
}
596+
597+
statement.execute("alter pipe a2b modify source ('password'='newST@ongPassword')");
598+
599+
// Test empty alter
600+
statement.execute("alter pipe a2b");
601+
602+
TestUtils.assertDataEventuallyOnEnv(
603+
receiverEnv,
604+
"select count(pressure) from root.vehicle.plane",
605+
"count(root.vehicle.plane.pressure),",
606+
Collections.singleton("2,"));
607+
608+
statement.execute("alter user thulab set password 'anotherST@ongPassword'");
609+
610+
try {
611+
TestUtils.restartCluster(senderEnv);
612+
} catch (final Throwable e) {
613+
e.printStackTrace();
614+
return;
615+
}
616+
617+
connection = senderEnv.getConnection();
618+
statement = connection.createStatement();
619+
TestUtils.executeNonQuery(
620+
senderEnv, "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)");
621+
statement.execute("alter user thulab set password 'newST@ongPassword'");
622+
statement.execute("alter pipe a2b");
623+
624+
TestUtils.assertDataEventuallyOnEnv(
625+
receiverEnv,
626+
"select count(pressure) from root.vehicle.plane",
627+
"count(root.vehicle.plane.pressure),",
628+
Collections.singleton("3,"));
629+
630+
statement.close();
631+
connection.close();
632+
}
509633
}

iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame
371371
return new PipeParameters(thisMap);
372372
}
373373

374-
private static class KeyReducer {
374+
public static class KeyReducer {
375375

376376
private static final Set<String> FIRST_PREFIXES = new HashSet<>();
377377
private static final Set<String> SECOND_PREFIXES = new HashSet<>();
@@ -399,7 +399,7 @@ static String shallowReduce(String key) {
399399
return key;
400400
}
401401

402-
static String reduce(String key) {
402+
public static String reduce(String key) {
403403
if (key == null) {
404404
return null;
405405
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.api.exception;
21+
22+
public class PipePasswordCheckException extends PipeException {
23+
public PipePasswordCheckException(final String message) {
24+
super(message);
25+
}
26+
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ public enum TSStatusCode {
265265
CREATE_PIPE_PLUGIN_ERROR(1600),
266266
DROP_PIPE_PLUGIN_ERROR(1601),
267267
PIPE_PLUGIN_LOAD_CLASS_ERROR(1602),
268+
@Deprecated
268269
PIPE_PLUGIN_DOWNLOAD_ERROR(1603),
269270
CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR(1604),
270271
DROP_PIPE_PLUGIN_ON_DATANODE_ERROR(1605),

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,10 +1332,11 @@ public DataSet queryPermission(final AuthorPlan authorPlan) {
13321332
}
13331333

13341334
@Override
1335-
public TPermissionInfoResp login(String username, String password) {
1335+
public TPermissionInfoResp login(
1336+
final String username, final String password, final boolean useEncryptedPassword) {
13361337
TSStatus status = confirmLeader();
13371338
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
1338-
return permissionManager.login(username, password);
1339+
return permissionManager.login(username, password, useEncryptedPassword);
13391340
} else {
13401341
TPermissionInfoResp resp = AuthUtils.generateEmptyPermissionInfoResp();
13411342
resp.setStatus(status);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,8 @@ TDataPartitionTableResp getOrCreateDataPartition(
501501
DataSet queryPermission(final AuthorPlan authorPlan);
502502

503503
/** login. */
504-
TPermissionInfoResp login(String username, String password);
504+
TPermissionInfoResp login(
505+
final String username, final String password, final boolean useEncryptedPassword);
505506

506507
/** Check User Privileges. */
507508
TPermissionInfoResp checkUserPrivileges(String username, PrivilegeUnion union);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,9 @@ protected ConsensusManager getConsensusManager() {
112112
return configManager.getConsensusManager();
113113
}
114114

115-
public TPermissionInfoResp login(String username, String password) {
116-
return authorInfo.login(username, password);
115+
public TPermissionInfoResp login(
116+
final String username, final String password, final boolean useEncryptedPassword) {
117+
return authorInfo.login(username, password, useEncryptedPassword);
117118
}
118119

119120
public String login4Pipe(final String userName, final String password) {

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1225,7 +1225,7 @@ protected boolean shouldLogin() {
12251225

12261226
@Override
12271227
protected TSStatus login() {
1228-
return configManager.login(username, password).getStatus();
1228+
return configManager.login(username, password, false).getStatus();
12291229
}
12301230

12311231
@Override

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,11 @@
5555
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
5656
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
5757
import org.apache.iotdb.pipe.api.exception.PipeException;
58+
import org.apache.iotdb.pipe.api.exception.PipePasswordCheckException;
5859
import org.apache.iotdb.rpc.TSStatusCode;
5960

61+
import javax.annotation.Nonnull;
62+
6063
import java.io.IOException;
6164
import java.nio.file.Paths;
6265
import java.util.Collections;
@@ -108,6 +111,20 @@ public void customize(
108111
PipeConfigNodeRemainingTimeMetrics.getInstance().register(this);
109112
}
110113

114+
@Override
115+
protected void login(final @Nonnull String password) {
116+
if (ConfigNode.getInstance()
117+
.getConfigManager()
118+
.getPermissionManager()
119+
.login(userName, password, true)
120+
.getStatus()
121+
.getCode()
122+
!= TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
123+
throw new PipePasswordCheckException(
124+
String.format("Failed to check password for pipe %s.", pipeName));
125+
}
126+
}
127+
111128
@Override
112129
protected AbstractPipeListeningQueue getListeningQueue() {
113130
return PipeConfigNodeAgent.runtime().listener();

0 commit comments

Comments
 (0)