@@ -126,6 +126,24 @@ public class TSFileConfig implements Serializable {
/** Encoder of string, blob and text column. Default value is PLAIN. */
private String textEncoding = "PLAIN";

/** Compression of boolean column. Defaults to the overall compression. */
private String booleanCompression = null;

/** Compression of int32 and date column. Defaults to the overall compression. */
private String int32Compression = null;

/** Compression of int64 and timestamp column. Defaults to the overall compression. */
private String int64Compression = null;

/** Compression of float column. Defaults to the overall compression. */
private String floatCompression = null;

/** Compression of double column. Defaults to the overall compression. */
private String doubleCompression = null;

/** Compression of string, blob and text column. Defaults to the overall compression. */
private String textCompression = null;

/**
* Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports
* TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, double data type, TsFile
@@ -361,6 +379,44 @@ public String getValueEncoder(TSDataType dataType) {
}
}

public CompressionType getCompressor(TSDataType dataType) {
String compressionName;
switch (dataType) {
case BOOLEAN:
compressionName = booleanCompression;
break;
case INT32:
case DATE:
compressionName = int32Compression;
break;
case INT64:
case TIMESTAMP:
compressionName = int64Compression;
break;
case FLOAT:
compressionName = floatCompression;
break;
case DOUBLE:
compressionName = doubleCompression;
break;
case STRING:
case BLOB:
case TEXT:
compressionName = textCompression;
break;
default:
compressionName = null;
}

CompressionType compressionType;
if (compressionName != null) {
compressionType = CompressionType.valueOf(compressionName);
} else {
compressionType = compressor;
}
return compressionType;
}

public void setValueEncoder(String valueEncoder) {
this.valueEncoder = valueEncoder;
}
@@ -689,4 +745,28 @@ public boolean isLz4UseJni() {
public void setLz4UseJni(boolean lz4UseJni) {
this.lz4UseJni = lz4UseJni;
}

public void setBooleanCompression(String booleanCompression) {
this.booleanCompression = booleanCompression;
}

public void setInt32Compression(String int32Compression) {
this.int32Compression = int32Compression;
}

public void setInt64Compression(String int64Compression) {
this.int64Compression = int64Compression;
}

public void setFloatCompression(String floatCompression) {
this.floatCompression = floatCompression;
}

public void setDoubleCompression(String doubleCompression) {
this.doubleCompression = doubleCompression;
}

public void setTextCompression(String textCompression) {
this.textCompression = textCompression;
}
}
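
A minimal usage sketch of the new per-type lookup, under the assumption that SNAPPY and LZ4 are valid CompressionType names: only the INT64 override is set, so every other type falls back to the overall compressor.

TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
conf.setCompressor("SNAPPY");       // overall compressor
conf.setInt64Compression("LZ4");    // per-type override for INT64 / TIMESTAMP
CompressionType longCompressor = conf.getCompressor(TSDataType.INT64);    // LZ4
CompressionType doubleCompressor = conf.getCompressor(TSDataType.DOUBLE); // SNAPPY (fallback)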
@@ -81,6 +81,12 @@ public void overwriteConfigByCustomSettings(Properties properties) {
writer.setInt(conf::setFloatPrecision, "float_precision");
writer.setString(conf::setValueEncoder, "value_encoder");
writer.setString(conf::setCompressor, "compressor");
writer.setString(conf::setBooleanCompression, "boolean_compressor");
writer.setString(conf::setInt32Compression, "int32_compressor");
writer.setString(conf::setInt64Compression, "int64_compressor");
writer.setString(conf::setFloatCompression, "float_compressor");
writer.setString(conf::setDoubleCompression, "double_compressor");
writer.setString(conf::setTextCompression, "text_compressor");
writer.setInt(conf::setBatchSize, "batch_size");
writer.setString(conf::setEncryptType, "encrypt_type");
writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni");
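
A hedged sketch of how the new keys could be supplied, assuming the properties are the ones handed to overwriteConfigByCustomSettings and that SNAPPY and LZ4 are valid CompressionType names; keys left out simply leave the per-type overrides unset.

Properties properties = new Properties();
properties.setProperty("compressor", "SNAPPY");      // overall default
properties.setProperty("int64_compressor", "LZ4");   // INT64 / TIMESTAMP columns
properties.setProperty("text_compressor", "SNAPPY"); // STRING / BLOB / TEXT columns
// boolean_compressor, int32_compressor, float_compressor and double_compressor
// follow the same pattern.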
@@ -61,7 +61,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
timeChunkWriter =
new TimeChunkWriter(
schema.getMeasurementName(),
schema.getCompressor(),
schema.getTimeCompressor(),
schema.getTimeTSEncoding(),
schema.getTimeEncoder(),
this.encryptParam);
@@ -76,7 +76,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
valueChunkWriterList.add(
new ValueChunkWriter(
valueMeasurementIdList.get(i),
schema.getCompressor(),
schema.getValueCompressor(i),
valueTSDataTypeList.get(i),
valueTSEncodingList.get(i),
valueEncoderList.get(i),
@@ -92,7 +92,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema, EncryptParameter e
timeChunkWriter =
new TimeChunkWriter(
schema.getMeasurementName(),
schema.getCompressor(),
schema.getTimeCompressor(),
schema.getTimeTSEncoding(),
schema.getTimeEncoder(),
this.encryptParam);
@@ -107,7 +107,7 @@ public AlignedChunkWriterImpl(VectorMeasurementSchema schema, EncryptParameter e
valueChunkWriterList.add(
new ValueChunkWriter(
valueMeasurementIdList.get(i),
schema.getCompressor(),
schema.getValueCompressor(i),
valueTSDataTypeList.get(i),
valueTSEncodingList.get(i),
valueEncoderList.get(i),
@@ -193,7 +193,8 @@ public AlignedChunkWriterImpl(List<IMeasurementSchema> schemaList) {
TSEncoding timeEncoding =
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
TSDataType timeType = TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
CompressionType timeCompression = TSFileDescriptor.getInstance().getConfig().getCompressor();
CompressionType timeCompression =
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64);
timeChunkWriter =
new TimeChunkWriter(
"",
@@ -67,7 +67,7 @@ public MeasurementSchema(String measurementName, TSDataType dataType) {
measurementName,
dataType,
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(dataType)),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
null);
}

@@ -77,7 +77,7 @@ public MeasurementSchema(String measurementName, TSDataType dataType, TSEncoding
measurementName,
dataType,
encoding,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
null);
}
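
A small sketch of the effect on these convenience constructors (the override value is an assumption): the compressor is now chosen from the data type instead of the single global setting.

TSFileDescriptor.getInstance().getConfig().setInt64Compression("LZ4");
MeasurementSchema longSeries = new MeasurementSchema("s1", TSDataType.INT64);  // LZ4
MeasurementSchema floatSeries = new MeasurementSchema("s2", TSDataType.FLOAT); // overall compressor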

@@ -56,7 +56,7 @@ public TimeseriesSchema(String fullPath, TSDataType tsDataType) {
fullPath,
tsDataType,
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)),
TSFileDescriptor.getInstance().getConfig().getCompressor(),
TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType),
Collections.emptyMap());
}

@@ -66,7 +66,7 @@ public TimeseriesSchema(String fullPath, TSDataType type, TSEncoding encoding) {
fullPath,
type,
encoding,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
TSFileDescriptor.getInstance().getConfig().getCompressor(type),
Collections.emptyMap());
}

@@ -47,13 +47,19 @@ public class VectorMeasurementSchema
RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class);
private static final long BUILDER_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class);
private static final byte NO_UNIFIED_COMPRESSOR = -1;

private String deviceId;
private Map<String, Integer> measurementsToIndexMap;
private byte[] types;
private byte[] encodings;
private TSEncodingBuilder[] encodingConverters;
private byte compressor;

/** For compatibility with older versions. */
private byte unifiedCompressor;

/** [0] is for the time column. */
private byte[] compressors;

public VectorMeasurementSchema() {}

@@ -80,7 +86,34 @@ public VectorMeasurementSchema(
}
this.encodings = encodingsInByte;
this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
this.compressor = compressionType.serialize();
this.unifiedCompressor = compressionType.serialize();
}

public VectorMeasurementSchema(
String deviceId,
String[] subMeasurements,
TSDataType[] types,
TSEncoding[] encodings,
byte[] compressors) {
this.deviceId = deviceId;
this.measurementsToIndexMap = new HashMap<>();
for (int i = 0; i < subMeasurements.length; i++) {
measurementsToIndexMap.put(subMeasurements[i], i);
}
byte[] typesInByte = new byte[types.length];
for (int i = 0; i < types.length; i++) {
typesInByte[i] = types[i].serialize();
}
this.types = typesInByte;

byte[] encodingsInByte = new byte[encodings.length];
for (int i = 0; i < encodings.length; i++) {
encodingsInByte[i] = encodings[i].serialize();
}
this.encodings = encodingsInByte;
this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
this.compressors = compressors;
}

public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSDataType[] types) {
@@ -101,7 +134,15 @@ public VectorMeasurementSchema(String deviceId, String[] subMeasurements, TSData
.serialize();
}
this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
this.compressor = TSFileDescriptor.getInstance().getConfig().getCompressor().serialize();
this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
// the first column is time
this.compressors = new byte[subMeasurements.length + 1];
compressors[0] =
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize();
for (int i = 0; i < types.length; i++) {
compressors[i + 1] =
TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize();
}
}

public VectorMeasurementSchema(
@@ -124,9 +165,24 @@ public String getMeasurementName() {
return deviceId;
}

@Deprecated // Aligned series should not invoke this method
@Override
public CompressionType getCompressor() {
return CompressionType.deserialize(compressor);
throw new UnsupportedOperationException("Aligned series should not invoke this method");
}

public CompressionType getTimeCompressor() {
if (compressors != null) {
return CompressionType.deserialize(compressors[0]);
}
return CompressionType.deserialize(unifiedCompressor);
}

public CompressionType getValueCompressor(int index) {
if (compressors != null) {
return CompressionType.deserialize(compressors[index + 1]);
}
return CompressionType.deserialize(unifiedCompressor);
}
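
A minimal sketch of the per-column constructor and the accessors above (the compression choices are assumptions): compressors[0] compresses the time column, and getValueCompressor(i) reads entry i + 1.

byte[] columnCompressors = new byte[] {
    CompressionType.LZ4.serialize(),     // time column
    CompressionType.SNAPPY.serialize(),  // s1
    CompressionType.SNAPPY.serialize()   // s2
};
VectorMeasurementSchema vector =
    new VectorMeasurementSchema(
        "d1",
        new String[] {"s1", "s2"},
        new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE},
        new TSEncoding[] {TSEncoding.RLE, TSEncoding.GORILLA},
        columnCompressors);
CompressionType timeCompressor = vector.getTimeCompressor();          // LZ4
CompressionType firstValueCompressor = vector.getValueCompressor(0);  // SNAPPY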

@Override
@@ -276,7 +332,11 @@ public int serializeTo(ByteBuffer buffer) {
for (byte encoding : encodings) {
byteLen += ReadWriteIOUtils.write(encoding, buffer);
}
byteLen += ReadWriteIOUtils.write(compressor, buffer);
byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer);
if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
buffer.put(compressors);
byteLen += compressors.length;
}

return byteLen;
}
@@ -297,7 +357,11 @@ public int serializeTo(OutputStream outputStream) throws IOException {
for (byte encoding : encodings) {
byteLen += ReadWriteIOUtils.write(encoding, outputStream);
}
byteLen += ReadWriteIOUtils.write(compressor, outputStream);
byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream);
if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
outputStream.write(compressors);
byteLen += compressors.length;
}

return byteLen;
}
@@ -348,7 +412,15 @@ public static VectorMeasurementSchema deserializeFrom(InputStream inputStream)
}
vectorMeasurementSchema.encodings = encodings;

vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(inputStream);
vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(inputStream);
if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
byte[] compressors = new byte[measurementSize + 1];
int read = inputStream.read(compressors);
if (read != compressors.length) {
throw new IOException("Unexpected end of stream when reading compressors");
}
vectorMeasurementSchema.compressors = compressors;
}
return vectorMeasurementSchema;
}

@@ -375,7 +447,12 @@ public static VectorMeasurementSchema deserializeFrom(ByteBuffer buffer) {
}
vectorMeasurementSchema.encodings = encodings;

vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer);
vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(buffer);
if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
byte[] compressors = new byte[measurementSize + 1];
buffer.get(compressors);
vectorMeasurementSchema.compressors = compressors;
}
return vectorMeasurementSchema;
}
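
A round-trip sketch of the compatibility scheme, reusing the vector schema from the sketch above (the buffer size is an assumption): new schemas write the NO_UNIFIED_COMPRESSOR sentinel followed by the compressors array, while bytes produced by older versions still carry a single compressor byte and deserialize unchanged.

ByteBuffer buffer = ByteBuffer.allocate(256);
vector.serializeTo(buffer);   // writes -1, then the per-column compressors
buffer.flip();
VectorMeasurementSchema copy = VectorMeasurementSchema.deserializeFrom(buffer);
// copy.getTimeCompressor() and copy.getValueCompressor(i) match the original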

@@ -391,12 +468,13 @@ public boolean equals(Object o) {
return Arrays.equals(types, that.types)
&& Arrays.equals(encodings, that.encodings)
&& Objects.equals(deviceId, that.deviceId)
&& Objects.equals(compressor, that.compressor);
&& Objects.equals(unifiedCompressor, that.unifiedCompressor)
&& Arrays.equals(compressors, that.compressors);
}

@Override
public int hashCode() {
return Objects.hash(deviceId, types, encodings, compressor);
return Objects.hash(deviceId, types, encodings, unifiedCompressor, compressors);
}

/** compare by vector name */
@@ -424,7 +502,14 @@ public String toString() {
TSEncoding.deserialize(encodings[entry.getValue()]).toString());
sc.addTail("],");
}
sc.addTail(CompressionType.deserialize(compressor).toString());
if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) {
sc.addTail(CompressionType.deserialize(unifiedCompressor).toString());
} else {
for (byte compressor : compressors) {
sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(",");
}
}

return sc.toString();
}

@@ -403,6 +403,10 @@ public void endCurrentChunk() {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void endFile() throws IOException {
if (!canWrite) {
return;
}

checkInMemoryPathCount();
readChunkMetadataAndConstructIndexTree();
