|
23 | 23 | import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; |
24 | 24 | import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; |
25 | 25 | import org.apache.iotdb.db.it.utils.TestUtils; |
| 26 | +import org.apache.iotdb.it.env.EnvFactory; |
26 | 27 | import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; |
27 | 28 | import org.apache.iotdb.it.framework.IoTDBTestRunner; |
28 | 29 | import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; |
|
35 | 36 | import org.junit.experimental.categories.Category; |
36 | 37 | import org.junit.runner.RunWith; |
37 | 38 |
|
| 39 | +import java.sql.Connection; |
| 40 | +import java.sql.Statement; |
38 | 41 | import java.util.Arrays; |
39 | 42 | import java.util.Collections; |
40 | 43 | import java.util.HashMap; |
@@ -218,30 +221,16 @@ public void testLegacyConnector() throws Exception { |
218 | 221 | final String receiverIp = receiverDataNode.getIp(); |
219 | 222 | final int receiverPort = receiverDataNode.getPort(); |
220 | 223 |
|
| 224 | + try (final Connection connection = EnvFactory.getEnv().getConnection(); |
| 225 | + final Statement statement = connection.createStatement()) { |
| 226 | + statement.execute( |
| 227 | + String.format( |
| 228 | + "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')", |
| 229 | + receiverIp, receiverPort)); |
| 230 | + } |
| 231 | + |
221 | 232 | try (final SyncConfigNodeIServiceClient client = |
222 | 233 | (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { |
223 | | - final Map<String, String> extractorAttributes = new HashMap<>(); |
224 | | - final Map<String, String> processorAttributes = new HashMap<>(); |
225 | | - final Map<String, String> connectorAttributes = new HashMap<>(); |
226 | | - |
227 | | - extractorAttributes.put("source.realtime.mode", "log"); |
228 | | - |
229 | | - connectorAttributes.put("sink", "iotdb-legacy-pipe-sink"); |
230 | | - connectorAttributes.put("sink.batch.enable", "false"); |
231 | | - connectorAttributes.put("sink.ip", receiverIp); |
232 | | - connectorAttributes.put("sink.port", Integer.toString(receiverPort)); |
233 | | - |
234 | | - // This version does not matter since it's no longer checked by the legacy receiver |
235 | | - connectorAttributes.put("sink.version", "1.3"); |
236 | | - |
237 | | - final TSStatus status = |
238 | | - client.createPipe( |
239 | | - new TCreatePipeReq("testPipe", connectorAttributes) |
240 | | - .setExtractorAttributes(extractorAttributes) |
241 | | - .setProcessorAttributes(processorAttributes)); |
242 | | - |
243 | | - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); |
244 | | - |
245 | 234 | Assert.assertEquals( |
246 | 235 | TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); |
247 | 236 |
|
|
0 commit comments