Skip to content

Commit 4e7f3e5

Browse files
authored
Pipe: Fixed the audit db filter on config receiver && Added the judgments for table model audit DB && Optimized the logger for receiver status (#17219)
* fix * table-audit * defend * except * fix
1 parent 85ecbd6 commit 4e7f3e5

11 files changed

Lines changed: 105 additions & 44 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
2323
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
2424
import org.apache.iotdb.commons.schema.SchemaConstant;
25+
import org.apache.iotdb.commons.schema.table.Audit;
2526
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
2627
import org.apache.iotdb.confignode.manager.ConfigManager;
2728
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
@@ -78,7 +79,9 @@ public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEven
7879
|| (!databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
7980
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
8081
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
81-
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + "."))) {
82+
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
83+
&& !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
84+
&& !databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + "."))) {
8285
// null or -1 means empty origin leader
8386
final int oldLeaderNodeId = (pair.left == null ? -1 : pair.left.getLeaderId());
8487
final int newLeaderNodeId = (pair.right == null ? -1 : pair.right.getLeaderId());

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/ConfigRegionListeningFilter.java

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2525
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
2626
import org.apache.iotdb.commons.schema.SchemaConstant;
27+
import org.apache.iotdb.commons.schema.table.Audit;
2728
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
2829
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
2930
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
3031
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
32+
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTableOrViewPlan;
33+
import org.apache.iotdb.confignode.consensus.request.write.table.AbstractTablePlan;
3134
import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
3235
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
3336

@@ -252,34 +255,61 @@ public class ConfigRegionListeningFilter {
252255
static boolean shouldPlanBeListened(final ConfigPhysicalPlan plan) {
253256
final ConfigPhysicalPlanType type = plan.getType();
254257

255-
// Do not transfer roll back set template plan
256-
if (type.equals(ConfigPhysicalPlanType.CommitSetSchemaTemplate)
257-
&& ((CommitSetSchemaTemplatePlan) plan).isRollback()) {
258-
return false;
258+
switch (type) {
259+
// Do not transfer roll back set template plan
260+
case CommitSetSchemaTemplate:
261+
return !((CommitSetSchemaTemplatePlan) plan).isRollback();
262+
// system / audit DB
263+
case DeleteDatabase:
264+
return !((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
265+
&& !((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.SYSTEM_DATABASE)
266+
&& !((DeleteDatabasePlan) plan).getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
267+
case CreateDatabase:
268+
case AlterDatabase:
269+
return !(((DatabaseSchemaPlan) plan)
270+
.getSchema()
271+
.getName()
272+
.equals(SchemaConstant.SYSTEM_DATABASE)
273+
&& !((DatabaseSchemaPlan) plan)
274+
.getSchema()
275+
.getName()
276+
.equals(SchemaConstant.AUDIT_DATABASE)
277+
&& !((DatabaseSchemaPlan) plan)
278+
.getSchema()
279+
.getName()
280+
.equals(Audit.TABLE_MODEL_AUDIT_DATABASE));
281+
// Table under audit db
282+
case PipeCreateTableOrView:
283+
return !((PipeCreateTableOrViewPlan) plan)
284+
.getDatabase()
285+
.equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
286+
case CommitCreateTable:
287+
case AddTableColumn:
288+
case AddViewColumn:
289+
case SetTableProperties:
290+
case SetViewProperties:
291+
case SetTableComment:
292+
case SetViewComment:
293+
case SetTableColumnComment:
294+
case RenameTable:
295+
case RenameView:
296+
case RenameTableColumn:
297+
case RenameViewColumn:
298+
case AlterColumnDataType:
299+
case CommitDeleteTable:
300+
case CommitDeleteView:
301+
case CommitDeleteColumn:
302+
case CommitDeleteViewColumn:
303+
case PipeDeleteDevices:
304+
return !((AbstractTablePlan) plan).getDatabase().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
305+
// PipeEnriched & UnsetTemplate are not listened directly,
306+
// but their inner plan or converted plan are listened.
307+
case PipeEnriched:
308+
case UnsetTemplate:
309+
return true;
310+
default:
311+
return OPTION_PLAN_MAP.values().stream().anyMatch(types -> types.contains(type));
259312
}
260-
261-
// system / audit DB
262-
if (type.equals(ConfigPhysicalPlanType.DeleteDatabase)
263-
&& (((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.AUDIT_DATABASE)
264-
|| ((DeleteDatabasePlan) plan).getName().equals(SchemaConstant.SYSTEM_DATABASE))
265-
|| (type.equals(ConfigPhysicalPlanType.CreateDatabase)
266-
|| type.equals(ConfigPhysicalPlanType.AlterDatabase))
267-
&& (((DatabaseSchemaPlan) plan)
268-
.getSchema()
269-
.getName()
270-
.equals(SchemaConstant.SYSTEM_DATABASE)
271-
|| ((DatabaseSchemaPlan) plan)
272-
.getSchema()
273-
.getName()
274-
.equals(SchemaConstant.AUDIT_DATABASE))) {
275-
return false;
276-
}
277-
278-
// PipeEnriched & UnsetTemplate are not listened directly,
279-
// but their inner plan or converted plan are listened.
280-
return type.equals(ConfigPhysicalPlanType.PipeEnriched)
281-
|| type.equals(ConfigPhysicalPlanType.UnsetTemplate)
282-
|| OPTION_PLAN_MAP.values().stream().anyMatch(types -> types.contains(type));
283313
}
284314

285315
public static Set<ConfigPhysicalPlanType> parseListeningPlanTypeSet(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,8 @@ public static boolean isTypeListened(
344344
final IoTDBTreePatternOperations treePattern,
345345
final TablePattern tablePattern) {
346346
final Boolean isTableDatabasePlan = isTableDatabasePlan(plan);
347-
return listenedTypeSet.contains(plan.getType())
347+
return ConfigRegionListeningFilter.shouldPlanBeListened(plan)
348+
&& listenedTypeSet.contains(plan.getType())
348349
&& (Objects.isNull(isTableDatabasePlan)
349350
|| Boolean.TRUE.equals(isTableDatabasePlan)
350351
&& tablePattern.isTableModelDataAllowedToBeCaptured()

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.path.PartialPath;
2828
import org.apache.iotdb.commons.path.PathPatternTree;
2929
import org.apache.iotdb.commons.schema.SchemaConstant;
30+
import org.apache.iotdb.commons.schema.table.Audit;
3031
import org.apache.iotdb.commons.schema.table.NonCommittableTsTable;
3132
import org.apache.iotdb.commons.schema.table.TableNodeStatus;
3233
import org.apache.iotdb.commons.schema.table.TreeViewSchema;
@@ -180,7 +181,8 @@ public TSStatus setDatabase(
180181
clusterSchemaInfo.isDatabaseNameValid(
181182
schema.getName(), schema.isSetIsTableModel() && schema.isIsTableModel());
182183
if (!schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
183-
&& !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
184+
&& !schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
185+
&& !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
184186
clusterSchemaInfo.checkDatabaseLimit();
185187
}
186188
// Cache DatabaseSchema
@@ -488,7 +490,8 @@ public synchronized void adjustMaxRegionGroupNum() {
488490
for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
489491
if (!isDatabaseExist(databaseSchema.getName())
490492
|| databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
491-
|| databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
493+
|| databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
494+
|| databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
492495
// filter the pre deleted database and the system database
493496
databaseNum--;
494497
}
@@ -498,7 +501,8 @@ public synchronized void adjustMaxRegionGroupNum() {
498501
new AdjustMaxRegionGroupNumPlan();
499502
for (final TDatabaseSchema databaseSchema : databaseSchemaMap.values()) {
500503
if (databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
501-
|| databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)) {
504+
|| databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
505+
|| databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
502506
// filter the system database
503507
continue;
504508
}
@@ -828,7 +832,9 @@ public static TSStatus enrichDatabaseSchemaWithDefaultProperties(
828832
TSStatus errorResp = null;
829833
final boolean isSystemDatabase =
830834
databaseSchema.getName().equals(SchemaConstant.SYSTEM_DATABASE);
831-
final boolean isAuditDatabase = databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE);
835+
final boolean isAuditDatabase =
836+
databaseSchema.getName().equals(SchemaConstant.AUDIT_DATABASE)
837+
|| databaseSchema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE);
832838

833839
if (databaseSchema.getTTL() < 0) {
834840
errorResp =

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/CNPhysicalPlanGenerator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.commons.schema.SchemaConstant;
2727
import org.apache.iotdb.commons.schema.node.role.IDatabaseMNode;
2828
import org.apache.iotdb.commons.schema.node.utils.IMNodeFactory;
29+
import org.apache.iotdb.commons.schema.table.Audit;
2930
import org.apache.iotdb.commons.schema.table.TsTable;
3031
import org.apache.iotdb.commons.schema.template.Template;
3132
import org.apache.iotdb.commons.utils.AuthUtils;
@@ -481,8 +482,10 @@ private void generateDatabasePhysicalPlan() {
481482
}
482483
stack.push(new Pair<>(databaseMNode, true));
483484
name = databaseMNode.getName();
484-
for (final TsTable table : tableSet) {
485-
planDeque.add(new PipeCreateTableOrViewPlan(name, table));
485+
if (!name.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
486+
for (final TsTable table : tableSet) {
487+
planDeque.add(new PipeCreateTableOrViewPlan(name, table));
488+
}
486489
}
487490
tableSet.clear();
488491
break;
@@ -552,7 +555,8 @@ private IConfigMNode deserializeDatabaseMNode(final InputStream inputStream) thr
552555

553556
final TDatabaseSchema schema = databaseMNode.getAsMNode().getDatabaseSchema();
554557
if (!schema.getName().equals(SchemaConstant.AUDIT_DATABASE)
555-
&& !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)) {
558+
&& !schema.getName().equals(SchemaConstant.SYSTEM_DATABASE)
559+
&& !schema.getName().equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
556560
final DatabaseSchemaPlan createDBPlan =
557561
new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, schema);
558562
planDeque.add(createDBPlan);

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
3131
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
3232
import org.apache.iotdb.commons.schema.SchemaConstant;
33+
import org.apache.iotdb.commons.schema.table.Audit;
3334
import org.apache.iotdb.commons.utils.TestOnly;
3435
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
3536
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
@@ -191,6 +192,8 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) {
191192
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
192193
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
193194
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
195+
&& !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
196+
&& !databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")
194197
&& !Objects.isNull(currentPipeTaskMeta)
195198
&& !(PipeTaskAgent.isHistoryOnlyPipe(
196199
currentPipeStaticMeta.getSourceParameters())

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
3535
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
3636
import org.apache.iotdb.commons.schema.SchemaConstant;
37+
import org.apache.iotdb.commons.schema.table.Audit;
3738
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
3839
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
3940
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
@@ -313,7 +314,9 @@ public void executeFromCalculateInfoForTask(final ConfigNodeProcedureEnv env) {
313314
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
314315
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")
315316
&& !databaseName.equals(SchemaConstant.AUDIT_DATABASE)
316-
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")) {
317+
&& !databaseName.startsWith(SchemaConstant.AUDIT_DATABASE + ".")
318+
&& !databaseName.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)
319+
&& !databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE + ".")) {
317320
// Pipe only collect user's data, filter out metric database here.
318321
consensusGroupIdToTaskMetaMap.put(
319322
regionGroupId.getId(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.iotdb.commons.path.PartialPath;
4343
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
4444
import org.apache.iotdb.commons.schema.SchemaConstant;
45+
import org.apache.iotdb.commons.schema.table.Audit;
4546
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
4647
import org.apache.iotdb.commons.utils.PathUtils;
4748
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
@@ -300,6 +301,8 @@ private void createDatabaseAndUpdateCache(
300301
databaseNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
301302
} else if (PathUtils.isStartWith(deviceID, SchemaConstant.AUDIT_DATABASE)) {
302303
databaseNamesNeedCreated.add(SchemaConstant.AUDIT_DATABASE);
304+
} else if (PathUtils.isStartWith(deviceID, Audit.TABLE_MODEL_AUDIT_DATABASE)) {
305+
databaseNamesNeedCreated.add(Audit.TABLE_MODEL_AUDIT_DATABASE);
303306
} else {
304307
final PartialPath databaseNameNeedCreated =
305308
MetaUtils.getDatabasePathByLevel(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.commons.path.PartialPath;
2828
import org.apache.iotdb.commons.path.PathPatternTree;
2929
import org.apache.iotdb.commons.schema.SchemaConstant;
30+
import org.apache.iotdb.commons.schema.table.Audit;
3031
import org.apache.iotdb.commons.utils.PathUtils;
3132
import org.apache.iotdb.commons.utils.TimePartitionUtils;
3233
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -655,12 +656,13 @@ public List<PlanNode> visitSchemaQueryMerge(
655656
.getSchemaPartitionInfo()
656657
.getSchemaPartitionMap()
657658
.forEach(
658-
(storageGroup, deviceGroup) -> {
659-
if (storageGroup.equals(SchemaConstant.SYSTEM_DATABASE)) {
659+
(database, deviceGroup) -> {
660+
if (database.equals(SchemaConstant.SYSTEM_DATABASE)) {
660661
deviceGroup.forEach(
661662
(deviceGroupId, schemaRegionReplicaSet) ->
662663
regionsOfSystemDatabase.add(schemaRegionReplicaSet));
663-
} else if (storageGroup.equals(SchemaConstant.AUDIT_DATABASE)) {
664+
} else if (database.equals(SchemaConstant.AUDIT_DATABASE)
665+
|| database.equals(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
664666
deviceGroup.forEach(
665667
(deviceGroupId, schemaRegionReplicaSet) ->
666668
regionsOfAuditDatabase.add(schemaRegionReplicaSet));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.commons.path.IFullPath;
3232
import org.apache.iotdb.commons.path.MeasurementPath;
3333
import org.apache.iotdb.commons.schema.SchemaConstant;
34+
import org.apache.iotdb.commons.schema.table.Audit;
3435
import org.apache.iotdb.commons.schema.table.InformationSchema;
3536
import org.apache.iotdb.commons.schema.table.TsFileTableSchemaUtil;
3637
import org.apache.iotdb.commons.schema.table.TsTable;
@@ -3799,6 +3800,9 @@ public static Optional<String> getNonSystemDatabaseName(String databaseName) {
37993800
if (databaseName.startsWith(SchemaConstant.AUDIT_DATABASE)) {
38003801
return Optional.empty();
38013802
}
3803+
if (databaseName.startsWith(Audit.TABLE_MODEL_AUDIT_DATABASE)) {
3804+
return Optional.empty();
3805+
}
38023806
int lastIndex = databaseName.lastIndexOf("-");
38033807
if (lastIndex == -1) {
38043808
lastIndex = databaseName.length();

0 commit comments

Comments
 (0)