diff --git a/java/pom.xml b/java/pom.xml index 7a9ae1b57..03571d61e 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -55,7 +55,7 @@ org.apache.commons commons-lang3 - 3.15.0 + 3.18.0 org.lz4 @@ -95,11 +95,6 @@ 1.3.15 - - com.google.code.gson - gson - 2.10.1 - diff --git a/java/tsfile/pom.xml b/java/tsfile/pom.xml index 42ab37166..73d41730e 100644 --- a/java/tsfile/pom.xml +++ b/java/tsfile/pom.xml @@ -87,11 +87,6 @@ logback-classic test - - com.google.code.gson - gson - test - diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java index 72cf59be1..a7ef6b6e5 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java @@ -102,6 +102,42 @@ public class TSFileConfig implements Serializable { */ private String timeEncoding = "TS_2DIFF"; + /** Encoder of boolean column. Default value is RLE. */ + private String booleanEncoding = "RLE"; + + /** Encoder of int32 and date column. Default value is TS_2DIFF. */ + private String int32Encoding = "TS_2DIFF"; + + /** Encoder of int64 and timestamp column. Default value is TS_2DIFF. */ + private String int64Encoding = "TS_2DIFF"; + + /** Encoder of float column. Default value is GORILLA. */ + private String floatEncoding = "GORILLA"; + + /** Encoder of double column. Default value is GORILLA. */ + private String doubleEncoding = "GORILLA"; + + /** Encoder of string, blob and text column. Default value is PLAIN. */ + private String textEncoding = "PLAIN"; + + /** Compression of boolean column. Defaults to the overall compression. */ + private String booleanCompression = null; + + /** Compression of int32 and date column. Defaults to the overall compression. */ + private String int32Compression = null; + + /** Compression of int64 and timestamp column. Defaults to the overall compression. */ + private String int64Compression = null; + + /** Compression of float column. Defaults to the overall compression. */ + private String floatCompression = null; + + /** Compression of double column. Defaults to the overall compression. */ + private String doubleCompression = null; + + /** Compression of string, blob and text column. Defaults to the overall compression. */ + private String textCompression = null; + /** * Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, double data type, TsFile @@ -288,6 +324,66 @@ public String getValueEncoder() { return valueEncoder; } + public String getValueEncoder(TSDataType dataType) { + switch (dataType) { + case BOOLEAN: + return booleanEncoding; + case INT32: + case DATE: + return int32Encoding; + case INT64: + case TIMESTAMP: + return int64Encoding; + case FLOAT: + return floatEncoding; + case DOUBLE: + return doubleEncoding; + case STRING: + case BLOB: + case TEXT: + default: + return textEncoding; + } + } + + public CompressionType getCompressor(TSDataType dataType) { + String compressionName; + switch (dataType) { + case BOOLEAN: + compressionName = booleanCompression; + break; + case INT32: + case DATE: + compressionName = int32Compression; + break; + case INT64: + case TIMESTAMP: + compressionName = int64Compression; + break; + case FLOAT: + compressionName = floatCompression; + break; + case DOUBLE: + compressionName = doubleCompression; + break; + case STRING: + case BLOB: + case TEXT: + compressionName = textCompression; + break; + default: + compressionName = null; + } + + CompressionType compressionType; + if (compressionName != null) { + compressionType = CompressionType.valueOf(compressionName); + } else { + compressionType = compressor; + } + return compressionType; + } + public void setValueEncoder(String valueEncoder) { this.valueEncoder = valueEncoder; } @@ -568,4 +664,28 @@ public boolean isLz4UseJni() { public void setLz4UseJni(boolean lz4UseJni) { this.lz4UseJni = lz4UseJni; } + + public void setBooleanCompression(String booleanCompression) { + this.booleanCompression = booleanCompression; + } + + public void setInt32Compression(String int32Compression) { + this.int32Compression = int32Compression; + } + + public void setInt64Compression(String int64Compression) { + this.int64Compression = int64Compression; + } + + public void setFloatCompression(String floatCompression) { + this.floatCompression = floatCompression; + } + + public void setDoubleCompression(String doubleCompression) { + this.doubleCompression = doubleCompression; + } + + public void setTextCompression(String textCompression) { + this.textCompression = textCompression; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java index 8f2e2c787..d00220fda 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java @@ -81,6 +81,12 @@ public void overwriteConfigByCustomSettings(Properties properties) { writer.setInt(conf::setFloatPrecision, "float_precision"); writer.setString(conf::setValueEncoder, "value_encoder"); writer.setString(conf::setCompressor, "compressor"); + writer.setString(conf::setBooleanCompression, "boolean_compressor"); + writer.setString(conf::setInt32Compression, "int32_compressor"); + writer.setString(conf::setInt64Compression, "int64_compressor"); + writer.setString(conf::setFloatCompression, "float_compressor"); + writer.setString(conf::setDoubleCompression, "double_compressor"); + writer.setString(conf::setTextCompression, "text_compressor"); writer.setInt(conf::setBatchSize, "batch_size"); writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni"); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java index 22d310c2a..515f8e6ee 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -56,7 +56,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { timeChunkWriter = new TimeChunkWriter( schema.getMeasurementId(), - schema.getCompressor(), + schema.getTimeCompressor(), schema.getTimeTSEncoding(), schema.getTimeEncoder()); @@ -70,7 +70,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { valueChunkWriterList.add( new ValueChunkWriter( valueMeasurementIdList.get(i), - schema.getCompressor(), + schema.getValueCompressor(i), valueTSDataTypeList.get(i), valueTSEncodingList.get(i), valueEncoderList.get(i))); @@ -122,7 +122,8 @@ public AlignedChunkWriterImpl(List schemaList) { TSEncoding timeEncoding = TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType(); - CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor(); + CompressionType timeCompression = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64); timeChunkWriter = new TimeChunkWriter( "", diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java index 545d9cc2b..b3f20ac42 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java @@ -63,8 +63,8 @@ public MeasurementSchema(String measurementId, TSDataType tsDataType) { this( measurementId, tsDataType, - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)), + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType), null); } @@ -74,7 +74,7 @@ public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encod measurementId, type, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(type), null); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java index 0c287e85f..21c742650 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java @@ -55,8 +55,8 @@ public TimeseriesSchema(String fullPath, TSDataType tsDataType) { this( fullPath, tsDataType, - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()), - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)), + TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType), Collections.emptyMap()); } @@ -66,7 +66,7 @@ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) { fullPath, type, encoding, - TSFileDescriptor.getInstance().getConfig().getCompressor(), + TSFileDescriptor.getInstance().getConfig().getCompressor(type), Collections.emptyMap()); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java index 9b157540c..33c21479c 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java @@ -47,13 +47,19 @@ public class VectorMeasurementSchema RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class); private static final long BUILDER_SIZE = RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class); + private static final byte NO_UNIFIED_COMPRESSOR = -1; private String deviceId; private Map measurementsToIndexMap; private byte[] types; private byte[] encodings; private TSEncodingBuilder[] encodingConverters; - private byte compressor; + + /** For compatibility of old versions. */ + private byte unifiedCompressor; + + /** [0] is for the time column. */ + private byte[] compressors; public VectorMeasurementSchema() {} @@ -80,7 +86,34 @@ public VectorMeasurementSchema( } this.encodings = encodingsInByte; this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = compressionType.serialize(); + this.unifiedCompressor = compressionType.serialize(); + } + + public VectorMeasurementSchema( + String deviceId, + String[] subMeasurements, + TSDataType[] types, + TSEncoding[] encodings, + byte[] compressors) { + this.deviceId = deviceId; + this.measurementsToIndexMap = new HashMap<>(); + for (int i = 0; i < subMeasurements.length; i++) { + measurementsToIndexMap.put(subMeasurements[i], i); + } + byte[] typesInByte = new byte[types.length]; + for (int i = 0; i < types.length; i++) { + typesInByte[i] = types[i].serialize(); + } + this.types = typesInByte; + + byte[] encodingsInByte = new byte[encodings.length]; + for (int i = 0; i < encodings.length; i++) { + encodingsInByte[i] = encodings[i].serialize(); + } + this.encodings = encodingsInByte; + this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; + this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + this.compressors = compressors; } public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSDataType[] types) { @@ -101,7 +134,15 @@ public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSData .serialize(); } this.encodingConverters = new TSEncodingBuilder[subMeasurements.length]; - this.compressor = TSFileDescriptor.getInstance().getConfig().getCompressor().serialize(); + this.unifiedCompressor = NO_UNIFIED_COMPRESSOR; + // the first column is time + this.compressors = new byte[subMeasurements.length + 1]; + compressors[0] = + TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize(); + for (int i = 0; i < types.length; i++) { + compressors[i + 1] = + TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize(); + } } public VectorMeasurementSchema( @@ -124,9 +165,24 @@ public String getMeasurementId() { return deviceId; } + @Deprecated // Aligned series should not invoke this method @Override public CompressionType getCompressor() { - return CompressionType.deserialize(compressor); + throw new UnsupportedOperationException("Aligned series should not invoke this method"); + } + + public CompressionType getTimeCompressor() { + if (compressors != null) { + return CompressionType.deserialize(compressors[0]); + } + return CompressionType.deserialize(unifiedCompressor); + } + + public CompressionType getValueCompressor(int index) { + if (compressors != null) { + return CompressionType.deserialize(compressors[index + 1]); + } + return CompressionType.deserialize(unifiedCompressor); } @Override @@ -276,7 +332,11 @@ public int serializeTo(ByteBuffer buffer) { for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, buffer); } - byteLen += ReadWriteIOUtils.write(compressor, buffer); + byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + buffer.put(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -297,7 +357,11 @@ public int serializeTo(OutputStream outputStream) throws IOException { for (byte encoding : encodings) { byteLen += ReadWriteIOUtils.write(encoding, outputStream); } - byteLen += ReadWriteIOUtils.write(compressor, outputStream); + byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream); + if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + outputStream.write(compressors); + byteLen += compressors.length; + } return byteLen; } @@ -348,7 +412,15 @@ public static VectorMeasurementSchema deserializeFrom(InputStream inputStream) } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(inputStream); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(inputStream); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + int read = inputStream.read(compressors); + if (read != measurementSize) { + throw new IOException("Unexpected end of stream when reading compressors"); + } + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -375,7 +447,12 @@ public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) { } vectorMeasurementSchema.encodings = encodings; - vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer); + vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(buffer); + if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) { + byte[] compressors = new byte[measurementSize + 1]; + buffer.get(compressors); + vectorMeasurementSchema.compressors = compressors; + } return vectorMeasurementSchema; } @@ -391,12 +468,13 @@ public boolean equals(Object o) { return Arrays.equals(types, that.types) && Arrays.equals(encodings, that.encodings) && Objects.equals(deviceId, that.deviceId) - && Objects.equals(compressor, that.compressor); + && Objects.equals(unifiedCompressor, that.unifiedCompressor) + && Objects.equals(compressors, that.compressors); } @Override public int hashCode() { - return Objects.hash(deviceId, types, encodings, compressor); + return Objects.hash(deviceId, types, encodings, unifiedCompressor, compressors); } /** compare by vector name */ @@ -424,7 +502,14 @@ public String toString() { TSEncoding.deserialize(encodings[entry.getValue()]).toString()); sc.addTail("],"); } - sc.addTail(CompressionType.deserialize(compressor).toString()); + if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) { + sc.addTail(CompressionType.deserialize(unifiedCompressor).toString()); + } else { + for (byte compressor : compressors) { + sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(","); + } + } + return sc.toString(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java index b7d719726..cc59fd20d 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java @@ -322,6 +322,10 @@ public void endCurrentChunk() { */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning public void endFile() throws IOException { + if (!canWrite) { + return; + } + checkInMemoryPathCount(); readChunkMetadataAndConstructIndexTree(); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/PerfTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/PerfTest.java index 3cdf1bc07..e0c199324 100755 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/PerfTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/PerfTest.java @@ -20,7 +20,6 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; -import org.apache.tsfile.common.constant.JsonFormatConstant; import org.apache.tsfile.constant.TestConstant; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; @@ -33,7 +32,6 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.LoggerContext; -import com.google.gson.JsonObject; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -186,10 +184,6 @@ private static Schema generateTestData() { schema.registerTimeseries( new Path("d2"), new MeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN)); - JsonObject s4 = new JsonObject(); - s4.addProperty(JsonFormatConstant.MEASUREMENT_UID, "s4"); - s4.addProperty(JsonFormatConstant.DATA_TYPE, TSDataType.TEXT.toString()); - s4.addProperty(JsonFormatConstant.MEASUREMENT_ENCODING, TSEncoding.PLAIN.toString()); return schema; } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java index dbda5319a..7a385c0f5 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java @@ -53,6 +53,8 @@ import java.util.Map; import java.util.Set; +import static org.junit.Assert.assertEquals; + public class TsFileIOWriterTest { private static final String FILE_PATH = @@ -103,9 +105,9 @@ public void endFileTest() throws IOException { TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH); // magic_string - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); - Assert.assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); - Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic()); + assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber()); + assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic()); reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1); @@ -113,39 +115,39 @@ public void endFileTest() throws IOException { ChunkGroupHeader chunkGroupHeader; for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID()); // ordinary chunk header - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals(SENSOR_1, header.getMeasurementID()); + assertEquals(SENSOR_1, header.getMeasurementID()); } for (int i = 0; i < CHUNK_GROUP_NUM; i++) { // chunk group header - Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); + assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker()); chunkGroupHeader = reader.readChunkGroupHeader(); - Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); + assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID()); // vector chunk header (time) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("", header.getMeasurementID()); + assertEquals("", header.getMeasurementID()); // vector chunk header (values) - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s1", header.getMeasurementID()); - Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); + assertEquals("s1", header.getMeasurementID()); + assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker()); header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER); - Assert.assertEquals("s2", header.getMeasurementID()); + assertEquals("s2", header.getMeasurementID()); } - Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); + assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker()); reader.readPlanIndex(); - Assert.assertEquals(100, reader.getMinPlanIndex()); - Assert.assertEquals(10000, reader.getMaxPlanIndex()); + assertEquals(100, reader.getMinPlanIndex()); + assertEquals(10000, reader.getMaxPlanIndex()); - Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); + assertEquals(MetaMarker.SEPARATOR, reader.readMarker()); // make sure timeseriesMetadata is only Map> deviceTimeseriesMetadataMap = @@ -198,7 +200,7 @@ private void writeVectorChunkGroup( // vector chunk (time) writer.startFlushChunk( vectorMeasurementSchema.getMeasurementId(), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getTimeCompressor(), vectorMeasurementSchema.getType(), vectorMeasurementSchema.getTimeTSEncoding(), Statistics.getStatsByType(vectorMeasurementSchema.getType()), @@ -214,7 +216,7 @@ private void writeVectorChunkGroup( subStatistics.updateStats(0L, 0L); writer.startFlushChunk( vectorMeasurementSchema.getSubMeasurementsList().get(j), - vectorMeasurementSchema.getCompressor(), + vectorMeasurementSchema.getValueCompressor(j), vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j), vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j), subStatistics,