From 8cee23ea91122089e215532c1a7002553db7426a Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 24 Jun 2025 18:41:54 +0800 Subject: [PATCH] Support set default compression by data type --- .../tsfile/common/conf/TSFileConfig.java | 80 ++++++++++ .../tsfile/common/conf/TSFileDescriptor.java | 6 + .../write/chunk/AlignedChunkWriterImpl.java | 11 +- .../write/schema/MeasurementSchema.java | 4 +- .../tsfile/write/schema/TimeseriesSchema.java | 4 +- .../write/schema/VectorMeasurementSchema.java | 107 ++++++++++++-- .../tsfile/write/writer/TsFileIOWriter.java | 4 + .../apache/tsfile/write/ChunkRewriteTest.java | 2 +- .../tsfile/write/TsFileIOWriterTest.java | 137 +++++++++++++++--- 9 files changed, 312 insertions(+), 43 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java index a987bf340..709af3c0a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java @@ -126,6 +126,24 @@ public class TSFileConfig implements Serializable { /** Encoder of string, blob and text column. Default value is PLAIN. */ private String textEncoding = "PLAIN"; + /** Compression of boolean column. Defaults to the overall compression. */ + private String booleanCompression = null; + + /** Compression of int32 and date column. Defaults to the overall compression. */ + private String int32Compression = null; + + /** Compression of int64 and timestamp column. Defaults to the overall compression. */ + private String int64Compression = null; + + /** Compression of float column. Defaults to the overall compression. */ + private String floatCompression = null; + + /** Compression of double column. Defaults to the overall compression. */ + private String doubleCompression = null; + + /** Compression of string, blob and text column. Defaults to the overall compression. 
*/ + private String textCompression = null; + /** * Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, double data type, TsFile @@ -361,6 +379,44 @@ public String getValueEncoder(TSDataType dataType) { } } + public CompressionType getCompressor(TSDataType dataType) { + String compressionName; + switch (dataType) { + case BOOLEAN: + compressionName = booleanCompression; + break; + case INT32: + case DATE: + compressionName = int32Compression; + break; + case INT64: + case TIMESTAMP: + compressionName = int64Compression; + break; + case FLOAT: + compressionName = floatCompression; + break; + case DOUBLE: + compressionName = doubleCompression; + break; + case STRING: + case BLOB: + case TEXT: + compressionName = textCompression; + break; + default: + compressionName = null; + } + + CompressionType compressionType; + if (compressionName != null) { + compressionType = CompressionType.valueOf(compressionName); + } else { + compressionType = compressor; + } + return compressionType; + } + public void setValueEncoder(String valueEncoder) { this.valueEncoder = valueEncoder; } @@ -689,4 +745,28 @@ public boolean isLz4UseJni() { public void setLz4UseJni(boolean lz4UseJni) { this.lz4UseJni = lz4UseJni; } + + public void setBooleanCompression(String booleanCompression) { + this.booleanCompression = booleanCompression; + } + + public void setInt32Compression(String int32Compression) { + this.int32Compression = int32Compression; + } + + public void setInt64Compression(String int64Compression) { + this.int64Compression = int64Compression; + } + + public void setFloatCompression(String floatCompression) { + this.floatCompression = floatCompression; + } + + public void setDoubleCompression(String doubleCompression) { + this.doubleCompression = doubleCompression; + } + + public void setTextCompression(String textCompression) { + this.textCompression = textCompression; + } 
} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java index 435561d9f..01b783167 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java @@ -81,6 +81,12 @@ public void overwriteConfigByCustomSettings(Properties properties) { writer.setInt(conf::setFloatPrecision, "float_precision"); writer.setString(conf::setValueEncoder, "value_encoder"); writer.setString(conf::setCompressor, "compressor"); + writer.setString(conf::setBooleanCompression, "boolean_compressor"); + writer.setString(conf::setInt32Compression, "int32_compressor"); + writer.setString(conf::setInt64Compression, "int64_compressor"); + writer.setString(conf::setFloatCompression, "float_compressor"); + writer.setString(conf::setDoubleCompression, "double_compressor"); + writer.setString(conf::setTextCompression, "text_compressor"); writer.setInt(conf::setBatchSize, "batch_size"); writer.setString(conf::setEncryptType, "encrypt_type"); writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni"); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java index 49ec4d7f2..2ec4bd8f8 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -61,7 +61,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { timeChunkWriter = new TimeChunkWriter( schema.getMeasurementName(), - schema.getCompressor(), + schema.getTimeCompressor(), schema.getTimeTSEncoding(), schema.getTimeEncoder(), this.encryptParam); @@ -76,7 +76,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { valueChunkWriterList.add( new 
ValueChunkWriter( valueMeasurementIdList.get(i), - schema.getCompressor(), + schema.getValueCompressor(i), valueTSDataTypeList.get(i), valueTSEncodingList.get(i), valueEncoderList.get(i), @@ -92,7 +92,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema, EncryptParameter e timeChunkWriter = new TimeChunkWriter( schema.getMeasurementName(), - schema.getCompressor(), + schema.getTimeCompressor(), schema.getTimeTSEncoding(), schema.getTimeEncoder(), this.encryptParam); @@ -107,7 +107,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema, EncryptParameter e valueChunkWriterList.add( new ValueChunkWriter( valueMeasurementIdList.get(i), - schema.getCompressor(), + schema.getValueCompressor(i), valueTSDataTypeList.get(i), valueTSEncodingList.get(i), valueEncoderList.get(i), @@ -193,7 +193,8 @@ public AlignedChunkWriterImpl(List schemaList) { TSEncoding timeEncoding = TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); - CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType timeCompression = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64); timeChunkWriter = new TimeChunkWriter( "", diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java index f63c2dc2e..59ba1816d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java @@ -67,7 +67,7 @@ public MeasurementSchema(String measurementName, TSDataType dataType) { measurementName, dataType, TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(dataType)), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + 
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType), null); } @@ -77,7 +77,7 @@ public MeasurementSchema(String measurementName, TSDataType dataType, TSEncoding measurementName, dataType, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(dataType), null); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java index e05912d0b..21c742650 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java @@ -56,7 +56,7 @@ public TimeseriesSchema(String fullPath, TSDataType tsDataType) { fullPath, tsDataType, TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType), Collections.emptyMap()); } @@ -66,7 +66,7 @@ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) { fullPath, type, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(type), Collections.emptyMap()); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java index c53fee324..777eaf876 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java @@ -47,13 +47,19 @@ public class VectorMeasurementSchema RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class); private static final long BUILDER_SIZE = 
RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class); + private static final byte NO_UNIFIED_COMPRESSOR = -1; private String deviceId; private Map measurementsToIndexMap; private byte[] types; private byte[] encodings; private TSEncodingBuilder[] encodingConverters; - private byte compressor; + + /** For compatibility of old versions. */ + private byte unifiedCompressor; + + /** [0] is for the time column. */ + private byte[] compressors; public VectorMeasurementSchema() {} @@ -80,7 +86,34 @@ public VectorMeasurementSchema( } this.encodings = encodingsInByte; this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = compressionType.serialize(); + this.unifiedCompressor = compressionType.serialize(); + } + + public VectorMeasurementSchema( + String deviceId, + String[] subMeasurements, + TSDataType[] types, + TSEncoding[] encodings, + byte[] compressors) { + this.deviceId = deviceId; + this.measurementsToIndexMap = new HashMap<>(); + for (int i = 0; i < subMeasurements.length; i++) { + measurementsToIndexMap.put(subMeasurements[i], i); + } + byte[] typesInByte = new byte[types.length]; + for (int i = 0; i < types.length; i++) { + typesInByte[i] = types[i].serialize(); + } + this.types = typesInByte; + + byte[] encodingsInByte = new byte[encodings.length]; + for (int i = 0; i < encodings.length; i++) { + encodingsInByte[i] = encodings[i].serialize(); + } + this.encodings = encodingsInByte; + this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; + this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + this.compressors = compressors; } public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSDataType[] types) { @@ -101,7 +134,15 @@ public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSData .serialize(); } this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = TSFileDescriptor.getInstance().getConfig().getCompressor().serialize(); + 
this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + // the first column is time + this.compressors = new byte[subMeasurements.length + 1]; + compressors[0] = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize(); + for (int i = 0; i < types.length; i++) { + compressors[i + 1] = + TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize(); + } } public VectorMeasurementSchema( @@ -124,9 +165,24 @@ public String getMeasurementName() { return deviceId; } + @Deprecated // Aligned series should not invoke this method @Override public CompressionType getCompressor() { - return CompressionType.deserialize(compressor); + throw new UnsupportedOperationException("Aligned series should not invoke this method"); + } + + public CompressionType getTimeCompressor() { + if (compressors != null) { + return CompressionType.deserialize(compressors[0]); + } + return CompressionType.deserialize(unifiedCompressor); + } + + public CompressionType getValueCompressor(int index) { + if (compressors != null) { + return CompressionType.deserialize(compressors[index + 1]); + } + return CompressionType.deserialize(unifiedCompressor); } @Override @@ -276,7 +332,11 @@ public int serializeTo(ByteBuffer buffer) { for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, buffer); } - byteLen += ReadWriteIOUtils.write(compressor, buffer); + byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + buffer.put(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -297,7 +357,11 @@ public int serializeTo(OutputStream outputStream) throws IOException { for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, outputStream); } - byteLen += ReadWriteIOUtils.write(compressor, outputStream); + byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + 
outputStream.write(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -348,7 +412,15 @@ public static VectorMeasurementSchema deserializeFrom(InputStream inputStream) } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(inputStream); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(inputStream); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + int read = inputStream.read(compressors); + if (read != compressors.length) { + throw new IOException("Unexpected end of stream when reading compressors"); + } + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -375,7 +447,12 @@ public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) { } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(buffer); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + buffer.get(compressors); + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -391,12 +468,13 @@ public boolean equals(Object o) { return Arrays.equals(types, that.types) && Arrays.equals(encodings, that.encodings) && Objects.equals(deviceId, that.deviceId) - && Objects.equals(compressor, that.compressor); + && Objects.equals(unifiedCompressor, that.unifiedCompressor) + && Arrays.equals(compressors, that.compressors); } @Override public int hashCode() { - return Objects.hash(deviceId, types, encodings, compressor); + return Objects.hash(deviceId, types, encodings, unifiedCompressor, Arrays.hashCode(compressors)); } /** compare by vector name */ @@ -424,7 +502,14 @@ public String toString() {
TSEncoding.deserialize(encodings[entry.getValue()]).toString()); sc.addTail("],"); } - sc.addTail(CompressionType.deserialize(compressor).toString()); + if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) { + sc.addTail(CompressionType.deserialize(unifiedCompressor).toString()); + } else { + for (byte compressor : compressors) { + sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(","); + } + } + return sc.toString(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index 7d8dc4901..31a6af52b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -403,6 +403,10 @@ public void endCurrentChunk() { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void endFile() throws IOException { + if (!canWrite) { + return; + } + checkInMemoryPathCount(); readChunkMetadataAndConstructIndexTree(); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java index 90215ccf4..30e36055c 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java @@ -385,7 +385,7 @@ public Chunk getTimeChunk( measurementSchema.getMeasurementName(), newChunkData.capacity(), TSDataType.VECTOR, - measurementSchema.getCompressor(), + measurementSchema.getTimeCompressor(), measurementSchema.getTimeTSEncoding(), timeChunkWriter.getNumOfPages()); return new Chunk(newChunkHeader, newChunkData, null, timeChunkWriter.getStatistics()); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java index 1dd2eff97..51a8b80a0 
100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java @@ -19,22 +19,32 @@ package org.apache.tsfile.write; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.constant.TestConstant; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkGroupHeader; import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.ColumnSchema; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.file.metadata.TsFileMetadata; +import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.file.metadata.utils.TestHelper; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.utils.MeasurementGroup; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.apache.tsfile.write.schema.Schema; @@ -49,11 +59,14 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import 
java.util.Map; import java.util.Set; +import static org.junit.Assert.assertEquals; + public class TsFileIOWriterTest { private static final String FILE_PATH = @@ -99,14 +112,94 @@ public void after() { } } + @Test + public void changeTypeCompressionTest() throws IOException, WriteProcessException { + TSFileConfig config = TSFileDescriptor.getInstance().getConfig(); + CompressionType prevInt32Compression = config.getCompressor(TSDataType.INT32); + CompressionType prevTextCompression = config.getCompressor(TSDataType.TEXT); + config.setInt32Compression("UNCOMPRESSED"); + config.setTextCompression("GZIP"); + + try (TsFileIOWriter ioWriter = + new TsFileIOWriter( + new File( + TestConstant.BASE_OUTPUT_PATH.concat("changeTypeCompressionTest.tsfile"))); + TsFileWriter fileWriter = new TsFileWriter(ioWriter)) { + fileWriter.registerTimeseries( + Factory.DEFAULT_FACTORY.create("root.db1.d1"), + new MeasurementSchema("s1", TSDataType.INT32)); + fileWriter.registerTimeseries( + Factory.DEFAULT_FACTORY.create("root.db1.d1"), + new MeasurementSchema("s2", TSDataType.TEXT)); + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new ColumnSchema("s1", TSDataType.INT32, ColumnCategory.FIELD), + new ColumnSchema("s2", TSDataType.TEXT, ColumnCategory.FIELD))); + fileWriter.registerTableSchema(tableSchema); + + Tablet treeTablet = + new Tablet( + "root.db1.d1", + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.TEXT))); + treeTablet.addTimestamp(0, 0); + treeTablet.addValue(0, 0, 0); + treeTablet.addValue(0, 1, "0"); + fileWriter.writeTree(treeTablet); + + Tablet tableTablet = + new Tablet( + "t1", + Arrays.asList("s1", "s2"), + Arrays.asList(TSDataType.INT32, TSDataType.TEXT), + Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD)); + tableTablet.addTimestamp(0, 0); + tableTablet.addValue(0, 0, 0); + tableTablet.addValue(0, 1, "0"); + fileWriter.writeTable(tableTablet); + fileWriter.flush(); + + 
ChunkMetadata s1TreeChunkMeta = + ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0); + ChunkMetadata s2TreeChunkMeta = + ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(1); + ChunkMetadata s1TableChunkMeta = + ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(1); + ChunkMetadata s2TableChunkMeta = + ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(2); + + fileWriter.close(); + + try (TsFileSequenceReader sequenceReader = + new TsFileSequenceReader( + TestConstant.BASE_OUTPUT_PATH.concat("changeTypeCompressionTest.tsfile"))) { + Chunk chunk = sequenceReader.readMemChunk(s1TreeChunkMeta); + assertEquals(CompressionType.UNCOMPRESSED, chunk.getHeader().getCompressionType()); + chunk = sequenceReader.readMemChunk(s2TreeChunkMeta); + assertEquals(CompressionType.GZIP, chunk.getHeader().getCompressionType()); + chunk = sequenceReader.readMemChunk(s1TableChunkMeta); + assertEquals(CompressionType.UNCOMPRESSED, chunk.getHeader().getCompressionType()); + chunk = sequenceReader.readMemChunk(s2TableChunkMeta); + assertEquals(CompressionType.GZIP, chunk.getHeader().getCompressionType()); + } + + } finally { + config.setInt32Compression(prevInt32Compression.name()); + config.setTextCompression(prevTextCompression.name()); + } + } + @Test public void endFileTest() throws IOException { TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH); // magic_string - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); - Assert.assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); + assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1); @@ -114,39 +207,39 @@ public void 
endFileTest() throws IOException { ChunkGroupHeader chunkGroupHeader; for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); // ordinary chunk header - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals(SENSOR_1, header.getMeasurementID()); + assertEquals(SENSOR_1, header.getMeasurementID()); } for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); // vector chunk header (time) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("", header.getMeasurementID()); + assertEquals("", header.getMeasurementID()); // vector chunk header (values) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s1", header.getMeasurementID()); - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + 
assertEquals("s1", header.getMeasurementID()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s2", header.getMeasurementID()); + assertEquals("s2", header.getMeasurementID()); } - Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); + assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); reader.readPlanIndex(); - Assert.assertEquals(100, reader.getMinPlanIndex()); - Assert.assertEquals(10000, reader.getMaxPlanIndex()); + assertEquals(100, reader.getMinPlanIndex()); + assertEquals(10000, reader.getMaxPlanIndex()); - Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); + assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); // make sure timeseriesMetadata is only Map> deviceTimeseriesMetadataMap = @@ -167,7 +260,7 @@ public void endFileTest() throws IOException { for (MetadataIndexNode node : metaData.getTableMetadataIndexNodeMap().values()) { cnt += node.getChildren().size(); } - Assert.assertEquals(2, cnt); + assertEquals(2, cnt); } private void writeChunkGroup(TsFileIOWriter writer, IMeasurementSchema measurementSchema) @@ -200,7 +293,7 @@ private void writeVectorChunkGroup( // vector chunk (time) writer.startFlushChunk( vectorMeasurementSchema.getMeasurementName(), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getTimeCompressor(), vectorMeasurementSchema.getType(), vectorMeasurementSchema.getTimeTSEncoding(), Statistics.getStatsByType(vectorMeasurementSchema.getType()), @@ -216,7 +309,7 @@ private void writeVectorChunkGroup( subStatistics.updateStats(0L, 0L); writer.startFlushChunk( vectorMeasurementSchema.getSubMeasurementsList().get(j), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getValueCompressor(j), vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j), vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j), 
subStatistics,