Skip to content

Commit 5f9cb66

Browse files
CaideyipiHTHou
andauthored
Fixed the REST partial insert & Pipe: Added partial insert IT (#17340)
* try * ' * no-bomb * fix * Revert "side-effect" This reverts commit cd104ea. * fix * fix * fix bug * fix bug * ignore flaky test --------- Co-authored-by: HTHou <haonan@apache.org>
1 parent 936af64 commit 5f9cb66

3 files changed

Lines changed: 176 additions & 4 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
2222
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
2324
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2425
import org.apache.iotdb.it.framework.IoTDBTestRunner;
2526
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -41,19 +42,28 @@
4142
import org.junit.After;
4243
import org.junit.Assert;
4344
import org.junit.Before;
45+
import org.junit.Ignore;
4446
import org.junit.Test;
4547
import org.junit.experimental.categories.Category;
4648
import org.junit.runner.RunWith;
4749

4850
import java.io.IOException;
4951
import java.nio.charset.Charset;
5052
import java.nio.charset.StandardCharsets;
53+
import java.sql.Connection;
54+
import java.sql.ResultSet;
55+
import java.sql.ResultSetMetaData;
56+
import java.sql.SQLException;
57+
import java.sql.Statement;
5158
import java.util.ArrayList;
5259
import java.util.Base64;
5360
import java.util.List;
5461
import java.util.Map;
62+
import java.util.concurrent.TimeUnit;
5563

5664
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COLUMN_TTL;
65+
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
66+
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
5767
import static org.junit.Assert.assertEquals;
5868
import static org.junit.Assert.assertTrue;
5969
import static org.junit.Assert.fail;
@@ -79,7 +89,7 @@ public void tearDown() throws Exception {
7989
EnvFactory.getEnv().cleanClusterEnvironment();
8090
}
8191

82-
private String getAuthorization(String username, String password) {
92+
public static String getAuthorization(String username, String password) {
8393
return Base64.getEncoder()
8494
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
8595
}
@@ -129,7 +139,7 @@ public void ping() {
129139
}
130140
}
131141

132-
private HttpPost getHttpPost(String url) {
142+
public static HttpPost getHttpPost(String url) {
133143
HttpPost httpPost = new HttpPost(url);
134144
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
135145
httpPost.setHeader("Accept", "application/json");
@@ -243,6 +253,101 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http
243253
}
244254
}
245255

256+
@Ignore // Flaky test
257+
@Test
258+
public void errorInsertRecords() throws SQLException, InterruptedException {
259+
SimpleEnv simpleEnv = new SimpleEnv();
260+
simpleEnv
261+
.getConfig()
262+
.getCommonConfig()
263+
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
264+
.setSchemaReplicationFactor(3)
265+
.setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
266+
.setDataReplicationFactor(2);
267+
simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
268+
simpleEnv.initClusterEnvironment(1, 3);
269+
270+
CloseableHttpResponse response = null;
271+
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
272+
try {
273+
HttpPost httpPost =
274+
getHttpPost(
275+
"http://"
276+
+ simpleEnv.getDataNodeWrapper(0).getIp()
277+
+ ":"
278+
+ simpleEnv.getDataNodeWrapper(0).getRestServicePort()
279+
+ "/rest/v2/insertRecords");
280+
String json =
281+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
282+
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
283+
for (int i = 0; i < 30; i++) {
284+
try {
285+
response = httpClient.execute(httpPost);
286+
break;
287+
} catch (Exception e) {
288+
if (i == 29) {
289+
throw e;
290+
}
291+
try {
292+
Thread.sleep(1000);
293+
} catch (InterruptedException ex) {
294+
throw new RuntimeException(ex);
295+
}
296+
}
297+
}
298+
299+
HttpEntity responseEntity = response.getEntity();
300+
String message = EntityUtils.toString(responseEntity, "utf-8");
301+
JsonObject result = JsonParser.parseString(message).getAsJsonObject();
302+
assertEquals(507, Integer.parseInt(result.get("code").toString()));
303+
} catch (IOException e) {
304+
e.printStackTrace();
305+
fail(e.getMessage());
306+
} finally {
307+
try {
308+
if (response != null) {
309+
response.close();
310+
}
311+
} catch (IOException e) {
312+
e.printStackTrace();
313+
fail(e.getMessage());
314+
}
315+
}
316+
TimeUnit.SECONDS.sleep(5);
317+
318+
try {
319+
for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
320+
dataNodeWrapper.stop();
321+
try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
322+
Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
323+
int count = 0;
324+
try (ResultSet resultSet =
325+
statementAfterNodeDown.executeQuery(
326+
"select s88, s77, s66, s55, s44, s33 from root.s1")) {
327+
ResultSetMetaData metaData = resultSet.getMetaData();
328+
while (resultSet.next()) {
329+
StringBuilder row = new StringBuilder();
330+
for (int i = 0; i < metaData.getColumnCount(); i++) {
331+
row.append(resultSet.getString(i + 1)).append(",");
332+
}
333+
System.out.println(row);
334+
count++;
335+
}
336+
}
337+
assertEquals(3, count);
338+
}
339+
dataNodeWrapper.start();
340+
TimeUnit.SECONDS.sleep(1);
341+
}
342+
} catch (SQLException e) {
343+
if (!e.getMessage().contains("Maybe server is down")) {
344+
throw e;
345+
}
346+
} finally {
347+
simpleEnv.cleanClusterEnvironment();
348+
}
349+
}
350+
246351
public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
247352
CloseableHttpResponse response = null;
248353
try {

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,17 @@
2929
import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
3030
import org.apache.iotdb.rpc.TSStatusCode;
3131

32+
import org.apache.http.client.methods.HttpPost;
33+
import org.apache.http.entity.StringEntity;
34+
import org.apache.http.impl.client.CloseableHttpClient;
35+
import org.apache.http.impl.client.HttpClientBuilder;
3236
import org.junit.Assert;
3337
import org.junit.Before;
3438
import org.junit.Test;
3539
import org.junit.experimental.categories.Category;
3640
import org.junit.runner.RunWith;
3741

42+
import java.nio.charset.Charset;
3843
import java.sql.Connection;
3944
import java.sql.Statement;
4045
import java.util.Arrays;
@@ -44,6 +49,8 @@
4449
import java.util.Map;
4550
import java.util.function.Consumer;
4651

52+
import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;
53+
4754
@RunWith(IoTDBTestRunner.class)
4855
@Category({MultiClusterIT2DualTreeAutoBasic.class})
4956
public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
@@ -54,8 +61,14 @@ public void setUp() {
5461
super.setUp();
5562
}
5663

64+
@Override
65+
protected void setupConfig() {
66+
super.setupConfig();
67+
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
68+
}
69+
5770
@Test
58-
public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
71+
public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
5972
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
6073

6174
final String receiverIp = receiverDataNode.getIp();
@@ -207,7 +220,7 @@ private void testSinkFormat(final String format) throws Exception {
207220
}
208221

209222
@Test
210-
public void testLegacyConnector() throws Exception {
223+
public void testLegacySink() throws Exception {
211224
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
212225

213226
final String receiverIp = receiverDataNode.getIp();
@@ -514,4 +527,53 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
514527
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
515528
}
516529
}
530+
531+
@Test
532+
public void testSpecialPartialInsert() throws Exception {
533+
try (final Connection connection = senderEnv.getConnection();
534+
final Statement statement = connection.createStatement()) {
535+
statement.execute(
536+
String.format(
537+
"create pipe a2b with sink ('node-urls'='%s')",
538+
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
539+
}
540+
541+
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
542+
543+
HttpPost httpPost =
544+
getHttpPost(
545+
"http://"
546+
+ senderEnv.getDataNodeWrapper(0).getIp()
547+
+ ":"
548+
+ senderEnv.getDataNodeWrapper(0).getRestServicePort()
549+
+ "/rest/v2/insertRecords");
550+
String json =
551+
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
552+
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
553+
for (int i = 0; i < 30; i++) {
554+
try {
555+
httpClient.execute(httpPost);
556+
break;
557+
} catch (final Exception e) {
558+
if (i == 29) {
559+
throw e;
560+
}
561+
try {
562+
Thread.sleep(1000);
563+
} catch (InterruptedException ex) {
564+
throw new RuntimeException(ex);
565+
}
566+
}
567+
}
568+
569+
TestUtils.assertDataEventuallyOnEnv(
570+
receiverEnv,
571+
"select s88, s77, s66, s55, s44, s33 from root.s1",
572+
"Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
573+
new HashSet<>(
574+
Arrays.asList(
575+
"1635232113960,null,null,null,null,1,",
576+
"1635232151960,null,null,2.0,2.1,null,",
577+
"1635232143960,6.0,4.0,null,null,null,")));
578+
}
517579
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,11 @@ public PlanNode visitInsertRows(
792792
insertRowStatement.getTime(),
793793
insertRowStatement.getValues(),
794794
insertRowStatement.isNeedInferType());
795+
if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
796+
for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
797+
insertRowNode.markFailedMeasurement(index);
798+
}
799+
}
795800
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
796801
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
797802
}

0 commit comments

Comments
 (0)