Skip to content

Commit 4dbda78

Browse files
authored
Added database check for flush on local & Optimized the UTF-8 param in IT (#17365)
* fix * bishop * fix-it
1 parent 3b02d32 commit 4dbda78

7 files changed

Lines changed: 45 additions & 15 deletions

File tree

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,8 @@ public void start() {
527527
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize() + "m",
528528
"-Djdk.nio.maxCachedBufferSize=262144",
529529
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" + killPoints.toString(),
530-
"-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
530+
"-Dsun.jnu.encoding=UTF-8",
531+
"-Dfile.encoding=UTF-8",
531532
"-cp",
532533
server_node_lib_path));
533534
addStartCmdParams(startCmd);

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,15 @@ public void testFlushNotExistGroupNoData() {
188188
sqe.printStackTrace();
189189
assertTrue(sqe.getMessage().contains(expectedMsg));
190190
}
191+
try {
192+
statement.execute(
193+
"FLUSH root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 on local");
194+
} catch (SQLException sqe) {
195+
String expectedMsg =
196+
"500: Database root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 does not exist on local";
197+
sqe.printStackTrace();
198+
assertTrue(sqe.getMessage().contains(expectedMsg));
199+
}
191200
} catch (Exception e) {
192201
fail(e.getMessage());
193202
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -981,11 +981,11 @@ public TSStatus merge() throws TException {
981981
@Override
982982
public TSStatus flush(final TFlushReq req) throws TException {
983983
if (req.storageGroups != null) {
984-
final List<String> noExistSg =
984+
final List<String> noExistDB =
985985
configManager.getPartitionManager().filterUnExistDatabases(req.storageGroups);
986-
if (!noExistSg.isEmpty()) {
986+
if (!noExistDB.isEmpty()) {
987987
final StringBuilder sb = new StringBuilder();
988-
noExistSg.forEach(storageGroup -> sb.append(storageGroup).append(","));
988+
noExistDB.forEach(database -> sb.append(database).append(","));
989989
return RpcUtils.getStatus(
990990
TSStatusCode.DATABASE_NOT_EXIST,
991991
"Database " + sb.subSequence(0, sb.length() - 1) + " does not exist");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2433,7 +2433,7 @@ public TSStatus stopRepairData() throws TException {
24332433
@Override
24342434
public TSStatus flush(TFlushReq req) throws TException {
24352435
try {
2436-
storageEngine.operateFlush(req);
2436+
storageEngine.operateFlush(req, false);
24372437
} catch (Exception e) {
24382438
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
24392439
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,8 +1264,7 @@ public SettableFuture<ConfigTaskResult> flush(
12641264
}
12651265
} else {
12661266
try {
1267-
StorageEngine.getInstance().operateFlush(tFlushReq);
1268-
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
1267+
tsStatus = StorageEngine.getInstance().operateFlush(tFlushReq, true);
12691268
} catch (final Exception e) {
12701269
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
12711270
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,17 +1574,17 @@ public Node visitRemoveAINodeStatement(RelationalSqlParser.RemoveAINodeStatement
15741574
@Override
15751575
public Node visitFlushStatement(final RelationalSqlParser.FlushStatementContext ctx) {
15761576
final FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
1577-
List<String> storageGroups = null;
1577+
List<String> databases = null;
15781578
if (ctx.booleanValue() != null) {
15791579
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
15801580
}
15811581
flushStatement.setOnCluster(
15821582
ctx.localOrClusterMode() == null || ctx.localOrClusterMode().LOCAL() == null);
15831583
if (ctx.identifier() != null) {
1584-
storageGroups =
1584+
databases =
15851585
getIdentifiers(ctx.identifier()).stream().map(Identifier::getValue).collect(toList());
15861586
}
1587-
flushStatement.setDatabases(storageGroups);
1587+
flushStatement.setDatabases(databases);
15881588
return new Flush(flushStatement, null);
15891589
}
15901590

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.iotdb.commons.service.IService;
4343
import org.apache.iotdb.commons.service.ServiceType;
4444
import org.apache.iotdb.commons.utils.PathUtils;
45+
import org.apache.iotdb.commons.utils.StatusUtils;
4546
import org.apache.iotdb.commons.utils.TestOnly;
4647
import org.apache.iotdb.commons.utils.TimePartitionUtils;
4748
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -555,6 +556,11 @@ public void syncCloseProcessorsInRegion(List<String> dataRegionIds) {
555556
checkResults(tasks, "Failed to sync close processor.");
556557
}
557558

559+
public boolean containsDatabase(final String database) {
560+
return dataRegionMap.values().stream()
561+
.anyMatch(dataRegion -> Objects.equals(database, dataRegion.getDatabaseName()));
562+
}
563+
558564
public void syncCloseProcessorsInDatabase(String databaseName, boolean isSeq) {
559565
List<Future<Void>> tasks = new ArrayList<>();
560566
for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -662,22 +668,37 @@ public void recoverRepairData() {
662668
}
663669
}
664670

665-
public void operateFlush(TFlushReq req) {
671+
public TSStatus operateFlush(final TFlushReq req, final boolean onLocal) {
672+
final StorageEngine storageEngine = StorageEngine.getInstance();
666673
if (req.getRegionIds() != null && !req.getRegionIds().isEmpty()) {
667-
StorageEngine.getInstance().syncCloseProcessorsInRegion(req.getRegionIds());
674+
storageEngine.syncCloseProcessorsInRegion(req.getRegionIds());
668675
} else if (req.storageGroups == null || req.storageGroups.isEmpty()) {
669676
StorageEngine.getInstance().syncCloseAllProcessor();
670677
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
671678
} else {
679+
if (onLocal) {
680+
final List<String> noExistDB =
681+
req.storageGroups.stream()
682+
.filter(database -> !storageEngine.containsDatabase(database))
683+
.collect(Collectors.toList());
684+
if (!noExistDB.isEmpty()) {
685+
final StringBuilder sb = new StringBuilder();
686+
noExistDB.forEach(database -> sb.append(database).append(","));
687+
return RpcUtils.getStatus(
688+
TSStatusCode.DATABASE_NOT_EXIST,
689+
"Database " + sb.subSequence(0, sb.length() - 1) + " does not exist on local");
690+
}
691+
}
672692
for (String databaseName : req.storageGroups) {
673693
if (req.isSeq == null) {
674-
StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName);
694+
storageEngine.syncCloseProcessorsInDatabase(databaseName);
675695
} else {
676-
StorageEngine.getInstance()
677-
.syncCloseProcessorsInDatabase(databaseName, Boolean.parseBoolean(req.isSeq));
696+
storageEngine.syncCloseProcessorsInDatabase(
697+
databaseName, Boolean.parseBoolean(req.isSeq));
678698
}
679699
}
680700
}
701+
return StatusUtils.OK;
681702
}
682703

683704
public void clearCache() {

0 commit comments

Comments
 (0)