Skip to content

Commit 32f1010

Browse files
authored
Clean up tmp dirs of udf and sort while starting up (#17377)
1 parent bb68d50 commit 32f1010

3 files changed

Lines changed: 28 additions & 8 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,15 @@ private void sendRestartRequestToConfigNode() throws StartupException {
765765
}
766766
}
767767

768+
private void cleanupSortTmpDir() {
769+
String sortTmpDir = config.getSortTmpDir();
770+
File tmpDir = new File(sortTmpDir);
771+
if (tmpDir.exists()) {
772+
FileUtils.deleteFileOrDirectory(tmpDir, true);
773+
logger.info("Cleaned up stale sort temp directory: {}", sortTmpDir);
774+
}
775+
}
776+
768777
private void prepareResources() throws StartupException {
769778
prepareUDFResources();
770779
prepareTriggerResources();
@@ -819,6 +828,9 @@ private void setUp() throws StartupException, IOException {
819828
registerManager.register(new JMXService());
820829
JMXService.registerMBean(getInstance(), mbeanName);
821830

831+
// Clean up stale sort temp files left from previous runs
832+
cleanupSortTmpDir();
833+
822834
// Get resources for trigger,udf,pipe...
823835
prepareResources();
824836

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.File;
3434
import java.io.IOException;
3535
import java.util.ArrayList;
36+
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.concurrent.ConcurrentHashMap;
@@ -59,10 +60,9 @@ private TemporaryQueryDataFileService() {
5960

6061
public String register(SerializationRecorder recorder) throws IOException {
6162
String queryId = recorder.getQueryId();
62-
if (!recorders.containsKey(queryId)) {
63-
recorders.put(queryId, new ArrayList<>());
64-
}
65-
recorders.get(queryId).add(recorder);
63+
recorders
64+
.computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>()))
65+
.add(recorder);
6666

6767
String dirName = getDirName(queryId);
6868
makeDirIfNecessary(dirName);
@@ -109,6 +109,11 @@ private String getFileName(String dir, long index) {
109109
@Override
110110
public void start() throws StartupException {
111111
try {
112+
// Clean up stale temp directories left from previous runs (e.g., after a crash)
113+
File tmpDir = SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR);
114+
if (tmpDir.exists()) {
115+
FileUtils.deleteDirectory(tmpDir);
116+
}
112117
makeDirIfNecessary(TEMPORARY_FILE_DIR);
113118
} catch (IOException e) {
114119
throw new StartupException(e);
@@ -117,8 +122,11 @@ public void start() throws StartupException {
117122

118123
@Override
119124
public void stop() {
120-
for (Object queryId : recorders.keySet().toArray()) {
121-
deregister((String) queryId);
125+
recorders.clear();
126+
try {
127+
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR));
128+
} catch (IOException e) {
129+
logger.warn("Failed to delete temp dir {}.", TEMPORARY_FILE_DIR, e);
122130
}
123131
}
124132

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ private synchronized long generateNextRequestId() throws IOException {
8181

8282
private void downloadExecutables(List<String> uris, long requestId)
8383
throws IOException, URISyntaxException {
84-
// TODO: para download
8584
try {
8685
for (String uriString : uris) {
8786
final URL url = new URI(uriString).toURL();
@@ -223,7 +222,8 @@ protected void saveToDir(ByteBuffer byteBuffer, String destination) throws IOExc
223222
}
224223
Files.createFile(path);
225224
}
226-
// FileOutPutStream is not in append mode by default, so the file will be overridden if it
225+
// FileOutPutStream is not in append mode by default, so the file will be
226+
// overridden if it
227227
// already exists.
228228
try (FileOutputStream outputStream = new FileOutputStream(destination)) {
229229
outputStream.getChannel().write(byteBuffer);

0 commit comments

Comments
 (0)