Skip to content

Commit 67b017b

Browse files
authored
Support COPY TO TsFile for exporting query results in table model (#17372)
1 parent 6ef35e4 commit 67b017b

30 files changed

Lines changed: 2188 additions & 15 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/copyto/IoTDBCopyToTsFileIT.java

Lines changed: 667 additions & 0 deletions
Large diffs are not rendered by default.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/DiskSpaceInsufficientException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,8 @@ public DiskSpaceInsufficientException(List<String> folders) {
3131
String.format("Can't get next folder from [%s], because they are all full.", folders),
3232
TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
3333
}
34+
35+
public DiskSpaceInsufficientException(String message) {
36+
super(message, TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
37+
}
3438
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process.copyto;
21+
22+
import org.apache.iotdb.commons.schema.column.ColumnHeader;
23+
import org.apache.iotdb.db.exception.sql.SemanticException;
24+
import org.apache.iotdb.db.queryengine.execution.operator.process.copyto.tsfile.CopyToTsFileOptions;
25+
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
26+
import org.apache.iotdb.db.queryengine.plan.relational.planner.RelationPlan;
27+
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
28+
29+
import org.apache.tsfile.utils.Accountable;
30+
31+
import java.util.List;
32+
import java.util.Set;
33+
34+
/**
35+
* Interface for COPY TO command options.
36+
*
37+
* <p>This interface defines the configuration for COPY TO operations, including target format,
38+
* target table name, column mappings, and memory management. Implementations provide
39+
* format-specific validation and inference logic.
40+
*/
41+
public interface CopyToOptions extends Accountable {
42+
43+
/**
44+
* Infers and validates options based on query analysis and relation plan.
45+
*
46+
* <p>This method is called during analysis phase to fill in default values and validate
47+
* user-specified options against the query structure. For example, it can infer the target time
48+
* column from the query's time column if not explicitly specified.
49+
*
50+
* @param analysis the query analysis containing metadata about the query
51+
* @param queryRelationPlan the logical relation plan of the inner query
52+
* @param columnHeaders the column headers from the query result
53+
*/
54+
void infer(Analysis analysis, RelationPlan queryRelationPlan, List<ColumnHeader> columnHeaders);
55+
56+
/**
57+
* Validates the options against the actual column schema.
58+
*
59+
* <p>This method is called after planning to ensure the specified options (e.g., target columns,
60+
* tags) are valid for the given output schema.
61+
*
62+
* @param columnHeaders the column headers of the query result
63+
* @throws SemanticException if validation fails
64+
*/
65+
void check(List<ColumnHeader> columnHeaders);
66+
67+
/**
68+
* Returns the response column headers for the result set.
69+
*
70+
* <p>These headers describe the columns that will be returned to the client after the COPY TO
71+
* operation completes (e.g., file path, row count).
72+
*
73+
* @return list of column headers for the response
74+
*/
75+
List<ColumnHeader> getRespColumnHeaders();
76+
77+
/**
78+
* Returns the output symbols that represent the columns in the COPY TO result.
79+
*
80+
* @return list of symbols for the output columns
81+
*/
82+
List<Symbol> getOutputSymbols();
83+
84+
/**
85+
* Returns the output column names for the result.
86+
*
87+
* @return list of column names
88+
*/
89+
List<String> getOutputColumnNames();
90+
91+
/**
92+
* Returns the target output format.
93+
*
94+
* @return the format enum value
95+
*/
96+
Format getFormat();
97+
98+
/**
99+
* Estimates the maximum memory usage in bytes required for writing.
100+
*
101+
* <p>This is used for memory management and determining whether to flush data to disk.
102+
*
103+
* @return estimated maximum memory usage in bytes
104+
*/
105+
long estimatedMaxRamBytesInWrite();
106+
107+
/** Supported output formats for COPY TO command. */
108+
enum Format {
109+
/** TsFile format output. */
110+
TSFILE,
111+
}
112+
113+
class Builder {
114+
private CopyToOptions.Format format = CopyToOptions.Format.TSFILE;
115+
private String targetTableName = null;
116+
private String targetTimeColumn = null;
117+
private Set<String> targetTagColumns = null;
118+
private long memoryThreshold = 32 * 1024 * 1024;
119+
120+
public Builder withFormat(CopyToOptions.Format format) {
121+
this.format = format;
122+
return this;
123+
}
124+
125+
public Builder withTargetTableName(String targetTableName) {
126+
this.targetTableName = targetTableName;
127+
return this;
128+
}
129+
130+
public Builder withTargetTimeColumn(String targetTimeColumn) {
131+
this.targetTimeColumn = targetTimeColumn;
132+
return this;
133+
}
134+
135+
public Builder withTargetTagColumns(Set<String> targetTagColumns) {
136+
this.targetTagColumns = targetTagColumns;
137+
return this;
138+
}
139+
140+
public Builder withMemoryThreshold(long memoryThreshold) {
141+
if (memoryThreshold <= 0) {
142+
throw new SemanticException("The memory threshold must be greater than 0.");
143+
}
144+
this.memoryThreshold = memoryThreshold;
145+
return this;
146+
}
147+
148+
public CopyToOptions build() {
149+
switch (format) {
150+
case TSFILE:
151+
default:
152+
return new CopyToTsFileOptions(
153+
targetTableName, targetTimeColumn, targetTagColumns, memoryThreshold);
154+
}
155+
}
156+
}
157+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process.copyto;
21+
22+
import org.apache.tsfile.read.common.block.TsBlock;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* Interface for writing query results to external storage formats.
28+
*
29+
* <p>This interface abstracts the writing logic so that different output formats (e.g., TsFile,
30+
* CSV) can be supported. Each implementation handles format-specific operations like encoding,
31+
* schema definition, and file sealing.
32+
*/
33+
public interface IFormatCopyToWriter {
34+
35+
/**
36+
* Writes a TsBlock containing query results to the target storage.
37+
*
38+
* @param tsBlock the data block to write, containing rows of query results
39+
* @throws Exception if write fails (e.g., IO error, encoding error)
40+
*/
41+
void write(TsBlock tsBlock) throws Exception;
42+
43+
/**
44+
* Builds a TsBlock containing metadata or summary information about the written data.
45+
*
46+
* <p>This is typically called after all data has been written to return information about the
47+
* output, such as file paths, row counts, or statistics.
48+
*
49+
* @return a TsBlock containing result metadata, or null if no result is needed
50+
*/
51+
TsBlock buildResultTsBlock();
52+
53+
/**
54+
* Finalizes the writing process and prepares the file for reading.
55+
*
56+
* <p>This method ensures all data is flushed to disk and necessary footer/headers are written.
57+
* After seal() is called, no more data should be written.
58+
*
59+
* @throws Exception if sealing fails
60+
*/
61+
void seal() throws Exception;
62+
63+
/**
64+
* Closes the writer and releases all resources.
65+
*
66+
* <p>This method should be called after seal() to ensure proper cleanup of file handles, buffers,
67+
* and other resources.
68+
*
69+
* @throws IOException if close fails
70+
*/
71+
void close() throws IOException;
72+
}

0 commit comments

Comments
 (0)