From 3b389a7c9c811a9f1d1362f55efce6efbb75aa5f Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Tue, 20 May 2025 19:04:05 +0800 Subject: [PATCH 1/8] Add TsFileLastReader for retrieving last points in a TsFile --- cpp/pom.xml | 2 +- java/common/pom.xml | 2 +- java/examples/pom.xml | 4 +- java/pom.xml | 4 +- java/tools/pom.xml | 6 +- java/tsfile/pom.xml | 4 +- .../tsfile/read/TsFileSequenceReader.java | 132 +++++++++++++ .../tsfile/read/reader/TsFileLastReader.java | 177 ++++++++++++++++++ .../apache/tsfile/write/record/Tablet.java | 14 +- .../read/reader/TsFileLastReaderTest.java | 144 ++++++++++++++ pom.xml | 6 +- python/pom.xml | 2 +- 12 files changed, 478 insertions(+), 19 deletions(-) create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java diff --git a/cpp/pom.xml b/cpp/pom.xml index d39c67c0d..cc62aa8c5 100644 --- a/cpp/pom.xml +++ b/cpp/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-cpp pom diff --git a/java/common/pom.xml b/java/common/pom.xml index 0eb4066d3..bcd54f5e1 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT common TsFile: Java: Common diff --git a/java/examples/pom.xml b/java/examples/pom.xml index 9223f5960..5a484cfc1 100644 --- a/java/examples/pom.xml +++ b/java/examples/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT examples TsFile: Java: Examples @@ -36,7 +36,7 @@ org.apache.tsfile tsfile - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT diff --git a/java/pom.xml b/java/pom.xml index 1d99dbba4..df7cec5a8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -24,10 +24,10 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT pom TsFile: Java diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 0bc2c89a8..8cc58d1ad 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tools TsFile: Java: Tools @@ -32,7 +32,7 @@ org.apache.tsfile common - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT commons-cli @@ -50,7 +50,7 @@ org.apache.tsfile tsfile - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT ch.qos.logback diff --git a/java/tsfile/pom.xml b/java/tsfile/pom.xml index 5b22ea2ea..8625f0cfd 100644 --- a/java/tsfile/pom.xml +++ b/java/tsfile/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile TsFile: Java: TsFile @@ -38,7 +38,7 @@ org.apache.tsfile common - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT com.github.luben diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index dc07bfc2e..1bf69c42c 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -87,9 +87,11 @@ import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -1492,6 +1494,11 @@ public Map> getAllTimeseriesMetadata( return timeseriesMetadataMap; } + public Iterator>> iterAllTimeseriesMetadata( + boolean needChunkMetadata) throws IOException { + return new TimeseriesMetadataIterator(needChunkMetadata); + } + /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */ public List getDeviceTimeseriesMetadataWithoutChunkMetadata(IDeviceID device) throws IOException { @@ -2973,4 +2980,129 @@ public int hashCode() { public DeserializeConfig getDeserializeContext() { return deserializeConfig; } + + private class TimeseriesMetadataIterator + implements Iterator>> { + + private final Deque nodeStack = new ArrayDeque<>(); + private final boolean needChunkMetadata; + private Pair> nextValue; + private MetadataIndexNode currentLeafDeviceNode; + private int currentLeafDeviceNodeIndex; + + public TimeseriesMetadataIterator(boolean needChunkMetadata) throws IOException { + this.needChunkMetadata = needChunkMetadata; + if (tsFileMetaData == null) { + readFileMetadata(); + } + + nodeStack.addAll(tsFileMetaData.getTableMetadataIndexNodeMap().values()); + } + + @Override + public boolean hasNext() { + if (nextValue != null) { + return true; + } + + try { + loadNextValue(); + } catch (IOException e) { + logger.warn("Cannot read timeseries metadata from {},", file, e); + return false; + } + return nextValue != null; + } + + @Override + public Pair> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Pair> ret = nextValue; + nextValue = null; + return ret; + } + + private void loadNextLeafDeviceNode() throws IOException { + while (!nodeStack.isEmpty()) { + MetadataIndexNode node = nodeStack.pop(); + MetadataIndexNodeType nodeType = node.getNodeType(); + if (nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE)) { + currentLeafDeviceNode = node; + currentLeafDeviceNodeIndex = 0; + return; + } + + List childrenIndex = node.getChildren(); + for (int i = 0; i < childrenIndex.size(); i++) { + long endOffset; + IMetadataIndexEntry childIndex = childrenIndex.get(i); + endOffset = node.getEndOffset(); + if (i != childrenIndex.size() - 1) { + endOffset = childrenIndex.get(i + 1).getOffset(); + } + + MetadataIndexNode child; + if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) { + ByteBuffer buffer = readData(childIndex.getOffset(), endOffset); + child = deserializeConfig.deserializeMetadataIndexNode(buffer, true); + } else { + tsFileInput.position(childIndex.getOffset()); + child = + deserializeConfig.deserializeMetadataIndexNode( + tsFileInput.wrapAsInputStream(), true); + } + nodeStack.push(child); + } + } + } + + private void loadNextValue() throws IOException { + if (currentLeafDeviceNode == null + || currentLeafDeviceNodeIndex >= currentLeafDeviceNode.getChildren().size()) { + currentLeafDeviceNode = null; + loadNextLeafDeviceNode(); + } + if (currentLeafDeviceNode == null) { + return; + } + + IMetadataIndexEntry childIndex = + currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex); + int childNum = currentLeafDeviceNode.getChildren().size(); + IDeviceID deviceId = ((DeviceMetadataIndexEntry) childIndex).getDeviceID(); + + Map> nextValueMap = new HashMap<>(1); + long endOffset = currentLeafDeviceNode.getEndOffset(); + if (currentLeafDeviceNodeIndex != childNum - 1) { + endOffset = + currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex + 1).getOffset(); + } + if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) { + ByteBuffer nextBuffer = readData(childIndex.getOffset(), endOffset); + generateMetadataIndex( + childIndex, + nextBuffer, + deviceId, + currentLeafDeviceNode.getNodeType(), + nextValueMap, + needChunkMetadata); + } else { + // when the buffer length is over than Integer.MAX_VALUE, + // using tsFileInput to get timeseriesMetadataList + generateMetadataIndexUsingTsFileInput( + childIndex, + childIndex.getOffset(), + endOffset, + deviceId, + currentLeafDeviceNode.getNodeType(), + nextValueMap, + needChunkMetadata); + } + currentLeafDeviceNodeIndex++; + Entry> entry = nextValueMap.entrySet().iterator().next(); + nextValue = new Pair<>(entry.getKey(), entry.getValue()); + } + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java new file mode 100644 index 000000000..780164fcb --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.stream.Collectors; + +/** Conveniently retrieve last points of all timeseries from a TsFile. */ +public class TsFileLastReader + implements AutoCloseable, Iterator>>> { + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileLastReader.class); + + private final TsFileSequenceReader sequenceReader; + private boolean asyncIO = true; + private Iterator>> timeseriesMetadataIter; + private Pair>> nextValue; + + private BlockingQueue>>> lastValueQueue; + private ForkJoinTask asyncTask; + + public TsFileLastReader(String filePath) throws IOException { + sequenceReader = new TsFileSequenceReader(filePath); + } + + public TsFileLastReader(String filePath, boolean asyncIO) throws IOException { + this(filePath); + this.asyncIO = asyncIO; + } + + @Override + public boolean hasNext() { + if (timeseriesMetadataIter == null) { + try { + init(); + } catch (IOException e) { + LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e); + return false; + } + } + + // already meet the terminator + if (nextValue != null) { + return nextValue.getLeft() != null; + } + + if (asyncIO) { + try { + nextValue = lastValueQueue.take(); + if (nextValue.getLeft() == null) { + // the terminator + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } else { + if (!timeseriesMetadataIter.hasNext()) { + nextValue = new Pair<>(null, null); + } else { + Pair> next = timeseriesMetadataIter.next(); + nextValue = new Pair<>(next.left, convertToLastPoints(next.right)); + } + } + return nextValue.left != null; + } + + /** + * @return (deviceId, measurementId, lastPoint) + */ + @Override + public Pair>> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Pair>> ret = nextValue; + nextValue = null; + return ret; + } + + private List> convertToLastPoints( + List timeseriesMetadataList) { + return timeseriesMetadataList.stream() + .map( + seriesMeta -> + new Pair<>( + seriesMeta.getMeasurementId(), + new TimeValuePair( + seriesMeta.getStatistics().getEndTime(), + TsPrimitiveType.getByType( + seriesMeta.getTsDataType() == TSDataType.VECTOR + ? TSDataType.INT64 + : seriesMeta.getTsDataType(), + seriesMeta.getTsDataType() == TSDataType.VECTOR + ? seriesMeta.getStatistics().getEndTime() + : seriesMeta.getStatistics().getLastValue())))) + .collect(Collectors.toList()); + } + + private void init() throws IOException { + timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false); + if (asyncIO) { + int queueCapacity = 1024; + lastValueQueue = new ArrayBlockingQueue<>(queueCapacity); + asyncTask = + ForkJoinPool.commonPool() + .submit( + () -> { + try { + while (timeseriesMetadataIter.hasNext()) { + Pair> deviceSeriesMetadata = + timeseriesMetadataIter.next(); + lastValueQueue.put( + new Pair<>( + deviceSeriesMetadata.left, + convertToLastPoints(deviceSeriesMetadata.right))); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.error("Error while reading timeseries metadata", e); + } finally { + try { + lastValueQueue.put(new Pair<>(null, null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return null; + }); + } + } + + @Override + public void close() throws Exception { + if (asyncIO) { + asyncTask.cancel(true); + } + sequenceReader.close(); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java index f64dc059d..d84cd0577 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java @@ -412,12 +412,18 @@ public void addValue(int rowIndex, String measurement, int val) { @TsFileApi public void addValue(int rowIndex, int columnIndex, int val) { - if (!(values[columnIndex] instanceof int[])) { + if (!(values[columnIndex] instanceof int[]) && !(values[columnIndex] instanceof long[])) { throw new IllegalArgumentException( - "The data type of column index " + columnIndex + " is not INT32"); + "The data type of column index " + columnIndex + " is not INT32 or INT64"); } - final int[] sensor = (int[]) values[columnIndex]; - sensor[rowIndex] = val; + if (values[columnIndex] instanceof int[]) { + final int[] sensor = (int[]) values[columnIndex]; + sensor[rowIndex] = val; + } else if (values[columnIndex] instanceof long[]) { + final long[] sensor = (long[]) values[columnIndex]; + sensor[rowIndex] = val; + } + updateBitMap(rowIndex, columnIndex, false); } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java new file mode 100644 index 000000000..f3aad0c8b --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TsFileLastReaderTest { + private final String filePath = "target/test.tsfile"; + private final File file = new File(filePath); + + private void createFile(int deviceNum, int measurementNum, int seriesPointNum) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { + List measurementSchemaList = new ArrayList<>(); + for (int j = 0; j < measurementNum; j++) { + measurementSchemaList.add(new MeasurementSchema("s" + j, TSDataType.INT64)); + } + for (int i = 0; i < deviceNum; i++) { + writer.registerAlignedTimeseries("device" + i, measurementSchemaList); + } + + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum); + for (int k = 0; k < seriesPointNum; k++) { + tablet.addTimestamp(k, k); + } + for (int j = 0; j < measurementNum; j++) { + for (int k = 0; k < seriesPointNum; k++) { + tablet.addValue(k, j, k); + } + } + writer.writeTree(tablet); + } + } + } + + private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { + long startTime = System.currentTimeMillis(); + Set devices = new HashSet<>(); + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, false)) { + while (lastReader.hasNext()) { + Set measurements = new HashSet<>(); + Pair>> next = lastReader.next(); + assertFalse(devices.contains(next.left)); + devices.add(next.left); + + // time column included + assertEquals(measurementNum + 1, next.getRight().size()); + next.right.forEach( + pair -> { + measurements.add(pair.getLeft()); + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); + assertEquals(seriesPointNum - 1, pair.getRight().getValue().getLong()); + }); + assertEquals(measurementNum + 1, measurements.size()); + } + } + assertEquals(deviceNum, devices.size()); + System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); + } + + private void testReadLast(int deviceNum, int measurementNum, int seriesPointNum) + throws Exception { + createFile(deviceNum, measurementNum, seriesPointNum); + doReadLast(deviceNum, measurementNum, seriesPointNum); + file.delete(); + } + + @Test + public void testSmall() throws Exception { + testReadLast(10, 10, 10); + } + + @Test + public void testManyDevices() throws Exception { + testReadLast(10000, 10, 10); + } + + @Test + public void testManyMeasurement() throws Exception { + testReadLast(10, 10000, 10); + } + + @Test + public void testManyPoints() throws Exception { + testReadLast(100, 10, 10000); + } + + @Test + public void testManyMany() throws Exception { + testReadLast(1000, 1000, 1000); + } + + @Ignore("Performance") + @Test + public void testManyRead() throws Exception { + int deviceNum = 10000; + int measurementNum = 1000; + int seriesPointNum = 1; + createFile(deviceNum, measurementNum, seriesPointNum); + for (int i = 0; i < 10; i++) { + doReadLast(deviceNum, measurementNum, seriesPointNum); + } + file.delete(); + } +} diff --git a/pom.xml b/pom.xml index 2e1d55335..08fb36557 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT pom Apache TsFile Project Parent POM @@ -68,7 +68,7 @@ maven-surefire-plugin 3.5.0 - ${argLine} -Xmx1024m + ${argLine} diff --git a/python/pom.xml b/python/pom.xml index 051e77f0e..88e48818e 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-python pom From c29178f312f64c002e5abf1bb4c0e13b68abd5a1 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Wed, 21 May 2025 16:31:08 +0800 Subject: [PATCH 2/8] Fix query last of blob series (cherry picked from commit 3ab6c404fcff8711456e9b03cc6b9780951dbfdc) --- .../tsfile/file/metadata/ChunkMetadata.java | 4 + .../file/metadata/TimeseriesMetadata.java | 26 ++- .../metadata/statistics/TimeStatistics.java | 8 +- .../tsfile/read/TsFileSequenceReader.java | 58 +++++-- .../org/apache/tsfile/read/common/Chunk.java | 4 + .../tsfile/read/reader/TsFileLastReader.java | 152 ++++++++++++++---- .../read/reader/page/ValuePageReader.java | 4 + .../org/apache/tsfile/utils/WriteUtils.java | 5 + .../read/reader/TsFileLastReaderTest.java | 36 ++++- 9 files changed, 239 insertions(+), 58 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java index 653a8991b..4be3b3ace 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java @@ -430,4 +430,8 @@ public boolean hasNullValue(int measurementIndex) { public MeasurementSchema toMeasurementSchema() { return new MeasurementSchema(measurementUid, tsDataType, encoding, compressionType); } + + public TSEncoding getEncoding() { + return encoding; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java index 657f183c2..78a69c573 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java @@ -119,8 +119,11 @@ public TimeseriesMetadata(TimeseriesMetadata timeseriesMetadata) { // new one this.chunkMetadataList = timeseriesMetadata.chunkMetadataList; } - public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) { + return deserializeFrom(buffer, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) { TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer)); timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer)); @@ -128,7 +131,8 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType)); - if (needChunkMetadata) { + if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) || + (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); timeseriesMetaData.chunkMetadataList = new ArrayList<>(); @@ -145,6 +149,11 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need public static TimeseriesMetadata deserializeFrom( TsFileInput tsFileInput, boolean needChunkMetadata) throws IOException { + return deserializeFrom(tsFileInput, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom( + TsFileInput tsFileInput, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { InputStream inputStream = tsFileInput.wrapAsInputStream(); TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(inputStream)); @@ -155,7 +164,8 @@ public static TimeseriesMetadata deserializeFrom( timeseriesMetaData.setStatistics( Statistics.deserialize(inputStream, timeseriesMetaData.dataType)); long startOffset = tsFileInput.position(); - if (needChunkMetadata) { + if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { timeseriesMetaData.chunkMetadataList = new ArrayList<>(); while (tsFileInput.position() < startOffset + chunkMetaDataListDataSize) { timeseriesMetaData.chunkMetadataList.add( @@ -175,6 +185,12 @@ public static TimeseriesMetadata deserializeFrom( */ public static TimeseriesMetadata deserializeFrom( ByteBuffer buffer, Set excludedMeasurements, boolean needChunkMetadata) { + return deserializeFrom(buffer, excludedMeasurements, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom( + ByteBuffer buffer, Set excludedMeasurements, boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) { byte timeseriesType = ReadWriteIOUtils.readByte(buffer); String measurementID = ReadWriteIOUtils.readVarIntString(buffer); TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer); @@ -188,7 +204,9 @@ public static TimeseriesMetadata deserializeFrom( timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(statistics); - if (!excludedMeasurements.contains(measurementID) && needChunkMetadata) { + if (!excludedMeasurements.contains(measurementID) && + ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob) || + (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) { // measurement is not in the excluded set and need chunk metadata ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java index db12ac4bc..cb8af6eab 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java @@ -76,22 +76,22 @@ public void update(long[] time, int batchSize, int arrayOffset) { @Override public Long getMinValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "min value")); + return getStartTime(); } @Override public Long getMaxValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "max value")); + return getEndTime(); } @Override public Long getFirstValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "first value")); + return getStartTime(); } @Override public Long getLastValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "last value")); + return getEndTime(); } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 1bf69c42c..2f7fb4452 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -1342,13 +1342,24 @@ private void generateMetadataIndex( IDeviceID deviceId, MetadataIndexNodeType type, Map> timeseriesMetadataMap, - boolean needChunkMetadata) + boolean needChunkMetadata) throws IOException { + generateMetadataIndex(metadataIndex, buffer, deviceId, type, timeseriesMetadataMap, needChunkMetadata, needChunkMetadata); + } + + private void generateMetadataIndex( + IMetadataIndexEntry metadataIndex, + ByteBuffer buffer, + IDeviceID deviceId, + MetadataIndexNodeType type, + Map> timeseriesMetadataMap, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) throws IOException { try { if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { List timeseriesMetadataList = new ArrayList<>(); while (buffer.hasRemaining()) { - timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata)); + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -1377,7 +1388,8 @@ private void generateMetadataIndex( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } else { // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList @@ -1388,7 +1400,8 @@ private void generateMetadataIndex( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } } } @@ -1407,7 +1420,19 @@ private void generateMetadataIndexUsingTsFileInput( IDeviceID deviceId, MetadataIndexNodeType type, Map> timeseriesMetadataMap, - boolean needChunkMetadata) + boolean needChunkMetadata) throws IOException { + generateMetadataIndexUsingTsFileInput(metadataIndex, start, end, deviceId, type, timeseriesMetadataMap, needChunkMetadata, needChunkMetadata); + } + + private void generateMetadataIndexUsingTsFileInput( + IMetadataIndexEntry metadataIndex, + long start, + long end, + IDeviceID deviceId, + MetadataIndexNodeType type, + Map> timeseriesMetadataMap, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) throws IOException { try { tsFileInput.position(start); @@ -1415,7 +1440,7 @@ private void generateMetadataIndexUsingTsFileInput( List timeseriesMetadataList = new ArrayList<>(); while (tsFileInput.position() < end) { timeseriesMetadataList.add( - TimeseriesMetadata.deserializeFrom(tsFileInput, needChunkMetadata)); + TimeseriesMetadata.deserializeFrom(tsFileInput, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -1442,7 +1467,8 @@ private void generateMetadataIndexUsingTsFileInput( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } } } catch (StopReadTsFileByInterruptException e) { @@ -1495,8 +1521,8 @@ public Map> getAllTimeseriesMetadata( } public Iterator>> iterAllTimeseriesMetadata( - boolean needChunkMetadata) throws IOException { - return new TimeseriesMetadataIterator(needChunkMetadata); + boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + return new TimeseriesMetadataIterator(needChunkMetadataForNonBlob, needChunkMetadataForBlob); } /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */ @@ -2985,13 +3011,15 @@ private class TimeseriesMetadataIterator implements Iterator>> { private final Deque nodeStack = new ArrayDeque<>(); - private final boolean needChunkMetadata; + private final boolean needChunkMetadataForNonBlob; + private final boolean needCHunkMetadataForBlob; private Pair> nextValue; private MetadataIndexNode currentLeafDeviceNode; private int currentLeafDeviceNodeIndex; - public TimeseriesMetadataIterator(boolean needChunkMetadata) throws IOException { - this.needChunkMetadata = needChunkMetadata; + public TimeseriesMetadataIterator(boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + this.needChunkMetadataForNonBlob = needChunkMetadataForNonBlob; + this.needCHunkMetadataForBlob = needChunkMetadataForBlob; if (tsFileMetaData == null) { readFileMetadata(); } @@ -3087,7 +3115,8 @@ private void loadNextValue() throws IOException { deviceId, currentLeafDeviceNode.getNodeType(), nextValueMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needCHunkMetadataForBlob); } else { // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList @@ -3098,7 +3127,8 @@ private void loadNextValue() throws IOException { deviceId, currentLeafDeviceNode.getNodeType(), nextValueMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needCHunkMetadataForBlob); } currentLeafDeviceNodeIndex++; Entry> entry = nextValueMap.entrySet().iterator().next(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java index dd7791971..a7e2e9eaf 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java @@ -383,4 +383,8 @@ public Chunk rewrite(TSDataType newType) throws IOException { chunkWriter.getStatistics(), encryptParam); } + + public boolean isSinglePageChunk() { + return (getHeader().getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index 780164fcb..07ceb4011 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -19,11 +19,22 @@ package org.apache.tsfile.read.reader; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import org.apache.tsfile.compress.IUnCompressor; +import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.page.ValuePageReader; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; @@ -38,9 +49,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; -import java.util.stream.Collectors; -/** Conveniently retrieve last points of all timeseries from a TsFile. */ +/** Conveniently retrieve last points of all timeseries from a TsFile.*/ public class TsFileLastReader implements AutoCloseable, Iterator>>> { @@ -80,23 +90,37 @@ public boolean hasNext() { } if (asyncIO) { + return hasNextAsync(); + } else { + return hasNextSync(); + } + } + + private boolean hasNextSync() { + if (!timeseriesMetadataIter.hasNext()) { + nextValue = new Pair<>(null, null); + } else { + Pair> next = timeseriesMetadataIter.next(); try { - nextValue = lastValueQueue.take(); - if (nextValue.getLeft() == null) { - // the terminator - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + nextValue = new Pair<>(next.left, convertToLastPoints(next.right)); + } catch (IOException e) { + LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e); return false; } - } else { - if (!timeseriesMetadataIter.hasNext()) { - nextValue = new Pair<>(null, null); - } else { - Pair> next = timeseriesMetadataIter.next(); - nextValue = new Pair<>(next.left, convertToLastPoints(next.right)); + } + return nextValue.left != null; + } + + private boolean hasNextAsync() { + try { + nextValue = lastValueQueue.take(); + if (nextValue.getLeft() == null) { + // the terminator + return false; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; } return nextValue.left != null; } @@ -115,26 +139,90 @@ public Pair>> next() { } private List> convertToLastPoints( - List timeseriesMetadataList) { - return timeseriesMetadataList.stream() - .map( - seriesMeta -> - new Pair<>( - seriesMeta.getMeasurementId(), - new TimeValuePair( - seriesMeta.getStatistics().getEndTime(), - TsPrimitiveType.getByType( - seriesMeta.getTsDataType() == TSDataType.VECTOR - ? TSDataType.INT64 - : seriesMeta.getTsDataType(), - seriesMeta.getTsDataType() == TSDataType.VECTOR - ? seriesMeta.getStatistics().getEndTime() - : seriesMeta.getStatistics().getLastValue())))) - .collect(Collectors.toList()); + List timeseriesMetadataList) throws IOException { + boolean isAligned = timeseriesMetadataList.get(0).getTsDataType() == TSDataType.VECTOR; + List> list = new ArrayList<>(); + for (TimeseriesMetadata meta : timeseriesMetadataList) { + Pair stringTimeValuePairPair = convertToLastPoint(meta, isAligned); + list.add(stringTimeValuePairPair); + } + return list; + } + + private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws IOException { + ChunkReader chunkReader = new ChunkReader(chunk); + BatchData batchData = null; + while (chunkReader.hasNextSatisfiedPage()) { + batchData = chunkReader.nextPageData(); + } + if (batchData != null) { + return batchData.getLastPairBeforeOrEqualTimestamp(Long.MAX_VALUE); + } else { + return null; + } + } + + private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime) throws IOException { + ByteBuffer chunkData = chunk.getData(); + PageHeader lastPageHeader = null; + ByteBuffer lastPageData = null; + while (chunkData.hasRemaining()) { + if (chunk.isSinglePageChunk()) { + lastPageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics()); + } else { + lastPageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB); + } + lastPageData = chunkData.slice(); + chunkData.position(chunkData.position() + lastPageHeader.getCompressedSize()); + } + if (lastPageHeader != null) { + CompressionType compressionType = chunk.getHeader().getCompressionType(); + if (compressionType != CompressionType.UNCOMPRESSED) { + ByteBuffer uncompressedPage = ByteBuffer.allocate(lastPageHeader.getUncompressedSize()); + IUnCompressor.getUnCompressor(compressionType).uncompress(lastPageData, uncompressedPage); + lastPageData = uncompressedPage; + lastPageData.flip(); + } + + ValuePageReader valuePageReader = new ValuePageReader(lastPageHeader, lastPageData, TSDataType.BLOB, + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB)); + TsPrimitiveType lastValue = null; + for (int i = 0; i < valuePageReader.getSize(); i++) { + // the timestamp here is not necessary + lastValue = valuePageReader.nextValue(0, i); + } + return new TimeValuePair(endTime, lastValue); + } else { + return null; + } + } + + private Pair convertToLastPoint( + TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException { + if (seriesMeta.getTsDataType() != TSDataType.BLOB) { + return new Pair<>( + seriesMeta.getMeasurementId(), + new TimeValuePair( + seriesMeta.getStatistics().getEndTime(), + seriesMeta.getTsDataType() == TSDataType.VECTOR ? + TsPrimitiveType.getByType(TSDataType.INT64, seriesMeta.getStatistics().getEndTime()) : + TsPrimitiveType.getByType(seriesMeta.getTsDataType(), + seriesMeta.getStatistics().getLastValue()))); + } else { + ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList() + .get(seriesMeta.getChunkMetadataList().size() - 1); + Chunk chunk = sequenceReader.readMemChunk(chunkMetadata); + + if (!isAligned) { + return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); + } else { + return new Pair<>(seriesMeta.getMeasurementId(), readAlignedLastPoint(chunk, chunkMetadata, seriesMeta.getStatistics().getEndTime())); + } + } } private void init() throws IOException { - timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false); + timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false, true); if (asyncIO) { int queueCapacity = 1024; lastValueQueue = new ArrayBlockingQueue<>(queueCapacity); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java index 317b493c5..fc9dd3755 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java @@ -619,4 +619,8 @@ public byte[] getBitmap() throws IOException { uncompressDataIfNecessary(); return Arrays.copyOf(bitmap, bitmap.length); } + + public int getSize() { + return size; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java b/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java index cbd9e7014..844dcfd37 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java @@ -61,4 +61,9 @@ public static int compareStrings(String a, String b) { } return a.compareTo(b); } + + @FunctionalInterface + public interface TabletAddValueFunction { + void addValue(Tablet tablet, int row, int column); + } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index f3aad0c8b..912c6b5a7 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -19,11 +19,21 @@ package org.apache.tsfile.read.reader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.IntFunction; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.utils.WriteUtils.TabletAddValueFunction; import org.apache.tsfile.write.TsFileWriter; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -43,15 +53,26 @@ import static org.junit.Assert.assertFalse; public class TsFileLastReaderTest { + + private static final List dataTypes = Arrays.asList(TSDataType.INT64, TSDataType.BLOB); + private static final Map typeAddValueFunctions = new HashMap<>(); + static { + typeAddValueFunctions.put(TSDataType.INT64, ((tablet, row, column) -> tablet.addValue(row, column, (long) row))); + typeAddValueFunctions.put(TSDataType.BLOB, ((tablet, row, column) -> tablet.addValue(row, column, Long.toBinaryString(row).getBytes( + StandardCharsets.UTF_8)))); + } + private final String filePath = "target/test.tsfile"; private final File file = new File(filePath); + private void createFile(int deviceNum, int measurementNum, int seriesPointNum) throws IOException, WriteProcessException { try (TsFileWriter writer = new TsFileWriter(file)) { List measurementSchemaList = new ArrayList<>(); for (int j = 0; j < measurementNum; j++) { - measurementSchemaList.add(new MeasurementSchema("s" + j, TSDataType.INT64)); + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType)); } for (int i = 0; i < deviceNum; i++) { writer.registerAlignedTimeseries("device" + i, measurementSchemaList); @@ -63,8 +84,9 @@ private void createFile(int deviceNum, int measurementNum, int seriesPointNum) tablet.addTimestamp(k, k); } for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); for (int k = 0; k < seriesPointNum; k++) { - tablet.addValue(k, j, k); + typeAddValueFunctions.get(tsDataType).addValue(tablet, k, j); } } writer.writeTree(tablet); @@ -75,7 +97,7 @@ private void createFile(int deviceNum, int measurementNum, int seriesPointNum) private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { long startTime = System.currentTimeMillis(); Set devices = new HashSet<>(); - try (TsFileLastReader lastReader = new TsFileLastReader(filePath, false)) { + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) { while (lastReader.hasNext()) { Set measurements = new HashSet<>(); Pair>> next = lastReader.next(); @@ -88,7 +110,13 @@ private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) t pair -> { measurements.add(pair.getLeft()); assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); - assertEquals(seriesPointNum - 1, pair.getRight().getValue().getLong()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals(new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), value.getBinary()); + } + }); assertEquals(measurementNum + 1, measurements.size()); } From cf421a573826bfb49e5f527decba83503b88c4a7 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Wed, 21 May 2025 16:41:31 +0800 Subject: [PATCH 3/8] spotless (cherry picked from commit 1f70820f622a9e1a8bb10f5e4ab00dbecf22d3f5) --- .../file/metadata/TimeseriesMetadata.java | 25 +++++++----- .../tsfile/read/TsFileSequenceReader.java | 35 +++++++++++++---- .../tsfile/read/reader/TsFileLastReader.java | 39 ++++++++++++------- .../read/reader/TsFileLastReaderTest.java | 34 +++++++++------- 4 files changed, 87 insertions(+), 46 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java index 78a69c573..25b1a1276 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java @@ -119,11 +119,13 @@ public TimeseriesMetadata(TimeseriesMetadata timeseriesMetadata) { // new one this.chunkMetadataList = timeseriesMetadata.chunkMetadataList; } + public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) { return deserializeFrom(buffer, needChunkMetadata, needChunkMetadata); } - public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) { + public static TimeseriesMetadata deserializeFrom( + ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) { TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer)); timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer)); @@ -131,8 +133,8 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType)); - if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) || - (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { + if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); timeseriesMetaData.chunkMetadataList = new ArrayList<>(); @@ -153,7 +155,10 @@ public static TimeseriesMetadata deserializeFrom( } public static TimeseriesMetadata deserializeFrom( - TsFileInput tsFileInput, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + TsFileInput tsFileInput, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) + throws IOException { InputStream inputStream = tsFileInput.wrapAsInputStream(); TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(inputStream)); @@ -165,7 +170,7 @@ public static TimeseriesMetadata deserializeFrom( Statistics.deserialize(inputStream, timeseriesMetaData.dataType)); long startOffset = tsFileInput.position(); if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) - || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { + || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { timeseriesMetaData.chunkMetadataList = new ArrayList<>(); while (tsFileInput.position() < startOffset + chunkMetaDataListDataSize) { timeseriesMetaData.chunkMetadataList.add( @@ -189,7 +194,9 @@ public static TimeseriesMetadata deserializeFrom( } public static TimeseriesMetadata deserializeFrom( - ByteBuffer buffer, Set excludedMeasurements, boolean needChunkMetadataForNonBlob, + ByteBuffer buffer, + Set excludedMeasurements, + boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) { byte timeseriesType = ReadWriteIOUtils.readByte(buffer); String measurementID = ReadWriteIOUtils.readVarIntString(buffer); @@ -204,9 +211,9 @@ public static TimeseriesMetadata deserializeFrom( timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(statistics); - if (!excludedMeasurements.contains(measurementID) && - ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob) || - (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) { + if (!excludedMeasurements.contains(measurementID) + && ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) { // measurement is not in the excluded set and need chunk metadata ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 2f7fb4452..033e3d044 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -1342,8 +1342,16 @@ private void generateMetadataIndex( IDeviceID deviceId, MetadataIndexNodeType type, Map> timeseriesMetadataMap, - boolean needChunkMetadata) throws IOException { - generateMetadataIndex(metadataIndex, buffer, deviceId, type, timeseriesMetadataMap, needChunkMetadata, needChunkMetadata); + boolean needChunkMetadata) + throws IOException { + generateMetadataIndex( + metadataIndex, + buffer, + deviceId, + type, + timeseriesMetadataMap, + needChunkMetadata, + needChunkMetadata); } private void generateMetadataIndex( @@ -1359,7 +1367,9 @@ private void generateMetadataIndex( if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { List timeseriesMetadataList = new ArrayList<>(); while (buffer.hasRemaining()) { - timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); + timeseriesMetadataList.add( + TimeseriesMetadata.deserializeFrom( + buffer, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -1420,8 +1430,17 @@ private void generateMetadataIndexUsingTsFileInput( IDeviceID deviceId, MetadataIndexNodeType type, Map> timeseriesMetadataMap, - boolean needChunkMetadata) throws IOException { - generateMetadataIndexUsingTsFileInput(metadataIndex, start, end, deviceId, type, timeseriesMetadataMap, needChunkMetadata, needChunkMetadata); + boolean needChunkMetadata) + throws IOException { + generateMetadataIndexUsingTsFileInput( + metadataIndex, + start, + end, + deviceId, + type, + timeseriesMetadataMap, + needChunkMetadata, + needChunkMetadata); } private void generateMetadataIndexUsingTsFileInput( @@ -1440,7 +1459,8 @@ private void generateMetadataIndexUsingTsFileInput( List timeseriesMetadataList = new ArrayList<>(); while (tsFileInput.position() < end) { timeseriesMetadataList.add( - TimeseriesMetadata.deserializeFrom(tsFileInput, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); + TimeseriesMetadata.deserializeFrom( + tsFileInput, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -3017,7 +3037,8 @@ private class TimeseriesMetadataIterator private MetadataIndexNode currentLeafDeviceNode; private int currentLeafDeviceNodeIndex; - public TimeseriesMetadataIterator(boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + public TimeseriesMetadataIterator( + boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { this.needChunkMetadataForNonBlob = needChunkMetadataForNonBlob; this.needCHunkMetadataForBlob = needChunkMetadataForBlob; if (tsFileMetaData == null) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index 07ceb4011..0e829b230 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -19,8 +19,6 @@ package org.apache.tsfile.read.reader; -import java.nio.ByteBuffer; -import java.util.ArrayList; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; @@ -42,6 +40,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -50,7 +50,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; -/** Conveniently retrieve last points of all timeseries from a TsFile.*/ +/** Conveniently retrieve last points of all timeseries from a TsFile. */ public class TsFileLastReader implements AutoCloseable, Iterator>>> { @@ -162,7 +162,8 @@ private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws IOException { } } - private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime) throws IOException { + private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime) + throws IOException { ByteBuffer chunkData = chunk.getData(); PageHeader lastPageHeader = null; ByteBuffer lastPageData = null; @@ -184,8 +185,12 @@ private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetad lastPageData.flip(); } - ValuePageReader valuePageReader = new ValuePageReader(lastPageHeader, lastPageData, TSDataType.BLOB, - Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB)); + ValuePageReader valuePageReader = + new ValuePageReader( + lastPageHeader, + lastPageData, + TSDataType.BLOB, + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB)); TsPrimitiveType lastValue = null; for (int i = 0; i < valuePageReader.getSize(); i++) { // the timestamp here is not necessary @@ -202,21 +207,25 @@ private Pair convertToLastPoint( if (seriesMeta.getTsDataType() != TSDataType.BLOB) { return new Pair<>( seriesMeta.getMeasurementId(), - new TimeValuePair( - seriesMeta.getStatistics().getEndTime(), - seriesMeta.getTsDataType() == TSDataType.VECTOR ? - TsPrimitiveType.getByType(TSDataType.INT64, seriesMeta.getStatistics().getEndTime()) : - TsPrimitiveType.getByType(seriesMeta.getTsDataType(), - seriesMeta.getStatistics().getLastValue()))); + new TimeValuePair( + seriesMeta.getStatistics().getEndTime(), + seriesMeta.getTsDataType() == TSDataType.VECTOR + ? TsPrimitiveType.getByType( + TSDataType.INT64, seriesMeta.getStatistics().getEndTime()) + : TsPrimitiveType.getByType( + seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue()))); } else { - ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList() - .get(seriesMeta.getChunkMetadataList().size() - 1); + ChunkMetadata chunkMetadata = + (ChunkMetadata) + seriesMeta.getChunkMetadataList().get(seriesMeta.getChunkMetadataList().size() - 1); Chunk chunk = sequenceReader.readMemChunk(chunkMetadata); if (!isAligned) { return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); } else { - return new Pair<>(seriesMeta.getMeasurementId(), readAlignedLastPoint(chunk, chunkMetadata, seriesMeta.getStatistics().getEndTime())); + return new Pair<>( + seriesMeta.getMeasurementId(), + readAlignedLastPoint(chunk, chunkMetadata, seriesMeta.getStatistics().getEndTime())); } } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index 912c6b5a7..ce3495aaa 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -19,13 +19,6 @@ package org.apache.tsfile.read.reader; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.function.BiFunction; -import java.util.function.IntFunction; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -44,9 +37,13 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -54,18 +51,24 @@ public class TsFileLastReaderTest { - private static final List dataTypes = Arrays.asList(TSDataType.INT64, TSDataType.BLOB); - private static final Map typeAddValueFunctions = new HashMap<>(); + private static final List dataTypes = + Arrays.asList(TSDataType.INT64, TSDataType.BLOB); + private static final Map typeAddValueFunctions = + new HashMap<>(); + static { - typeAddValueFunctions.put(TSDataType.INT64, ((tablet, row, column) -> tablet.addValue(row, column, (long) row))); - typeAddValueFunctions.put(TSDataType.BLOB, ((tablet, row, column) -> tablet.addValue(row, column, Long.toBinaryString(row).getBytes( - StandardCharsets.UTF_8)))); + typeAddValueFunctions.put( + TSDataType.INT64, ((tablet, row, column) -> tablet.addValue(row, column, (long) row))); + typeAddValueFunctions.put( + TSDataType.BLOB, + ((tablet, row, column) -> + tablet.addValue( + row, column, Long.toBinaryString(row).getBytes(StandardCharsets.UTF_8)))); } private final String filePath = "target/test.tsfile"; private final File file = new File(filePath); - private void createFile(int deviceNum, int measurementNum, int seriesPointNum) throws IOException, WriteProcessException { try (TsFileWriter writer = new TsFileWriter(file)) { @@ -114,9 +117,10 @@ private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) t if (value.getDataType() == TSDataType.INT64) { assertEquals(seriesPointNum - 1, value.getLong()); } else { - assertEquals(new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), value.getBinary()); + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); } - }); assertEquals(measurementNum + 1, measurements.size()); } From a8af4b5aff45a15e45ab035f95b6e765d860fa2d Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Thu, 22 May 2025 10:14:31 +0800 Subject: [PATCH 4/8] fix series with empty chunk --- .../tsfile/file/metadata/ChunkMetadata.java | 4 - .../tsfile/read/reader/TsFileLastReader.java | 26 +++- .../read/reader/TsFileLastReaderTest.java | 117 ++++++++++++++++++ 3 files changed, 138 insertions(+), 9 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java index 4be3b3ace..653a8991b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/ChunkMetadata.java @@ -430,8 +430,4 @@ public boolean hasNullValue(int measurementIndex) { public MeasurementSchema toMeasurementSchema() { return new MeasurementSchema(measurementUid, tsDataType, encoding, compressionType); } - - public TSEncoding getEncoding() { - return encoding; - } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index 0e829b230..d84fff800 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -215,17 +215,33 @@ private Pair convertToLastPoint( : TsPrimitiveType.getByType( seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue()))); } else { - ChunkMetadata chunkMetadata = - (ChunkMetadata) - seriesMeta.getChunkMetadataList().get(seriesMeta.getChunkMetadataList().size() - 1); - Chunk chunk = sequenceReader.readMemChunk(chunkMetadata); + ChunkMetadata lastNonEmptyChunkMetadata = null; + for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) { + ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i); + if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) { + // the chunk of a single chunk series must not be empty + lastNonEmptyChunkMetadata = chunkMetadata; + break; + } + } + + if (lastNonEmptyChunkMetadata == null) { + LOGGER.error( + "All chunks are empty in series {} of file {}", + seriesMeta, + sequenceReader.getFileName()); + return new Pair<>(seriesMeta.getMeasurementId(), null); + } + + Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata); if (!isAligned) { return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); } else { return new Pair<>( seriesMeta.getMeasurementId(), - readAlignedLastPoint(chunk, chunkMetadata, seriesMeta.getStatistics().getEndTime())); + readAlignedLastPoint( + chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime())); } } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index ce3495aaa..420b21b19 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -49,6 +49,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"}) public class TsFileLastReaderTest { private static final List dataTypes = @@ -97,6 +98,116 @@ private void createFile(int deviceNum, int measurementNum, int seriesPointNum) } } + // the second half measurements will have an emtpy last chunk each + private void createFileWithLastEmptyChunks(int deviceNum, int measurementNum, int seriesPointNum) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { + List measurementSchemaList = new ArrayList<>(); + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType)); + } + for (int i = 0; i < deviceNum; i++) { + writer.registerAlignedTimeseries("device" + i, measurementSchemaList); + } + + // the first half seriesPointNum points are not null for all series + int batchPointNum = seriesPointNum / 2; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, batchPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k); + } + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < batchPointNum; k++) { + typeAddValueFunctions.get(tsDataType).addValue(tablet, k, j); + } + } + writer.writeTree(tablet); + } + writer.flush(); + + // the second half series have no value for the remaining points + batchPointNum = seriesPointNum - batchPointNum; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k + seriesPointNum / 2); + } + for (int j = 0; j < measurementNum / 2; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < seriesPointNum; k++) { + switch (tsDataType) { + case INT64: + tablet.addValue(k, j, (long) k + seriesPointNum / 2); + break; + case BLOB: + tablet.addValue( + k, + j, + Long.toBinaryString(k + seriesPointNum / 2).getBytes(StandardCharsets.UTF_8)); + break; + default: + throw new IllegalArgumentException("Unsupported TSDataType " + tsDataType); + } + } + } + writer.writeTree(tablet); + } + } + } + + private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPointNum) + throws Exception { + long startTime = System.currentTimeMillis(); + Set devices = new HashSet<>(); + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) { + while (lastReader.hasNext()) { + Set measurements = new HashSet<>(); + Pair>> next = lastReader.next(); + assertFalse(devices.contains(next.left)); + devices.add(next.left); + + // time column included + assertEquals(measurementNum + 1, next.getRight().size()); + next.right.forEach( + pair -> { + measurements.add(pair.getLeft()); + // the time column is regarded as the first half + int measurementIndex = + pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1)); + + if (measurementIndex < measurementNum / 2) { + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } else { + assertEquals(seriesPointNum / 2 - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum / 2 - 1, value.getLong()); + } else { + assertEquals( + new Binary( + Long.toBinaryString(seriesPointNum / 2 - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } + }); + assertEquals(measurementNum + 1, measurements.size()); + } + } + assertEquals(deviceNum, devices.size()); + System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); + } + private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { long startTime = System.currentTimeMillis(); Set devices = new HashSet<>(); @@ -161,6 +272,12 @@ public void testManyMany() throws Exception { testReadLast(1000, 1000, 1000); } + @Test + public void lastLastEmptyChunks() throws Exception { + createFileWithLastEmptyChunks(100, 100, 100); + doReadLastWithEmpty(100, 100, 100); + } + @Ignore("Performance") @Test public void testManyRead() throws Exception { From 14b20856c440b3f3e6de0083eab7721b75fb3599 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Thu, 22 May 2025 10:16:10 +0800 Subject: [PATCH 5/8] fix test name --- .../org/apache/tsfile/read/reader/TsFileLastReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index 420b21b19..de31e91cf 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -273,7 +273,7 @@ public void testManyMany() throws Exception { } @Test - public void lastLastEmptyChunks() throws Exception { + public void testLastEmptyChunks() throws Exception { createFileWithLastEmptyChunks(100, 100, 100); doReadLastWithEmpty(100, 100, 100); } From ba0e5781c5d6acda2699e4984fc80ab380e7c7fd Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Thu, 22 May 2025 10:17:31 +0800 Subject: [PATCH 6/8] reduce test scale --- .../org/apache/tsfile/read/reader/TsFileLastReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index de31e91cf..806e77aac 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -269,7 +269,7 @@ public void testManyPoints() throws Exception { @Test public void testManyMany() throws Exception { - testReadLast(1000, 1000, 1000); + testReadLast(100, 100, 100); } @Test From 2de89a0ebc88bed2c06f1d40291d8c4025e32d7d Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Thu, 22 May 2025 11:22:36 +0800 Subject: [PATCH 7/8] fix last empty page (cherry picked from commit 40105d171816b0473325aaae4040b8484fe9db86) --- .../tsfile/read/reader/TsFileLastReader.java | 17 ++++++-- .../read/reader/TsFileLastReaderTest.java | 41 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index d84fff800..8c01215c1 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -168,14 +168,23 @@ private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetad PageHeader lastPageHeader = null; ByteBuffer lastPageData = null; while (chunkData.hasRemaining()) { + PageHeader pageHeader; if (chunk.isSinglePageChunk()) { - lastPageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics()); + pageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics()); } else { - lastPageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB); + pageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB); + } + ByteBuffer pageData = chunkData.slice(); + pageData.limit(pageData.position() + pageHeader.getCompressedSize()); + chunkData.position(chunkData.position() + pageHeader.getCompressedSize()); + + if ((pageHeader.getStatistics() == null && pageHeader.getUncompressedSize() != 0) + || (pageHeader.getStatistics() != null && pageHeader.getStatistics().getCount() > 0)) { + lastPageHeader = pageHeader; + lastPageData = pageData; } - lastPageData = chunkData.slice(); - chunkData.position(chunkData.position() + lastPageHeader.getCompressedSize()); } + if (lastPageHeader != null) { CompressionType compressionType = chunk.getHeader().getCompressionType(); if (compressionType != CompressionType.UNCOMPRESSED) { diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index 806e77aac..2f490f67b 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -22,15 +22,18 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.utils.WriteUtils.TabletAddValueFunction; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.Ignore; import org.junit.Test; @@ -278,6 +281,44 @@ public void testLastEmptyChunks() throws Exception { doReadLastWithEmpty(100, 100, 100); } + @Test + public void testLastEmptyPage() throws Exception { + try (TsFileIOWriter ioWriter = new TsFileIOWriter(file)) { + ioWriter.startChunkGroup(Factory.DEFAULT_FACTORY.create("root.db1.d1")); + List measurementSchemaList = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.BLOB)); + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemaList); + alignedChunkWriter.write( + 0, + new TsPrimitiveType[] { + TsPrimitiveType.getByType(TSDataType.INT64, 0L), + TsPrimitiveType.getByType( + TSDataType.BLOB, new Binary("0".getBytes(StandardCharsets.UTF_8))) + }); + alignedChunkWriter.sealCurrentPage(); + alignedChunkWriter.write( + 1, new TsPrimitiveType[] {TsPrimitiveType.getByType(TSDataType.INT64, 1L), null}); + alignedChunkWriter.writeToFileWriter(ioWriter); + ioWriter.endChunkGroup(); + + ioWriter.endFile(); + } + + try (TsFileLastReader lastReader = new TsFileLastReader(filePath)) { + Pair>> next = lastReader.next(); + assertEquals(Factory.DEFAULT_FACTORY.create("root.db1.d1"), next.getLeft()); + assertEquals(3, next.getRight().size()); + assertEquals("s1", next.getRight().get(1).left); + assertEquals("s2", next.getRight().get(2).left); + assertEquals(1, next.getRight().get(1).right.getTimestamp()); + assertEquals(1, next.getRight().get(1).right.getValue().getLong()); + assertEquals(0, next.getRight().get(2).right.getTimestamp()); + assertEquals("0", next.getRight().get(2).right.getValue().getStringValue()); + } + } + @Ignore("Performance") @Test public void testManyRead() throws Exception { From 487688d6b745d6c5f18d6ae7adcb0747c66241f5 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Thu, 22 May 2025 18:00:04 +0800 Subject: [PATCH 8/8] add ignore blob --- .../tsfile/read/reader/TsFileLastReader.java | 66 +++++++++++-------- .../read/reader/TsFileLastReaderTest.java | 49 ++++++++++---- 2 files changed, 78 insertions(+), 37 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java index 8c01215c1..f89b30df6 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -58,6 +58,8 @@ public class TsFileLastReader private final TsFileSequenceReader sequenceReader; private boolean asyncIO = true; + // when true, blob series will return a null TimeValuePair + private boolean ignoreBlob = false; private Iterator>> timeseriesMetadataIter; private Pair>> nextValue; @@ -68,9 +70,16 @@ public TsFileLastReader(String filePath) throws IOException { sequenceReader = new TsFileSequenceReader(filePath); } - public TsFileLastReader(String filePath, boolean asyncIO) throws IOException { + /** + * @param filePath path of the TsFile + * @param asyncIO use asynchronous IO or not + * @param ignoreBlob whether to ignore series with blob type (the returned TimeValuePair will be + * null) + */ + public TsFileLastReader(String filePath, boolean asyncIO, boolean ignoreBlob) throws IOException { this(filePath); this.asyncIO = asyncIO; + this.ignoreBlob = ignoreBlob; } @Override @@ -224,39 +233,44 @@ private Pair convertToLastPoint( : TsPrimitiveType.getByType( seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue()))); } else { - ChunkMetadata lastNonEmptyChunkMetadata = null; - for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) { - ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i); - if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) { - // the chunk of a single chunk series must not be empty - lastNonEmptyChunkMetadata = chunkMetadata; - break; - } - } + return readLastPoint(seriesMeta, isAligned); + } + } + + private Pair readLastPoint( + TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException { + if (seriesMeta.getChunkMetadataList() == null) { + return new Pair<>(seriesMeta.getMeasurementId(), null); + } - if (lastNonEmptyChunkMetadata == null) { - LOGGER.error( - "All chunks are empty in series {} of file {}", - seriesMeta, - sequenceReader.getFileName()); - return new Pair<>(seriesMeta.getMeasurementId(), null); + ChunkMetadata lastNonEmptyChunkMetadata = null; + for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) { + ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i); + if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) { + // the chunk of a single chunk series must not be empty + lastNonEmptyChunkMetadata = chunkMetadata; + break; } + } - Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata); + if (lastNonEmptyChunkMetadata == null) { + return new Pair<>(seriesMeta.getMeasurementId(), null); + } - if (!isAligned) { - return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); - } else { - return new Pair<>( - seriesMeta.getMeasurementId(), - readAlignedLastPoint( - chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime())); - } + Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata); + + if (!isAligned) { + return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); + } else { + return new Pair<>( + seriesMeta.getMeasurementId(), + readAlignedLastPoint( + chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime())); } } private void init() throws IOException { - timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false, true); + timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false, !ignoreBlob); if (asyncIO) { int queueCapacity = 1024; lastValueQueue = new ArrayBlockingQueue<>(queueCapacity); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java index 2f490f67b..40b1cc8f0 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -51,6 +51,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"}) public class TsFileLastReaderTest { @@ -165,7 +167,7 @@ private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPo throws Exception { long startTime = System.currentTimeMillis(); Set devices = new HashSet<>(); - try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) { + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, false)) { while (lastReader.hasNext()) { Set measurements = new HashSet<>(); Pair>> next = lastReader.next(); @@ -211,10 +213,11 @@ private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPo System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); } - private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { + private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum, boolean ignoreBlob) + throws Exception { long startTime = System.currentTimeMillis(); Set devices = new HashSet<>(); - try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true)) { + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, ignoreBlob)) { while (lastReader.hasNext()) { Set measurements = new HashSet<>(); Pair>> next = lastReader.next(); @@ -226,14 +229,31 @@ private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) t next.right.forEach( pair -> { measurements.add(pair.getLeft()); + // the time column is regarded as the first half + int measurementIndex = + pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1)); + TSDataType tsDataType = + measurementIndex == -1 + ? TSDataType.INT64 + : dataTypes.get(measurementIndex % dataTypes.size()); + + if (tsDataType == TSDataType.BLOB && ignoreBlob) { + assertNull(pair.getRight()); + return; + } + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); - TsPrimitiveType value = pair.getRight().getValue(); - if (value.getDataType() == TSDataType.INT64) { - assertEquals(seriesPointNum - 1, value.getLong()); + if (pair.getRight() == null) { + assertTrue(ignoreBlob); } else { - assertEquals( - new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), - value.getBinary()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); + } } }); assertEquals(measurementNum + 1, measurements.size()); @@ -246,7 +266,7 @@ private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum) t private void testReadLast(int deviceNum, int measurementNum, int seriesPointNum) throws Exception { createFile(deviceNum, measurementNum, seriesPointNum); - doReadLast(deviceNum, measurementNum, seriesPointNum); + doReadLast(deviceNum, measurementNum, seriesPointNum, false); file.delete(); } @@ -319,6 +339,13 @@ TSDataType.BLOB, new Binary("0".getBytes(StandardCharsets.UTF_8))) } } + @Test + public void testIgnoreBlob() throws Exception { + createFile(10, 10, 10); + doReadLast(10, 10, 10, true); + file.delete(); + } + @Ignore("Performance") @Test public void testManyRead() throws Exception { @@ -327,7 +354,7 @@ public void testManyRead() throws Exception { int seriesPointNum = 1; createFile(deviceNum, measurementNum, seriesPointNum); for (int i = 0; i < 10; i++) { - doReadLast(deviceNum, measurementNum, seriesPointNum); + doReadLast(deviceNum, measurementNum, seriesPointNum, false); } file.delete(); }