Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ public MeasurementIterator iterateNotAlignedSeries(
private void applyModificationForAlignedChunkMetadataList(
TsFileResource tsFileResource, List<AlignedChunkMetadata> alignedChunkMetadataList)
throws IllegalPathException {
if (alignedChunkMetadataList.isEmpty()) {
// all the value chunks is empty chunk
return;
}
ModificationFile modificationFile = ModificationFile.getNormalMods(tsFileResource);
if (!modificationFile.exists()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
Expand All @@ -40,9 +41,13 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
import org.apache.iotdb.tsfile.write.page.TimePageWriter;
import org.apache.iotdb.tsfile.write.page.ValuePageWriter;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.junit.After;
Expand All @@ -54,6 +59,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -709,4 +715,142 @@ public void testCompactionLogIsDeletedAfterException() throws IOException {
Assert.assertEquals(
4, Objects.requireNonNull(seqResource1.getTsFile().getParentFile().listFiles()).length);
}

@Test
public void testCompactionWithAllEmptyValueChunks() throws IOException, IllegalPathException {
List<PartialPath> timeserisPathList = new ArrayList<>();
List<TSDataType> tsDataTypes = new ArrayList<>();
// seq file with empty aligned device
int deviceNum = 10;
int measurementNum = 10;
TsFileResource resource = createEmptyFileAndResource(true);
try (TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(resource.getTsFile())) {
// write the data in device
for (int deviceIndex = 0; deviceIndex < deviceNum; deviceIndex++) {
tsFileIOWriter.startChunkGroup(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex);

List<TSDataType> dataTypes = createDataType(measurementNum);
List<TSEncoding> encodings = createEncodingType(measurementNum);
List<CompressionType> compressionTypes = createCompressionType(measurementNum);
List<Integer> measurementIndexes = new ArrayList<>();
for (int i = 0; i < measurementNum; i++) {
measurementIndexes.add(i);
}
List<PartialPath> timeseriesPath =
createTimeseries(deviceIndex, measurementIndexes, dataTypes, true);

List<IChunkWriter> iChunkWriters =
createChunkWriter(timeseriesPath, dataTypes, encodings, compressionTypes, true);

// write first chunk
List<TimeRange> pages = new ArrayList<>();
pages.add(new TimeRange(0L, 300L));
pages.add(new TimeRange(500L, 600L));

for (IChunkWriter iChunkWriter : iChunkWriters) {
if (deviceIndex == 0) {
writeEmptyAlignedChunk(
(AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
} else {
writeAlignedChunk((AlignedChunkWriterImpl) iChunkWriter, tsFileIOWriter, pages, true);
}
}

tsFileIOWriter.endChunkGroup();
resource.updateStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex, 0);
resource.updateEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + deviceIndex, 1400);
timeserisPathList.addAll(timeseriesPath);
tsDataTypes.addAll(dataTypes);
if (deviceIndex == 0) {
generateModsFile(timeseriesPath, resource, Long.MIN_VALUE, Long.MAX_VALUE);
}
}
tsFileIOWriter.endFile();
}
resource.serialize();
seqResources.add(resource);

// start compacting
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);

Map<PartialPath, List<TimeValuePair>> sourceDatas =
readSourceFiles(createTimeseries(maxDeviceNum, maxMeasurementNum, true), tsDataTypes);

// execute inner compaction for each seq file to produce file with empty value chunk
InnerSpaceCompactionTask task =
new InnerSpaceCompactionTask(
0,
tsFileManager,
Collections.singletonList(seqResources.get(0)),
true,
new ReadChunkCompactionPerformer(),
0);
Assert.assertTrue(task.start());

validateSeqFiles(true);

validateTargetDatas(sourceDatas, tsDataTypes);
}

private void writeEmptyAlignedChunk(
AlignedChunkWriterImpl alignedChunkWriter,
TsFileIOWriter tsFileIOWriter,
List<TimeRange> pages,
boolean isSeq)
throws IOException {
TimePageWriter timePageWriter = alignedChunkWriter.getTimeChunkWriter().getPageWriter();
for (TimeRange page : pages) {
// write time page
for (long timestamp = page.getMin(); timestamp <= page.getMax(); timestamp++) {
timePageWriter.write(timestamp);
}
// seal time page
alignedChunkWriter.getTimeChunkWriter().sealCurrentPage();

// write value page
for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) {
ValuePageWriter valuePageWriter = valueChunkWriter.getPageWriter();
for (long timestamp = page.getMin(); timestamp <= page.getMax(); timestamp++) {
writeEmptyAlignedPoint(valuePageWriter, timestamp, isSeq);
}
// seal sub value page
valueChunkWriter.sealCurrentPage();
}
}
// seal time chunk and value chunks
alignedChunkWriter.writeToFileWriter(tsFileIOWriter);
}

private void writeEmptyAlignedPoint(
ValuePageWriter valuePageWriter, long timestamp, boolean isSeq) {
switch (valuePageWriter.getStatistics().getType()) {
case TEXT:
valuePageWriter.write(
timestamp,
new Binary(isSeq ? "seqText" : "unSeqText", TSFileConfig.STRING_CHARSET),
true);
break;
case DOUBLE:
valuePageWriter.write(timestamp, isSeq ? timestamp + 0.01 : 100000.01 + timestamp, true);
break;
case BOOLEAN:
valuePageWriter.write(timestamp, isSeq, true);
break;
case INT64:
valuePageWriter.write(timestamp, isSeq ? timestamp : 100000L + timestamp, true);
break;
case INT32:
valuePageWriter.write(
timestamp, isSeq ? (int) timestamp : (int) (100000 + timestamp), true);
break;
case FLOAT:
valuePageWriter.write(
timestamp, isSeq ? timestamp + (float) 0.1 : (float) (100000.1 + timestamp), true);
break;
default:
throw new UnsupportedOperationException(
"Unknown data type " + valuePageWriter.getStatistics().getType());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public List<AlignedChunkMetadata> getChunkMetadataList() {
return getAlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList);
}

/** Notice: if all the value chunks is empty chunk, then return empty list. */
private List<AlignedChunkMetadata> getAlignedChunkMetadata(
List<IChunkMetadata> timeChunkMetadata, List<List<IChunkMetadata>> valueChunkMetadataList) {
List<AlignedChunkMetadata> res = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2078,7 +2078,8 @@ public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException {
}

/**
* Get AlignedChunkMetadata of sensors under one device
* Get AlignedChunkMetadata of sensors under one device. Notice: if all the value chunks is empty
* chunk, then return empty list.
*
* @param device device name
*/
Expand Down