Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co
}
}

public void setChunkGroupSizeThreshold(long chunkGroupSizeThreshold) {
this.chunkGroupSizeThreshold = chunkGroupSizeThreshold;
}

public void registerSchemaTemplate(
String templateName, Map<String, IMeasurementSchema> template, boolean isAligned) {
getSchema().registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
Expand Down Expand Up @@ -501,14 +505,16 @@ private List<IMeasurementSchema> checkIsAllMeasurementsInGroup(
}

private IChunkGroupWriter tryToInitialGroupWriter(
IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
groupWriter =
isTableModel
? new TableChunkGroupWriterImpl(deviceId, encryptParam)
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
initAllSeriesWriterForAlignedSeries(
(AlignedChunkGroupWriterImpl) groupWriter, deviceId, isTableModel);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
Expand All @@ -526,6 +532,21 @@ private IChunkGroupWriter tryToInitialGroupWriter(
return groupWriter;
}

private void initAllSeriesWriterForAlignedSeries(
AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID, boolean isTableModel)
throws IOException {
Schema schema = getSchema();
if (isTableModel) {
alignedChunkGroupWriter.tryToAddSeriesWriter(
schema.getTableSchemaMap().get(deviceID.getTableName()).getColumnSchemas());
} else {
MeasurementGroup deviceSchema = schema.getSeriesSchema(deviceID);
for (IMeasurementSchema measurementSchema : deviceSchema.getMeasurementSchemaMap().values()) {
alignedChunkGroupWriter.tryToAddSeriesWriterInternal(measurementSchema);
}
}
}

/**
* write a record in type of T.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected AbstractTableModelTsFileWriter(File file, long chunkGroupSizeThreshold
}

protected IChunkGroupWriter tryToInitialGroupWriter(
IDeviceID deviceId, boolean isAligned, boolean isTableModel) {
IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException {
IChunkGroupWriter groupWriter = groupWriters.get(deviceId);
if (groupWriter == null) {
if (isAligned) {
Expand All @@ -156,6 +156,7 @@ protected IChunkGroupWriter tryToInitialGroupWriter(
: new AlignedChunkGroupWriterImpl(deviceId, encryptParam);
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.get(deviceId));
initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl) groupWriter);
} else {
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam);
((NonAlignedChunkGroupWriterImpl) groupWriter)
Expand All @@ -167,6 +168,9 @@ protected IChunkGroupWriter tryToInitialGroupWriter(
return groupWriter;
}

protected abstract void initAllSeriesWriterForAlignedSeries(
AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException;

/**
* calculate total memory size occupied by all ChunkGroupWriter instances currently.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;

Expand All @@ -40,6 +41,7 @@
public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter {

private String tableName;
private TableSchema tableSchema;
private boolean isTableWriteAligned = true;

public DeviceTableModelWriter(File file, TableSchema tableSchema, long memoryThreshold)
Expand Down Expand Up @@ -74,6 +76,12 @@ public void write(Tablet table) throws IOException, WriteProcessException {
checkMemorySizeAndMayFlushChunks();
}

@Override
protected void initAllSeriesWriterForAlignedSeries(
AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException {
alignedChunkGroupWriter.tryToAddSeriesWriter(tableSchema.getColumnSchemas());
}

private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)
throws WriteProcessException {
String tabletTableName = tablet.getTableName();
Expand Down Expand Up @@ -102,6 +110,7 @@ private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet)

private void registerTableSchema(TableSchema tableSchema) {
this.tableName = tableSchema.getTableName();
this.tableSchema = tableSchema;
getSchema().registerTableSchema(tableSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.v4.DeviceTableModelReader;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorForTest;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.record.Tablet;
Expand All @@ -37,6 +40,7 @@
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand All @@ -45,6 +49,104 @@

public class TsFileV4ReadWriteInterfacesTest {

@Test
public void testWriteSomeColumns() throws IOException, WriteProcessException {
String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0);

TableSchema tableSchema =
new TableSchema(
"t1",
Arrays.asList(
new MeasurementSchema("device", TSDataType.STRING),
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT32),
new MeasurementSchema("s3", TSDataType.INT32)),
Arrays.asList(
ColumnCategory.TAG,
ColumnCategory.FIELD,
ColumnCategory.FIELD,
ColumnCategory.FIELD));
Tablet tablet1 =
new Tablet(
tableSchema.getTableName(),
Arrays.asList("device", "s1"),
Arrays.asList(TSDataType.STRING, TSDataType.INT32),
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD));
for (int i = 0; i < 1000; i++) {
tablet1.addTimestamp(i, i);
tablet1.addValue("device", i, "d1");
tablet1.addValue("s1", i, 0);
}
Tablet tablet2 =
new Tablet(
tableSchema.getTableName(),
IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()),
tableSchema.getColumnTypes());
for (int i = 0; i < 1000; i++) {
tablet2.addTimestamp(i, 1005 + i);
tablet2.addValue("device", i, "d1");
tablet2.addValue("s1", i, 1);
tablet2.addValue("s2", i, 1);
tablet2.addValue("s3", i, 1);
}
try (ITsFileWriter writer =
new TsFileWriterBuilder()
.file(new File(filePath))
.tableSchema(tableSchema)
.memoryThreshold(1)
.build()) {
writer.write(tablet1);
writer.write(tablet2);
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) {
TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
while (deviceIterator.hasNext()) {
Pair<IDeviceID, Boolean> pair = deviceIterator.next();
List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
reader.getAlignedChunkMetadataByMetadataIndexNode(
pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false);
Assert.assertFalse(alignedChunkMetadataList.isEmpty());
Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size());
Assert.assertEquals(
1000,
alignedChunkMetadataList
.get(0)
.getValueChunkMetadataList()
.get(0)
.getStatistics()
.getCount());
Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1));
Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2));
Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size());
Assert.assertEquals(
1000,
alignedChunkMetadataList
.get(1)
.getValueChunkMetadataList()
.get(0)
.getStatistics()
.getCount());
Assert.assertEquals(
1000,
alignedChunkMetadataList
.get(1)
.getValueChunkMetadataList()
.get(1)
.getStatistics()
.getCount());
Assert.assertEquals(
1000,
alignedChunkMetadataList
.get(1)
.getValueChunkMetadataList()
.get(2)
.getStatistics()
.getCount());
}
}
}

@Test
public void testGetTableDeviceMethods() throws Exception {
String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0);
Expand Down
Loading
Loading