Skip to content

Commit 6de423b

Browse files
VGalaxiesJackieTien97
authored andcommitted
Subscription: fully managed tsfile parsing process for tsfile format topic (#15524)
(cherry picked from commit 632d87e)
1 parent 8d5e92d commit 6de423b

4 files changed

Lines changed: 32 additions & 15 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
5050
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
52-
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
5352
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
5453
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
5554
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -149,15 +148,7 @@ public PipeDataNodeTask build() {
149148
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
150149
CONNECTOR_FORMAT_HYBRID_VALUE)
151150
.equals(CONNECTOR_FORMAT_TABLET_VALUE),
152-
PipeType.SUBSCRIPTION.equals(pipeType)
153-
&&
154-
// should not skip parsing when the format is tsfile
155-
!pipeStaticMeta
156-
.getConnectorParameters()
157-
.getStringOrDefault(
158-
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
159-
CONNECTOR_FORMAT_HYBRID_VALUE)
160-
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
151+
PipeType.SUBSCRIPTION.equals(pipeType));
161152

162153
return new PipeDataNodeTask(
163154
pipeStaticMeta.getPipeName(), regionId, extractorStage, processorStage, connectorStage);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
149149
}
150150
}
151151

152-
private boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent sourceEvent) {
152+
public static boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent sourceEvent) {
153153
return !sourceEvent.shouldParseTimeOrPattern()
154154
|| (sourceEvent.isTableModelEvent()
155155
&& (sourceEvent.getTablePattern() == null

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.iotdb.db.subscription.broker;
2121

22+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2223
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
24+
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
2325
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2426
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
2527
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public SubscriptionEvent pollTsFile(
245247

246248
@Override
247249
protected boolean onEvent(final TsFileInsertionEvent event) {
250+
if (!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) event)) {
251+
return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
252+
}
253+
248254
final SubscriptionCommitContext commitContext = generateSubscriptionCommitContext();
249255
final SubscriptionEvent ev =
250256
new SubscriptionEvent(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
24+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2425
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
2526
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
2627
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -82,10 +83,29 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
8283

8384
@Override
8485
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
85-
LOGGER.warn(
86-
"SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} when batching.",
87-
this,
88-
event);
86+
// TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch
87+
try {
88+
for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) {
89+
if (!((PipeRawTabletInsertionEvent) parsedEvent)
90+
.increaseReferenceCount(this.getClass().getName())) {
91+
LOGGER.warn(
92+
"SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.",
93+
((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
94+
} else {
95+
try {
96+
batch.onEvent(parsedEvent);
97+
} catch (final Exception ignored) {
98+
// no exceptions will be thrown
99+
}
100+
}
101+
}
102+
} finally {
103+
try {
104+
event.close();
105+
} catch (final Exception ignored) {
106+
// no exceptions will be thrown
107+
}
108+
}
89109
}
90110

91111
@Override

0 commit comments

Comments
 (0)