diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 2aef8c8094cea..0bbc4535adfda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -373,6 +373,10 @@ public MeasurementIterator iterateNotAlignedSeries( private void applyModificationForAlignedChunkMetadataList( TsFileResource tsFileResource, List alignedChunkMetadataList) throws IllegalPathException { + if (alignedChunkMetadataList.isEmpty()) { + // all the value chunks are empty chunks + return; + } ModificationFile modificationFile = ModificationFile.getNormalMods(tsFileResource); if (!modificationFile.exists()) { return; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java index e87596c1c5530..7978fc6bef937 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; 
import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; @@ -40,9 +41,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter; +import org.apache.iotdb.tsfile.write.page.TimePageWriter; +import org.apache.iotdb.tsfile.write.page.ValuePageWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.junit.After; @@ -54,6 +59,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -709,4 +715,142 @@ public void testCompactionLogIsDeletedAfterException() throws IOException { Assert.assertEquals( 4, Objects.requireNonNull(seqResource1.getTsFile().getParentFile().listFiles()).length); } + + @Test + public void testCompactionWithAllEmptyValueChunks() throws IOException, IllegalPathException { + List timeserisPathList = new ArrayList<>(); + List tsDataTypes = new ArrayList<>(); + // seq file with empty aligned device + int deviceNum = 10; + int measurementNum = 10; + TsFileResource resource = createEmptyFileAndResource(true); + try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) { + // write the data in device + for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) { + tsFileIOWriter.startChunkGroup(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex); + + List dataTypes = createDataType(measurementNum); + List encodings = createEncodingType(measurementNum); + List compressionTypes = 
createCompressionType(measurementNum); + List measurementIndexes = new ArrayList<>(); + for (int i = 0; i < measurementNum; i++) { + measurementIndexes.add(i); + } + List timeseriesPath = + createTimeseries(deviceIndex, measurementIndexes, dataTypes, true); + + List iChunkWriters = + createChunkWriter(timeseriesPath, dataTypes, encodings, compressionTypes, true); + + // write first chunk + List pages = new ArrayList<>(); + pages.add(new TimeRange(0L, 300L)); + pages.add(new TimeRange(500L, 600L)); + + for (IChunkWriter iChunkWriter : iChunkWriters) { + if (deviceIndex == 0) { + writeEmptyAlignedChunk( + (AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true); + } else { + writeAlignedChunk((AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true); + } + } + + tsFileIOWriter.endChunkGroup(); + resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex, 0); + resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex, 1400); + timeserisPathList.addAll(timeseriesPath); + tsDataTypes.addAll(dataTypes); + if (deviceIndex == 0) { + generateModsFile(timeseriesPath, resource, Long.MIN_VALUE, Long.MAX_VALUE); + } + } + tsFileIOWriter.endFile(); + } + resource.serialize(); + seqResources.add(resource); + + // start compacting + tsFileManager.addAll(seqResources, true); + tsFileManager.addAll(unseqResources, false); + + Map> sourceDatas = + readSourceFiles(createTimeseries(maxDeviceNum, maxMeasurementNum, true), tsDataTypes); + + // execute inner compaction for each seq file to produce file with empty value chunk + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + Collections.singletonList(seqResources.get(0)), + true, + new ReadChunkCompactionPerformer(), + 0); + Assert.assertTrue(task.start()); + + validateSeqFiles(true); + + validateTargetDatas(sourceDatas, tsDataTypes); + } + + private void writeEmptyAlignedChunk( + AlignedChunkWriterImpl alignedChunkWriter, + 
TsFileIOWriter tsFileIOWriter, + List pages, + boolean isSeq) + throws IOException { + TimePageWriter timePageWriter = alignedChunkWriter.getTimeChunkWriter().getPageWriter(); + for (TimeRange page : pages) { + // write time page + for (long timestamp = page.getMin(); timestamp <= page.getMax(); timestamp++) { + timePageWriter.write(timestamp); + } + // seal time page + alignedChunkWriter.getTimeChunkWriter().sealCurrentPage(); + + // write value page + for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) { + ValuePageWriter valuePageWriter = valueChunkWriter.getPageWriter(); + for (long timestamp = page.getMin(); timestamp <= page.getMax(); timestamp++) { + writeEmptyAlignedPoint(valuePageWriter, timestamp, isSeq); + } + // seal sub value page + valueChunkWriter.sealCurrentPage(); + } + } + // seal time chunk and value chunks + alignedChunkWriter.writeToFileWriter(tsFileIOWriter); + } + + private void writeEmptyAlignedPoint( + ValuePageWriter valuePageWriter, long timestamp, boolean isSeq) { + switch (valuePageWriter.getStatistics().getType()) { + case TEXT: + valuePageWriter.write( + timestamp, + new Binary(isSeq ? "seqText" : "unSeqText", TSFileConfig.STRING_CHARSET), + true); + break; + case DOUBLE: + valuePageWriter.write(timestamp, isSeq ? timestamp + 0.01 : 100000.01 + timestamp, true); + break; + case BOOLEAN: + valuePageWriter.write(timestamp, isSeq, true); + break; + case INT64: + valuePageWriter.write(timestamp, isSeq ? timestamp : 100000L + timestamp, true); + break; + case INT32: + valuePageWriter.write( + timestamp, isSeq ? (int) timestamp : (int) (100000 + timestamp), true); + break; + case FLOAT: + valuePageWriter.write( + timestamp, isSeq ? 
timestamp + (float) 0.1 : (float) (100000.1 + timestamp), true); + break; + default: + throw new UnsupportedOperationException( + "Unknown data type " + valuePageWriter.getStatistics().getType()); + } + } } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java index a512e5f3cfcef..b72d2267f18b8 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java @@ -155,6 +155,7 @@ public List getChunkMetadataList() { return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList); } + /** Notice: if all the value chunks are empty chunks, an empty list is returned. */ private List getAlignedChunkMetadata( List timeChunkMetadata, List> valueChunkMetadataList) { List res = new ArrayList<>(); diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index dfdbd81c7441f..396c0a9eead52 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -2078,7 +2078,8 @@ public List getChunkMetadataList(Path path) throws IOException { } /** - * Get AlignedChunkMetadata of sensors under one device + * Get AlignedChunkMetadata of sensors under one device. Notice: if all the value chunks are + * empty chunks, an empty list is returned. * * @param device device name */