Skip to content

Commit 59f911b

Browse files
CaideyipiHTHou
authored andcommitted
Fixed the NPE when validating legacy sink (#17153) (#17161)
* npe-fix * unb
1 parent a48faf3 commit 59f911b

2 files changed

Lines changed: 18 additions & 26 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
2424
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
2525
import org.apache.iotdb.db.it.utils.TestUtils;
26+
import org.apache.iotdb.it.env.EnvFactory;
2627
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2728
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2829
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -33,6 +34,8 @@
3334
import org.junit.experimental.categories.Category;
3435
import org.junit.runner.RunWith;
3536

37+
import java.sql.Connection;
38+
import java.sql.Statement;
3639
import java.util.Arrays;
3740
import java.util.Collections;
3841
import java.util.HashMap;
@@ -200,30 +203,16 @@ public void testLegacyConnector() throws Exception {
200203
final String receiverIp = receiverDataNode.getIp();
201204
final int receiverPort = receiverDataNode.getPort();
202205

206+
try (final Connection connection = EnvFactory.getEnv().getConnection();
207+
final Statement statement = connection.createStatement()) {
208+
statement.execute(
209+
String.format(
210+
"create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')",
211+
receiverIp, receiverPort));
212+
}
213+
203214
try (final SyncConfigNodeIServiceClient client =
204215
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
205-
final Map<String, String> extractorAttributes = new HashMap<>();
206-
final Map<String, String> processorAttributes = new HashMap<>();
207-
final Map<String, String> connectorAttributes = new HashMap<>();
208-
209-
extractorAttributes.put("source.realtime.mode", "log");
210-
211-
connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
212-
connectorAttributes.put("sink.batch.enable", "false");
213-
connectorAttributes.put("sink.ip", receiverIp);
214-
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
215-
216-
// This version does not matter since it's no longer checked by the legacy receiver
217-
connectorAttributes.put("sink.version", "1.3");
218-
219-
final TSStatus status =
220-
client.createPipe(
221-
new TCreatePipeReq("testPipe", connectorAttributes)
222-
.setExtractorAttributes(extractorAttributes)
223-
.setProcessorAttributes(processorAttributes));
224-
225-
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
226-
227216
Assert.assertEquals(
228217
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
229218

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3838
import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
3939
import org.apache.iotdb.db.storageengine.StorageEngine;
40+
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
4041
import org.apache.iotdb.pipe.api.PipeConnector;
4142
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
4243
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -107,7 +108,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
107108
private String syncConnectorVersion;
108109

109110
private String pipeName;
110-
private String databaseName;
111+
private String databaseName = "";
111112
private IoTDBSyncClient client;
112113

113114
private SessionPool sessionPool;
@@ -199,10 +200,12 @@ public void customize(
199200
trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
200201
trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
201202

202-
databaseName =
203+
final DataRegion dataRegion =
203204
StorageEngine.getInstance()
204-
.getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
205-
.getDatabaseName();
205+
.getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
206+
if (Objects.nonNull(dataRegion)) {
207+
databaseName = dataRegion.getDatabaseName();
208+
}
206209
}
207210

208211
@Override

0 commit comments

Comments
 (0)