From 91176fcf7bd76e3b1ba9aa774d7f4fd332c14b22 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 15:33:03 +0800 Subject: [PATCH 1/7] fix error log caused by ClosedByInterruptException --- .../task/InnerSpaceCompactionTask.java | 2 +- .../compaction/io/CompactionTsFileInput.java | 83 +++++++++++++++++++ .../compaction/io/CompactionTsFileReader.java | 1 + .../repair/RepairDataFileScanUtil.java | 8 +- .../AbstractCompactionEstimator.java | 3 + .../AbstractCrossSpaceEstimator.java | 8 ++ .../AbstractInnerSpaceEstimator.java | 13 +++ .../RewriteCrossSpaceCompactionSelector.java | 2 +- .../tsfile/read/reader/LocalTsFileInput.java | 4 +- 9 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index d5d709a4df5df..ae6c2d92a3856 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -486,7 +486,7 @@ public long getEstimatedMemoryCost() { if (innerSpaceEstimator != null && memoryCost == 0L) { try { memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList); - } catch (IOException e) { + } catch (Exception e) { if (e instanceof ClosedByInterruptException || Thread.interrupted()) { Thread.currentThread().interrupt(); return -1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java new file mode 100644 index 0000000000000..bbb5fa645f01f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.io; + +import org.apache.iotdb.tsfile.read.reader.TsFileInput; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; + +public class CompactionTsFileInput implements TsFileInput { + private final TsFileInput tsFileInput; + + public CompactionTsFileInput(TsFileInput tsFileInput) { + this.tsFileInput = tsFileInput; + } + + @Override + public long size() throws IOException { + return tsFileInput.size(); + } + + @Override + public long position() throws IOException { + return tsFileInput.position(); + } + + @Override + public TsFileInput position(long newPosition) throws IOException { + return tsFileInput.position(newPosition); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int readSize = tsFileInput.read(dst); + if (readSize == -1 && Thread.interrupted()) { + throw new ClosedByInterruptException(); + } + return readSize; + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + int readSize = tsFileInput.read(dst, position); + if (readSize == -1 && Thread.interrupted()) { + throw new ClosedByInterruptException(); + } + return readSize; + } + + @Override + public InputStream wrapAsInputStream() throws IOException { + return tsFileInput.wrapAsInputStream(); + } + + @Override + public void close() throws IOException { + tsFileInput.close(); + } + + @Override + public String getFilePath() { + return tsFileInput.getFilePath(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java index 3795904ba5748..7f580b6f1351f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileReader.java @@ -65,6 +65,7 @@ public class CompactionTsFileReader extends TsFileSequenceReader { */ public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { super(file); + this.tsFileInput = new CompactionTsFileInput(tsFileInput); this.compactionType = compactionType; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index 4ce1552412dc4..29c961095c727 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -83,15 +83,15 @@ public void scanTsFile() { checkNonAlignedDeviceSeries(reader, device); } } - } catch (IOException ioException) { + } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) { + this.hasUnsortedData = true; + } catch (Exception e) { // ignored the exception caused by thread interrupt if (Thread.currentThread().isInterrupted()) { return; } - logger.warn("Meet error when read tsfile {}", tsfile.getAbsolutePath(), ioException); + logger.warn("Meet error when read tsfile {}", tsfile.getAbsolutePath(), e); isBrokenFile = true; - } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) { - this.hasUnsortedData = true; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index dd3e3a263098e..6dedb710b54ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -64,6 +64,8 @@ public abstract class AbstractCompactionEstimator { protected abstract long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException; + protected abstract TsFileSequenceReader getReader(String filePath) throws IOException; + protected boolean isAllSourceFileExist(List resources) { for (TsFileResource resource : resources) { if (resource.getStatus() == TsFileResourceStatus.DELETED) { @@ -97,6 +99,7 @@ private FileInfo getFileInfoFromCache(TsFileResource resource) throws IOExceptio } try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath(), true, false)) { + FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader); fileInfoCache.put(resource, fileInfo); synchronized (globalFileInfoCacheForFailedCompaction) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java index 2207fd7f51a15..5eb19d2a32549 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java @@ -19,7 +19,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import java.io.IOException; import java.util.ArrayList; @@ -31,6 +34,11 @@ */ public abstract class AbstractCrossSpaceEstimator extends AbstractCompactionEstimator { + @Override + protected TsFileSequenceReader getReader(String filePath) throws IOException { + return new CompactionTsFileReader(filePath, CompactionType.CROSS_COMPACTION); + } + public long estimateCrossCompactionMemory( List seqResources, List unseqResources) throws IOException { List resources = new ArrayList<>(seqResources.size() + unseqResources.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java index 50cf624a5c1ae..d9427c64395a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java @@ -19,7 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import java.io.IOException; import java.util.List; @@ -30,6 +34,15 @@ */ public abstract class AbstractInnerSpaceEstimator extends AbstractCompactionEstimator { + @Override + protected TsFileSequenceReader getReader(String filePath) throws IOException { + if (filePath.contains(IoTDBConstant.UNSEQUENCE_FOLDER_NAME)) { + return new CompactionTsFileReader(filePath, CompactionType.INNER_UNSEQ_COMPACTION); + } else { + return new CompactionTsFileReader(filePath, CompactionType.INNER_SEQ_COMPACTION); + } + } + public long estimateInnerCompactionMemory(List resources) throws IOException { if (!CompactionEstimateUtils.addReadLock(resources)) { return -1L; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index 56efdfbf500ad..9307de32c66c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -128,7 +128,7 @@ public CrossCompactionTaskResource selectOneTaskResources(CrossSpaceCompactionCa candidate.getUnseqFiles().size()); return executeTaskResourceSelection(candidate); - } catch (IOException e) { + } catch (Exception e) { if (e instanceof ClosedByInterruptException || Thread.interrupted()) { Thread.currentThread().interrupt(); return new CrossCompactionTaskResource(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java index 168fdd59fb4c9..0dde06ab91215 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java @@ -35,8 +35,8 @@ public class LocalTsFileInput implements TsFileInput { private static final Logger logger = LoggerFactory.getLogger(LocalTsFileInput.class); - private final FileChannel channel; - private final String filePath; + protected final FileChannel channel; + protected final String filePath; public LocalTsFileInput(Path file) throws IOException { channel = FileChannel.open(file, StandardOpenOption.READ); From 96a45c4d2602f0f01fa9d070a73766b21f22a1b9 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 15:35:14 +0800 Subject: [PATCH 2/7] fix error log caused by ClosedByInterruptException --- .../org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java index 0dde06ab91215..168fdd59fb4c9 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java @@ -35,8 +35,8 @@ public class LocalTsFileInput implements TsFileInput { private static final Logger logger = LoggerFactory.getLogger(LocalTsFileInput.class); - protected final FileChannel channel; - protected final String filePath; + private final FileChannel channel; + private final String filePath; public LocalTsFileInput(Path file) throws IOException { channel = FileChannel.open(file, StandardOpenOption.READ); From 7d59f5f7ccf26984b287935e502106259fc7e567 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 17:53:13 +0800 Subject: [PATCH 3/7] add exception --- .../task/InnerSpaceCompactionTask.java | 4 +- .../compaction/io/CompactionTsFileInput.java | 35 ++++++++++++++---- .../RewriteCrossSpaceCompactionSelector.java | 4 +- .../StopReadTsFileByInterruptException.java | 24 ++++++++++++ .../tsfile/read/TsFileSequenceReader.java | 37 +++++++++++++++++++ .../tsfile/read/reader/LocalTsFileInput.java | 23 ++---------- 6 files changed, 97 insertions(+), 30 deletions(-) create mode 100644 iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index ae6c2d92a3856..d780dbfae4856 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -40,12 +40,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; import org.apache.iotdb.tsfile.utils.TsFileUtils; import java.io.File; import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; @@ -487,7 +487,7 @@ public long getEstimatedMemoryCost() { try { memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList); } catch (Exception e) { - if (e instanceof ClosedByInterruptException || Thread.interrupted()) { + if (e instanceof StopReadTsFileByInterruptException || Thread.interrupted()) { Thread.currentThread().interrupt(); return -1; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java index bbb5fa645f01f..89ff31282c208 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java @@ -19,12 +19,12 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.io; +import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.iotdb.tsfile.read.reader.TsFileInput; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; public class CompactionTsFileInput implements TsFileInput { private final TsFileInput tsFileInput; @@ -35,24 +35,45 @@ public CompactionTsFileInput(TsFileInput tsFileInput) { @Override public long size() throws IOException { - return tsFileInput.size(); + try { + return tsFileInput.size(); + } catch (Exception e) { + if (Thread.interrupted()) { + throw new StopReadTsFileByInterruptException(); + } + throw e; + } } @Override public long position() throws IOException { - return tsFileInput.position(); + try { + return tsFileInput.position(); + } catch (Exception e) { + if (Thread.interrupted()) { + throw new StopReadTsFileByInterruptException(); + } + throw e; + } } @Override public TsFileInput position(long newPosition) throws IOException { - return tsFileInput.position(newPosition); + try { + return tsFileInput.position(newPosition); + } catch (Exception e) { + if (Thread.interrupted()) { + throw new StopReadTsFileByInterruptException(); + } + throw e; + } } @Override public int read(ByteBuffer dst) throws IOException { int readSize = tsFileInput.read(dst); if (readSize == -1 && Thread.interrupted()) { - throw new ClosedByInterruptException(); + throw new StopReadTsFileByInterruptException(); } return readSize; } @@ -60,8 +81,8 @@ public int read(ByteBuffer dst) throws IOException { @Override public int read(ByteBuffer dst, long position) throws IOException { int readSize = tsFileInput.read(dst, position); - if (readSize == -1 && Thread.interrupted()) { - throw new ClosedByInterruptException(); + if (Thread.interrupted()) { + throw new StopReadTsFileByInterruptException(); } return readSize; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index 9307de32c66c3..fdc1ea8fbf4ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -38,12 +38,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -129,7 +129,7 @@ public CrossCompactionTaskResource selectOneTaskResources(CrossSpaceCompactionCa return executeTaskResourceSelection(candidate); } catch (Exception e) { - if (e instanceof ClosedByInterruptException || Thread.interrupted()) { + if (e instanceof StopReadTsFileByInterruptException || Thread.interrupted()) { Thread.currentThread().interrupt(); return new CrossCompactionTaskResource(); } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java new file mode 100644 index 0000000000000..2a2872502c487 --- /dev/null +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/StopReadTsFileByInterruptException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.exception; + +import java.io.IOException; + +public class StopReadTsFileByInterruptException extends IOException {} diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 396c0a9eead52..c164675978ddc 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.compress.IUnCompressor; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; +import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.iotdb.tsfile.exception.TsFileRuntimeException; import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException; import org.apache.iotdb.tsfile.file.MetaMarker; @@ -298,6 +299,8 @@ public TsFileMetadata readFileMetadata() throws IOException { } } } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while reading file metadata of file {}", file); throw e; @@ -523,6 +526,8 @@ public List readTimeseriesMetadata( TimeseriesMetadata timeseriesMetadata; try { timeseriesMetadata = TimeseriesMetadata.deserializeFrom(tsFileInput, true); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e1) { logger.error( "Something error happened while deserializing TimeseriesMetadata of file {}", file); @@ -728,6 +733,8 @@ public void getDevicesAndEntriesOfOneLeafNode( ByteBuffer nextBuffer = readData(startOffset, endOffset); MetadataIndexNode deviceLeafNode = MetadataIndexNode.deserializeFrom(nextBuffer); getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while getting all devices of file {}", file); throw e; @@ -795,6 +802,8 @@ private void getAllDeviceLeafNodeOffset( getAllDeviceLeafNodeOffset( MetadataIndexNode.deserializeFrom(nextBuffer), leafDeviceNodeOffsets); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while getting all devices of file {}", file); throw e; @@ -926,6 +935,8 @@ private void getAllPaths( metadataIndexNode.getNodeType(), queue); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while getting all paths of file {}", file); throw e; @@ -1144,6 +1155,8 @@ private void generateMetadataIndex( } } } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while generating MetadataIndex of file {}", file); throw e; @@ -1193,6 +1206,8 @@ private void generateMetadataIndexUsingTsFileInput( needChunkMetadata); } } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while generating MetadataIndex of file {}", file); throw e; @@ -1314,6 +1329,8 @@ protected Pair getMetadataAndEndOffset( return getMetadataAndEndOffset( MetadataIndexNode.deserializeFrom(buffer), name, isDeviceLevel, exactSearch); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error("Something error happened while deserializing MetadataIndex of file {}", file); throw e; @@ -1369,6 +1386,8 @@ public void readPlanIndex() throws IOException { public ChunkHeader readChunkHeader(byte chunkType) throws IOException { try { return ChunkHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), chunkType); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file); throw t; @@ -1383,6 +1402,8 @@ public ChunkHeader readChunkHeader(byte chunkType) throws IOException { private ChunkHeader readChunkHeader(long position) throws IOException { try { return ChunkHeader.deserializeFrom(tsFileInput, position); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading chunk header of {}", t.getMessage(), file); throw t; @@ -1399,6 +1420,8 @@ private ChunkHeader readChunkHeader(long position) throws IOException { public ByteBuffer readChunk(long position, int dataSize) throws IOException { try { return readData(position, dataSize); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file); throw t; @@ -1415,6 +1438,8 @@ public Chunk readMemChunk(long offset) throws IOException { ChunkHeader header = readChunkHeader(offset); ByteBuffer buffer = readChunk(offset + header.getSerializedSize(), header.getDataSize()); return new Chunk(header, buffer); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file); throw t; @@ -1434,6 +1459,8 @@ public Chunk readMemChunk(ChunkMetadata metaData) throws IOException { readChunk( metaData.getOffsetOfChunkHeader() + header.getSerializedSize(), header.getDataSize()); return new Chunk(header, buffer, metaData.getDeleteIntervalList(), metaData.getStatistics()); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading chunk of {}", t.getMessage(), file); throw t; @@ -1500,6 +1527,8 @@ public MeasurementSchema getMeasurementSchema(List chunkMetadata public PageHeader readPageHeader(TSDataType type, boolean hasStatistic) throws IOException { try { return PageHeader.deserializeFrom(tsFileInput.wrapAsInputStream(), type, hasStatistic); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading page header of {}", t.getMessage(), file); throw t; @@ -1618,6 +1647,8 @@ protected ByteBuffer readData(long position, int totalSize) throws IOException { protected ByteBuffer readData(long start, long end) throws IOException { try { return readData(start, (int) (end - start)); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable t) { logger.warn("Exception {} happened while reading data of {}", t.getMessage(), file); throw t; @@ -1947,6 +1978,8 @@ public long selfCheckWithInfo( return TsFileCheckStatus.COMPLETE_FILE; } } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (IOException e) { logger.error("Error occurred while fast checking TsFile."); throw e; @@ -1960,6 +1993,8 @@ public long selfCheckWithInfo( long tscheckStatus = TsFileCheckStatus.COMPLETE_FILE; try { tscheckStatus = checkChunkAndPagesStatistics(chunkMetadata); + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (IOException e) { logger.error("Error occurred while checking the statistics of chunk and its pages"); throw e; @@ -2392,6 +2427,8 @@ private void collectEachLeafMeasurementNodeOffsetRange( } collectEachLeafMeasurementNodeOffsetRange(readData(startOffset, endOffset), queue); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Exception e) { logger.error( "Error occurred while collecting offset ranges of measurement nodes of file {}", file); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java index 168fdd59fb4c9..5dbd596f79641 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java @@ -45,33 +45,18 @@ public LocalTsFileInput(Path file) throws IOException { @Override public long size() throws IOException { - try { - return channel.size(); - } catch (IOException e) { - logger.error("Error happened while getting {} size", filePath); - throw e; - } + return channel.size(); } @Override public long position() throws IOException { - try { - return channel.position(); - } catch (IOException e) { - logger.error("Error happened while getting {} current position", filePath); - throw e; - } + return channel.position(); } @Override public TsFileInput position(long newPosition) throws IOException { - try { - channel.position(newPosition); - return this; - } catch (IOException e) { - logger.error("Error happened while changing {} position to {}", filePath, newPosition); - throw e; - } + channel.position(newPosition); + return this; } @Override From 58b8bdf21781c1d430c84499a9a8582172fee2a8 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 17:58:25 +0800 Subject: [PATCH 4/7] add exception --- .../compaction/io/CompactionTsFileInput.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java index 89ff31282c208..800e8facf30f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileInput.java @@ -38,7 +38,7 @@ public long size() throws IOException { try { return tsFileInput.size(); } catch (Exception e) { - if (Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } throw e; @@ -50,7 +50,7 @@ public long position() throws IOException { try { return tsFileInput.position(); } catch (Exception e) { - if (Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } throw e; @@ -62,7 +62,7 @@ public TsFileInput position(long newPosition) throws IOException { try { return tsFileInput.position(newPosition); } catch (Exception e) { - if (Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } throw e; @@ -72,7 +72,7 @@ public TsFileInput position(long newPosition) throws IOException { @Override public int read(ByteBuffer dst) throws IOException { int readSize = tsFileInput.read(dst); - if (readSize == -1 && Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } return readSize; @@ -81,7 +81,7 @@ public int read(ByteBuffer dst) throws IOException { @Override public int read(ByteBuffer dst, long position) throws IOException { int readSize = tsFileInput.read(dst, position); - if (Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { throw new StopReadTsFileByInterruptException(); } return readSize; From c41f79e524908b2749099571e04049e0bcec83b8 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 18:34:11 +0800 Subject: [PATCH 5/7] modify some log level --- .../tsfile/read/TsFileSequenceReader.java | 4 +++ .../tsfile/read/reader/LocalTsFileInput.java | 27 ++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index c164675978ddc..8e290d31c7fa3 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -145,6 +145,8 @@ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOExce if (loadMetadataSize) { loadMetadataSize(); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable e) { tsFileInput.close(); throw e; @@ -183,6 +185,8 @@ public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws if (loadMetadataSize) { // NOTE no autoRepair here loadMetadataSize(); } + } catch (StopReadTsFileByInterruptException e) { + throw e; } catch (Throwable e) { tsFileInput.close(); throw e; diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java index 5dbd596f79641..d13fe9b065aeb 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java @@ -45,18 +45,33 @@ public LocalTsFileInput(Path file) throws IOException { @Override public long size() throws IOException { - return channel.size(); + try { + return channel.size(); + } catch (IOException e) { + logger.warn("Error happened while getting {} size", filePath); + throw e; + } } @Override public long position() throws IOException { - return channel.position(); + try { + return channel.position(); + } catch (IOException e) { + logger.warn("Error happened while getting {} current position", filePath); + throw e; + } } @Override public TsFileInput position(long newPosition) throws IOException { - channel.position(newPosition); - return this; + try { + channel.position(newPosition); + return this; + } catch (IOException e) { + logger.warn("Error happened while changing {} position to {}", filePath, newPosition); + throw e; + } } @Override @@ -64,7 +79,7 @@ public int read(ByteBuffer dst) throws IOException { try { return channel.read(dst); } catch (ClosedByInterruptException e) { - logger.warn( + logger.info( "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel."); return -1; } catch (IOException e) { @@ -78,7 +93,7 @@ public int read(ByteBuffer dst, long position) throws IOException { try { return channel.read(dst, position); } catch (ClosedByInterruptException e) { - logger.warn( + logger.info( "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel."); return -1; } catch (IOException e) { From 52be5226c31e55e563e4d7ca9ba09e56547024d4 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Fri, 8 Mar 2024 18:57:41 +0800 Subject: [PATCH 6/7] modify some log level --- .../org/apache/iotdb/tsfile/read/TsFileSequenceReader.java | 4 ---- .../org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 8e290d31c7fa3..c164675978ddc 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -145,8 +145,6 @@ public TsFileSequenceReader(String file, boolean loadMetadataSize) throws IOExce if (loadMetadataSize) { loadMetadataSize(); } - } catch (StopReadTsFileByInterruptException e) { - throw e; } catch (Throwable e) { tsFileInput.close(); throw e; @@ -185,8 +183,6 @@ public TsFileSequenceReader(TsFileInput input, boolean loadMetadataSize) throws if (loadMetadataSize) { // NOTE no autoRepair here loadMetadataSize(); } - } catch (StopReadTsFileByInterruptException e) { - throw e; } catch (Throwable e) { tsFileInput.close(); throw e; diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java index d13fe9b065aeb..872230d7fc148 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java @@ -79,7 +79,7 @@ public int read(ByteBuffer dst) throws IOException { try { return channel.read(dst); } catch (ClosedByInterruptException e) { - logger.info( + logger.warn( "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel."); return -1; } catch (IOException e) { @@ -93,7 +93,7 @@ public int read(ByteBuffer dst, long position) throws IOException { try { return channel.read(dst, position); } catch (ClosedByInterruptException e) { - logger.info( + logger.warn( "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel."); return -1; } catch (IOException e) { From 074696737a81c014bc38df824a65c27ff053c2da Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 11 Mar 2024 14:22:31 +0800 Subject: [PATCH 7/7] fix bug --- .../dataregion/compaction/schedule/CompactionScheduler.java | 6 +++++- .../selector/estimator/AbstractCompactionEstimator.java | 4 +--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index fccbe81d5c249..707f251ded7b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -184,7 +184,11 @@ private static int addTaskToWaitingQueue(List return trySubmitCount; } - private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) { + private static boolean canAddTaskToWaitingQueue(AbstractCompactionTask task) + throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } // check file num long fileNumLimitForCompaction = SystemInfo.getInstance().getTotalFileLimitForCompaction(); if (task.getProcessedFileNum() > fileNumLimitForCompaction) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index 6dedb710b54ad..62546208378e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -97,9 +97,7 @@ private FileInfo getFileInfoFromCache(TsFileResource resource) throws IOExceptio return fileInfo; } } - try (TsFileSequenceReader reader = - new TsFileSequenceReader(resource.getTsFilePath(), true, false)) { - + try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) { FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader); fileInfoCache.put(resource, fileInfo); synchronized (globalFileInfoCacheForFailedCompaction) {