diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java index d1a13e7ce..ff37612e2 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/TsFileWriter.java @@ -233,6 +233,10 @@ protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig co } } + public void setChunkGroupSizeThreshold(long chunkGroupSizeThreshold) { + this.chunkGroupSizeThreshold = chunkGroupSizeThreshold; + } + public void registerSchemaTemplate( String templateName, Map template, boolean isAligned) { getSchema().registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template)); @@ -501,7 +505,7 @@ private List checkIsAllMeasurementsInGroup( } private IChunkGroupWriter tryToInitialGroupWriter( - IDeviceID deviceId, boolean isAligned, boolean isTableModel) { + IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException { IChunkGroupWriter groupWriter = groupWriters.get(deviceId); if (groupWriter == null) { if (isAligned) { @@ -509,6 +513,8 @@ private IChunkGroupWriter tryToInitialGroupWriter( isTableModel ? new TableChunkGroupWriterImpl(deviceId, encryptParam) : new AlignedChunkGroupWriterImpl(deviceId, encryptParam); + initAllSeriesWriterForAlignedSeries( + (AlignedChunkGroupWriterImpl) groupWriter, deviceId, isTableModel); if (!isUnseq) { // Sequence File ((AlignedChunkGroupWriterImpl) groupWriter) .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); @@ -526,6 +532,21 @@ private IChunkGroupWriter tryToInitialGroupWriter( return groupWriter; } + private void initAllSeriesWriterForAlignedSeries( + AlignedChunkGroupWriterImpl alignedChunkGroupWriter, IDeviceID deviceID, boolean isTableModel) + throws IOException { + Schema schema = getSchema(); + if (isTableModel) { + alignedChunkGroupWriter.tryToAddSeriesWriter( + schema.getTableSchemaMap().get(deviceID.getTableName()).getColumnSchemas()); + } else { + MeasurementGroup deviceSchema = schema.getSeriesSchema(deviceID); + for (IMeasurementSchema measurementSchema : deviceSchema.getMeasurementSchemaMap().values()) { + alignedChunkGroupWriter.tryToAddSeriesWriterInternal(measurementSchema); + } + } + } + /** * write a record in type of T. * diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java index 92f4c102c..3120bb40a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/AbstractTableModelTsFileWriter.java @@ -146,7 +146,7 @@ protected AbstractTableModelTsFileWriter(File file, long chunkGroupSizeThreshold } protected IChunkGroupWriter tryToInitialGroupWriter( - IDeviceID deviceId, boolean isAligned, boolean isTableModel) { + IDeviceID deviceId, boolean isAligned, boolean isTableModel) throws IOException { IChunkGroupWriter groupWriter = groupWriters.get(deviceId); if (groupWriter == null) { if (isAligned) { @@ -156,6 +156,7 @@ protected IChunkGroupWriter tryToInitialGroupWriter( : new AlignedChunkGroupWriterImpl(deviceId, encryptParam); ((AlignedChunkGroupWriterImpl) groupWriter) .setLastTime(alignedDeviceLastTimeMap.get(deviceId)); + initAllSeriesWriterForAlignedSeries((AlignedChunkGroupWriterImpl) groupWriter); } else { groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId, encryptParam); ((NonAlignedChunkGroupWriterImpl) groupWriter) @@ -167,6 +168,9 @@ protected IChunkGroupWriter tryToInitialGroupWriter( return groupWriter; } + protected abstract void initAllSeriesWriterForAlignedSeries( + AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException; + /** * calculate total memory size occupied by all ChunkGroupWriter instances currently. * diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java index 66fca2cfc..f64f285f6 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/v4/DeviceTableModelWriter.java @@ -29,6 +29,7 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.WriteUtils; +import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -40,6 +41,7 @@ public class DeviceTableModelWriter extends AbstractTableModelTsFileWriter { private String tableName; + private TableSchema tableSchema; private boolean isTableWriteAligned = true; public DeviceTableModelWriter(File file, TableSchema tableSchema, long memoryThreshold) @@ -74,6 +76,12 @@ public void write(Tablet table) throws IOException, WriteProcessException { checkMemorySizeAndMayFlushChunks(); } + @Override + protected void initAllSeriesWriterForAlignedSeries( + AlignedChunkGroupWriterImpl alignedChunkGroupWriter) throws IOException { + alignedChunkGroupWriter.tryToAddSeriesWriter(tableSchema.getColumnSchemas()); + } + private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet) throws WriteProcessException { String tabletTableName = tablet.getTableName(); @@ -102,6 +110,7 @@ private void checkIsTableExistAndSetColumnCategoryList(Tablet tablet) private void registerTableSchema(TableSchema tableSchema) { this.tableName = tableSchema.getTableName(); + this.tableSchema = tableSchema; getSchema().registerTableSchema(tableSchema); } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java index d9b7147f2..51aa64ce6 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileV4ReadWriteInterfacesTest.java @@ -21,10 +21,13 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.read.v4.DeviceTableModelReader; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorForTest; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.record.Tablet; @@ -37,6 +40,7 @@ import org.junit.Test; import java.io.File; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -45,6 +49,104 @@ public class TsFileV4ReadWriteInterfacesTest { + @Test + public void testWriteSomeColumns() throws IOException, WriteProcessException { + String filePath = TsFileGeneratorForTest.getTestTsFilePath("db", 0, 0, 0); + + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)), + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet1 = + new Tablet( + tableSchema.getTableName(), + Arrays.asList("device", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + for (int i = 0; i < 1000; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue("device", i, "d1"); + tablet1.addValue("s1", i, 0); + } + Tablet tablet2 = + new Tablet( + tableSchema.getTableName(), + IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()), + tableSchema.getColumnTypes()); + for (int i = 0; i < 1000; i++) { + tablet2.addTimestamp(i, 1005 + i); + tablet2.addValue("device", i, "d1"); + tablet2.addValue("s1", i, 1); + tablet2.addValue("s2", i, 1); + tablet2.addValue("s3", i, 1); + } + try (ITsFileWriter writer = + new TsFileWriterBuilder() + .file(new File(filePath)) + .tableSchema(tableSchema) + .memoryThreshold(1) + .build()) { + writer.write(tablet1); + writer.write(tablet2); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair pair = deviceIterator.next(); + List alignedChunkMetadataList = + reader.getAlignedChunkMetadataByMetadataIndexNode( + pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false); + Assert.assertFalse(alignedChunkMetadataList.isEmpty()); + Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1)); + Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2)); + Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(1) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(2) + .getStatistics() + .getCount()); + } + } + } + @Test public void testGetTableDeviceMethods() throws Exception { String filePath = TsFileGeneratorForTest.getTestTsFilePath("root.testsg", 0, 0, 0); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java index 17a0d0f13..264bf9e54 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java @@ -26,12 +26,15 @@ import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.ColumnSchema; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.Chunk; @@ -41,6 +44,7 @@ import org.apache.tsfile.read.query.dataset.ResultSet; import org.apache.tsfile.read.v4.ITsFileReader; import org.apache.tsfile.read.v4.TsFileReaderBuilder; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.chunk.ChunkWriterImpl; @@ -899,6 +903,181 @@ public void writeTreeTsFileWithUpperCaseColumns() throws IOException, WriteProce } } + @Test + public void testWriteSomeColumnsOfTree() throws IOException, WriteProcessException { + List fullMeasurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + List measurementSchemas1 = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT32)); + IDeviceID device = new StringArrayDeviceID("root.test.d1"); + Tablet tablet1 = + new Tablet( + device, + IMeasurementSchema.getMeasurementNameList(fullMeasurementSchemas), + IMeasurementSchema.getDataTypeList(fullMeasurementSchemas)); + Tablet tablet2 = + new Tablet( + device, + IMeasurementSchema.getMeasurementNameList(measurementSchemas1), + IMeasurementSchema.getDataTypeList(measurementSchemas1)); + for (int i = 0; i < 1000; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue("s1", i, 1); + tablet1.addValue("s2", i, 1); + tablet1.addValue("s3", i, 1); + } + for (int i = 0; i < 1000; i++) { + tablet2.addTimestamp(i, i + 1005); + tablet2.addValue("s1", i, 0); + } + try (TsFileWriter writer = new TsFileWriter(f)) { + writer.registerAlignedTimeseries(device, fullMeasurementSchemas); + writer.setChunkGroupSizeThreshold(1); + writer.writeTree(tablet1); + writer.writeTree(tablet2); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair pair = deviceIterator.next(); + List alignedChunkMetadataList = + reader.getAlignedChunkMetadataByMetadataIndexNode( + pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false); + Assert.assertFalse(alignedChunkMetadataList.isEmpty()); + Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(1) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(2) + .getStatistics() + .getCount()); + Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(1)); + Assert.assertNull(alignedChunkMetadataList.get(1).getValueChunkMetadataList().get(2)); + } + } + } + + @Test + public void testWriteSomeColumnsOfTable() throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new MeasurementSchema("device", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)), + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet1 = + new Tablet( + tableSchema.getTableName(), + Arrays.asList("device", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + for (int i = 0; i < 1000; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue("s1", i, 0); + } + Tablet tablet2 = + new Tablet( + tableSchema.getTableName(), + IMeasurementSchema.getMeasurementNameList(tableSchema.getColumnSchemas()), + IMeasurementSchema.getDataTypeList(tableSchema.getColumnSchemas()), + tableSchema.getColumnTypes()); + for (int i = 0; i < 1000; i++) { + tablet2.addTimestamp(i, 1005 + i); + tablet2.addValue("s1", i, 1); + tablet2.addValue("s2", i, 1); + tablet2.addValue("s3", i, 1); + } + try (TsFileWriter writer = new TsFileWriter(f)) { + writer.registerTableSchema(tableSchema); + writer.setChunkGroupSizeThreshold(1); + writer.writeTable(tablet1); + writer.writeTable(tablet2); + } + try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getPath())) { + TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + while (deviceIterator.hasNext()) { + Pair pair = deviceIterator.next(); + List alignedChunkMetadataList = + reader.getAlignedChunkMetadataByMetadataIndexNode( + pair.getLeft(), deviceIterator.getFirstMeasurementNodeOfCurrentDevice(), false); + Assert.assertFalse(alignedChunkMetadataList.isEmpty()); + Assert.assertEquals(3, alignedChunkMetadataList.get(0).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(0) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(1)); + Assert.assertNull(alignedChunkMetadataList.get(0).getValueChunkMetadataList().get(2)); + Assert.assertEquals(3, alignedChunkMetadataList.get(1).getValueChunkMetadataList().size()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(0) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(1) + .getStatistics() + .getCount()); + Assert.assertEquals( + 1000, + alignedChunkMetadataList + .get(1) + .getValueChunkMetadataList() + .get(2) + .getStatistics() + .getCount()); + } + } + } + @Test public void writeTableTsFileWithUpperCaseColumns() throws IOException, WriteProcessException { setEnv(100 * 1024 * 1024, 10 * 1024);