diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
index 17a326e01..c7f97a908 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java
@@ -114,6 +114,11 @@ public TimeseriesMetadata(TimeseriesMetadata timeseriesMetadata) {
   }
 
   public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) {
+    return deserializeFrom(buffer, needChunkMetadata, needChunkMetadata);
+  }
+
+  public static TimeseriesMetadata deserializeFrom(
+      ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) {
     TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
     timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer));
     timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer));
@@ -121,7 +126,8 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need
     int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer);
     timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
     timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType));
-    if (needChunkMetadata) {
+    if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob)
+        || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) {
       ByteBuffer byteBuffer = buffer.slice();
       byteBuffer.limit(chunkMetaDataListDataSize);
       timeseriesMetaData.chunkMetadataList = new ArrayList<>();
@@ -138,6 +144,14 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need
 
   public static TimeseriesMetadata deserializeFrom(
       TsFileInput tsFileInput, boolean needChunkMetadata) throws IOException {
+    return deserializeFrom(tsFileInput, needChunkMetadata, needChunkMetadata);
+  }
+
+  public static TimeseriesMetadata deserializeFrom(
+      TsFileInput tsFileInput,
+      boolean needChunkMetadataForNonBlob,
+      boolean needChunkMetadataForBlob)
+      throws IOException {
     InputStream inputStream = tsFileInput.wrapAsInputStream();
     TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata();
     timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(inputStream));
@@ -148,7 +162,8 @@ public static TimeseriesMetadata deserializeFrom(
     timeseriesMetaData.setStatistics(
         Statistics.deserialize(inputStream, timeseriesMetaData.dataType));
     long startOffset = tsFileInput.position();
-    if (needChunkMetadata) {
+    if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob)
+        || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) {
       timeseriesMetaData.chunkMetadataList = new ArrayList<>();
       while (tsFileInput.position() < startOffset + chunkMetaDataListDataSize) {
         timeseriesMetaData.chunkMetadataList.add(
@@ -168,6 +183,14 @@ public static TimeseriesMetadata deserializeFrom(
    */
   public static TimeseriesMetadata deserializeFrom(
       ByteBuffer buffer, Set<String> excludedMeasurements, boolean needChunkMetadata) {
+    return deserializeFrom(buffer, excludedMeasurements, needChunkMetadata, needChunkMetadata);
+  }
+
+  public static TimeseriesMetadata deserializeFrom(
+      ByteBuffer buffer,
+      Set<String> excludedMeasurements,
+      boolean needChunkMetadataForNonBlob,
+      boolean needChunkMetadataForBlob) {
     byte timeseriesType = ReadWriteIOUtils.readByte(buffer);
     String measurementID = ReadWriteIOUtils.readVarIntString(buffer);
     TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer);
@@ -181,7 +204,9 @@ public static TimeseriesMetadata deserializeFrom(
     timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize);
     timeseriesMetaData.setStatistics(statistics);
 
-    if (!excludedMeasurements.contains(measurementID) && needChunkMetadata) {
+    if (!excludedMeasurements.contains(measurementID)
+        && ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob)
+            || (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) {
       // measurement is not in the excluded set and need chunk metadata
       ByteBuffer byteBuffer = buffer.slice();
       byteBuffer.limit(chunkMetaDataListDataSize);
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
index 48fcb329c..96dcf79f1 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java
@@ -76,22 +76,22 @@ public void update(long[] time, int batchSize, int arrayOffset) {
 
   @Override
   public Long getMinValue() {
-    throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "min value"));
+    return getStartTime();
   }
 
   @Override
   public Long getMaxValue() {
-    throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "max value"));
+    return getEndTime();
   }
 
   @Override
   public Long getFirstValue() {
-    throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "first value"));
+    return getStartTime();
   }
 
   @Override
   public Long getLastValue() {
-    throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "last value"));
+    return getEndTime();
   }
 
   @Override
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index ff7c50069..13c7adb2f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -75,9 +75,11 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -85,6 +87,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Queue;
@@ -1252,11 +1255,32 @@ private void generateMetadataIndex(
       Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
       boolean needChunkMetadata)
       throws IOException {
+    generateMetadataIndex(
+        metadataIndex,
+        buffer,
+        deviceId,
+        type,
+        timeseriesMetadataMap,
+        needChunkMetadata,
+        needChunkMetadata);
+  }
+
+  private void generateMetadataIndex(
+      IMetadataIndexEntry metadataIndex,
+      ByteBuffer buffer,
+      IDeviceID deviceId,
+      MetadataIndexNodeType type,
+      Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
+      boolean needChunkMetadataForNonBlob,
+      boolean needChunkMetadataForBlob)
+      throws IOException {
     try {
       if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
         List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
         while (buffer.hasRemaining()) {
-          timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata));
+          timeseriesMetadataList.add(
+              TimeseriesMetadata.deserializeFrom(
+                  buffer, needChunkMetadataForNonBlob, needChunkMetadataForBlob));
         }
         timeseriesMetadataMap
             .computeIfAbsent(deviceId, k -> new ArrayList<>())
@@ -1284,7 +1308,8 @@ private void generateMetadataIndex(
                 deviceId,
                 metadataIndexNode.getNodeType(),
                 timeseriesMetadataMap,
-                needChunkMetadata);
+                needChunkMetadataForNonBlob,
+                needChunkMetadataForBlob);
           } else {
             // when the buffer length is over than Integer.MAX_VALUE,
             // using tsFileInput to get timeseriesMetadataList
@@ -1295,7 +1320,8 @@ private void generateMetadataIndex(
                 deviceId,
                 metadataIndexNode.getNodeType(),
                 timeseriesMetadataMap,
-                needChunkMetadata);
+                needChunkMetadataForNonBlob,
+                needChunkMetadataForBlob);
           }
         }
       }
@@ -1316,13 +1342,35 @@ private void generateMetadataIndexUsingTsFileInput(
       Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
       boolean needChunkMetadata)
       throws IOException {
+    generateMetadataIndexUsingTsFileInput(
+        metadataIndex,
+        start,
+        end,
+        deviceId,
+        type,
+        timeseriesMetadataMap,
+        needChunkMetadata,
+        needChunkMetadata);
+  }
+
+  private void generateMetadataIndexUsingTsFileInput(
+      IMetadataIndexEntry metadataIndex,
+      long start,
+      long end,
+      IDeviceID deviceId,
+      MetadataIndexNodeType type,
+      Map<IDeviceID, List<TimeseriesMetadata>> timeseriesMetadataMap,
+      boolean needChunkMetadataForNonBlob,
+      boolean needChunkMetadataForBlob)
+      throws IOException {
     try {
       tsFileInput.position(start);
       if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
         List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
         while (tsFileInput.position() < end) {
           timeseriesMetadataList.add(
-              TimeseriesMetadata.deserializeFrom(tsFileInput, needChunkMetadata));
+              TimeseriesMetadata.deserializeFrom(
+                  tsFileInput, needChunkMetadataForNonBlob, needChunkMetadataForBlob));
         }
         timeseriesMetadataMap
             .computeIfAbsent(deviceId, k -> new ArrayList<>())
@@ -1349,7 +1397,8 @@ private void generateMetadataIndexUsingTsFileInput(
                 deviceId,
                 metadataIndexNode.getNodeType(),
                 timeseriesMetadataMap,
-                needChunkMetadata);
+                needChunkMetadataForNonBlob,
+                needChunkMetadataForBlob);
           }
         }
       } catch (StopReadTsFileByInterruptException e) {
@@ -1398,6 +1447,11 @@ public Map<IDeviceID, List<TimeseriesMetadata>> getAllTimeseriesMetadata(
     return timeseriesMetadataMap;
   }
 
+  public Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>> iterAllTimeseriesMetadata(
+      boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException {
+    return new TimeseriesMetadataIterator(needChunkMetadataForNonBlob, needChunkMetadataForBlob);
+  }
+
   /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */
   private List<TimeseriesMetadata> getDeviceTimeseriesMetadataWithoutChunkMetadata(IDeviceID device)
       throws IOException {
@@ -2771,4 +2825,132 @@ public boolean equals(Object o) {
   public int hashCode() {
     return Objects.hash(file);
   }
+
+  private class TimeseriesMetadataIterator
+      implements Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>> {
+
+    private final Deque<MetadataIndexNode> nodeStack = new ArrayDeque<>();
+    private final boolean needChunkMetadataForNonBlob;
+    private final boolean needChunkMetadataForBlob;
+    private Pair<IDeviceID, List<TimeseriesMetadata>> nextValue;
+    private MetadataIndexNode currentLeafDeviceNode;
+    private int currentLeafDeviceNodeIndex;
+
+    public TimeseriesMetadataIterator(
+        boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException {
+      this.needChunkMetadataForNonBlob = needChunkMetadataForNonBlob;
+      this.needChunkMetadataForBlob = needChunkMetadataForBlob;
+      if (tsFileMetaData == null) {
+        readFileMetadata();
+      }
+
+      nodeStack.add(tsFileMetaData.getMetadataIndex());
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (nextValue != null) {
+        return true;
+      }
+
+      try {
+        loadNextValue();
+      } catch (IOException e) {
+        logger.warn("Cannot read timeseries metadata from {}", file, e);
+        return false;
+      }
+      return nextValue != null;
+    }
+
+    @Override
+    public Pair<IDeviceID, List<TimeseriesMetadata>> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      Pair<IDeviceID, List<TimeseriesMetadata>> ret = nextValue;
+      nextValue = null;
+      return ret;
+    }
+
+    private void loadNextLeafDeviceNode() throws IOException {
+      while (!nodeStack.isEmpty()) {
+        MetadataIndexNode node = nodeStack.pop();
+        MetadataIndexNodeType nodeType = node.getNodeType();
+        if (nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+          currentLeafDeviceNode = node;
+          currentLeafDeviceNodeIndex = 0;
+          return;
+        }
+
+        List<IMetadataIndexEntry> childrenIndex = node.getChildren();
+        for (int i = 0; i < childrenIndex.size(); i++) {
+          long endOffset;
+          IMetadataIndexEntry childIndex = childrenIndex.get(i);
+          endOffset = node.getEndOffset();
+          if (i != childrenIndex.size() - 1) {
+            endOffset = childrenIndex.get(i + 1).getOffset();
+          }
+
+          MetadataIndexNode child;
+          if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) {
+            ByteBuffer buffer = readData(childIndex.getOffset(), endOffset);
+            child = MetadataIndexNode.deserializeFrom(buffer, true);
+          } else {
+            tsFileInput.position(childIndex.getOffset());
+            child = MetadataIndexNode.deserializeFrom(tsFileInput.wrapAsInputStream(), true);
+          }
+          nodeStack.push(child);
+        }
+      }
+    }
+
+    private void loadNextValue() throws IOException {
+      if (currentLeafDeviceNode == null
+          || currentLeafDeviceNodeIndex >= currentLeafDeviceNode.getChildren().size()) {
+        currentLeafDeviceNode = null;
+        loadNextLeafDeviceNode();
+      }
+      if (currentLeafDeviceNode == null) {
+        return;
+      }
+
+      IMetadataIndexEntry childIndex =
+          currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex);
+      int childNum = currentLeafDeviceNode.getChildren().size();
+      IDeviceID deviceId = ((DeviceMetadataIndexEntry) childIndex).getDeviceID();
+
+      Map<IDeviceID, List<TimeseriesMetadata>> nextValueMap = new HashMap<>(1);
+      long endOffset = currentLeafDeviceNode.getEndOffset();
+      if (currentLeafDeviceNodeIndex != childNum - 1) {
+        endOffset =
+            currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex + 1).getOffset();
+      }
+      if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) {
+        ByteBuffer nextBuffer = readData(childIndex.getOffset(), endOffset);
+        generateMetadataIndex(
+            childIndex,
+            nextBuffer,
+            deviceId,
+            currentLeafDeviceNode.getNodeType(),
+            nextValueMap,
+            needChunkMetadataForNonBlob,
+            needChunkMetadataForBlob);
+      } else {
+        // when the buffer length exceeds Integer.MAX_VALUE,
+        // use tsFileInput to get the timeseriesMetadataList
+        generateMetadataIndexUsingTsFileInput(
+            childIndex,
+            childIndex.getOffset(),
+            endOffset,
+            deviceId,
+            currentLeafDeviceNode.getNodeType(),
+            nextValueMap,
+            needChunkMetadataForNonBlob,
+            needChunkMetadataForBlob);
+      }
+      currentLeafDeviceNodeIndex++;
+      Entry<IDeviceID, List<TimeseriesMetadata>> entry = nextValueMap.entrySet().iterator().next();
+      nextValue = new Pair<>(entry.getKey(), entry.getValue());
+    }
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
index 5caab7fe2..fdeb4ec36 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java
@@ -171,4 +171,8 @@ public Statistics<? extends Serializable> getChunkStatistic() {
   public long getRetainedSizeInBytes() {
     return INSTANCE_SIZE + sizeOfByteArray(chunkData.capacity());
   }
+
+  public boolean isSinglePageChunk() {
+    return (getHeader().getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
new file mode 100644
index 000000000..f89b30df6
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.compress.IUnCompressor;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.page.ValuePageReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+/** Conveniently retrieve the last points of all timeseries in a TsFile. */
+public class TsFileLastReader
+    implements AutoCloseable, Iterator<Pair<IDeviceID, List<Pair<String, TimeValuePair>>>> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TsFileLastReader.class);
+
+  private final TsFileSequenceReader sequenceReader;
+  private boolean asyncIO = true;
+  // when true, blob series will return a null TimeValuePair
+  private boolean ignoreBlob = false;
+  private Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>> timeseriesMetadataIter;
+  private Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextValue;
+
+  private BlockingQueue<Pair<IDeviceID, List<Pair<String, TimeValuePair>>>> lastValueQueue;
+  private ForkJoinTask<Void> asyncTask;
+
+  public TsFileLastReader(String filePath) throws IOException {
+    sequenceReader = new TsFileSequenceReader(filePath);
+  }
+
+  /**
+   * @param filePath path of the TsFile
+   * @param asyncIO use asynchronous IO or not
+   * @param ignoreBlob whether to ignore series with blob type (the returned TimeValuePair will be
+   *     null)
+   */
+  public TsFileLastReader(String filePath, boolean asyncIO, boolean ignoreBlob) throws IOException {
+    this(filePath);
+    this.asyncIO = asyncIO;
+    this.ignoreBlob = ignoreBlob;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (timeseriesMetadataIter == null) {
+      try {
+        init();
+      } catch (IOException e) {
+        LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e);
+        return false;
+      }
+    }
+
+    // a null device id means we have already met the terminator
+    if (nextValue != null) {
+      return nextValue.getLeft() != null;
+    }
+
+    if (asyncIO) {
+      return hasNextAsync();
+    } else {
+      return hasNextSync();
+    }
+  }
+
+  private boolean hasNextSync() {
+    if (!timeseriesMetadataIter.hasNext()) {
+      nextValue = new Pair<>(null, null);
+    } else {
+      Pair<IDeviceID, List<TimeseriesMetadata>> next = timeseriesMetadataIter.next();
+      try {
+        nextValue = new Pair<>(next.left, convertToLastPoints(next.right));
+      } catch (IOException e) {
+        LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e);
+        return false;
+      }
+    }
+    return nextValue.left != null;
+  }
+
+  private boolean hasNextAsync() {
+    try {
+      nextValue = lastValueQueue.take();
+      if (nextValue.getLeft() == null) {
+        // the terminator
+        return false;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    return nextValue.left != null;
+  }
+
+  /**
+   * @return (deviceId, list of (measurementId, lastPoint))
+   */
+  @Override
+  public Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    Pair<IDeviceID, List<Pair<String, TimeValuePair>>> ret = nextValue;
+    nextValue = null;
+    return ret;
+  }
+
+  private List<Pair<String, TimeValuePair>> convertToLastPoints(
+      List<TimeseriesMetadata> timeseriesMetadataList) throws IOException {
+    boolean isAligned = timeseriesMetadataList.get(0).getTsDataType() == TSDataType.VECTOR;
+    List<Pair<String, TimeValuePair>> list = new ArrayList<>();
+    for (TimeseriesMetadata meta : timeseriesMetadataList) {
+      Pair<String, TimeValuePair> stringTimeValuePairPair = convertToLastPoint(meta, isAligned);
+      list.add(stringTimeValuePairPair);
+    }
+    return list;
+  }
+
+  private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws IOException {
+    ChunkReader chunkReader = new ChunkReader(chunk);
+    BatchData batchData = null;
+    while (chunkReader.hasNextSatisfiedPage()) {
+      batchData = chunkReader.nextPageData();
+    }
+    if (batchData != null) {
+      return batchData.getLastPairBeforeOrEqualTimestamp(Long.MAX_VALUE);
+    } else {
+      return null;
+    }
+  }
+
+  private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime)
+      throws IOException {
+    ByteBuffer chunkData = chunk.getData();
+    PageHeader lastPageHeader = null;
+    ByteBuffer lastPageData = null;
+    while (chunkData.hasRemaining()) {
+      PageHeader pageHeader;
+      if (chunk.isSinglePageChunk()) {
+        pageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics());
+      } else {
+        pageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB);
+      }
+      ByteBuffer pageData = chunkData.slice();
+      pageData.limit(pageData.position() + pageHeader.getCompressedSize());
+      chunkData.position(chunkData.position() + pageHeader.getCompressedSize());
+
+      if ((pageHeader.getStatistics() == null && pageHeader.getUncompressedSize() != 0)
+          || (pageHeader.getStatistics() != null && pageHeader.getStatistics().getCount() > 0)) {
+        lastPageHeader = pageHeader;
+        lastPageData = pageData;
+      }
+    }
+
+    if (lastPageHeader != null) {
+      CompressionType compressionType = chunk.getHeader().getCompressionType();
+      if (compressionType != CompressionType.UNCOMPRESSED) {
+        ByteBuffer uncompressedPage = ByteBuffer.allocate(lastPageHeader.getUncompressedSize());
+        IUnCompressor.getUnCompressor(compressionType).uncompress(lastPageData, uncompressedPage);
+        lastPageData = uncompressedPage;
+        lastPageData.flip();
+      }
+
+      ValuePageReader valuePageReader =
+          new ValuePageReader(
+              lastPageHeader,
+              lastPageData,
+              TSDataType.BLOB,
+              Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB));
+      TsPrimitiveType lastValue = null;
+      for (int i = 0; i < valuePageReader.getSize(); i++) {
+        // the timestamp here is not necessary
+        lastValue = valuePageReader.nextValue(0, i);
+      }
+      return new TimeValuePair(endTime, lastValue);
+    } else {
+      return null;
+    }
+  }
+
+  private Pair<String, TimeValuePair> convertToLastPoint(
+      TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException {
+    if (seriesMeta.getTsDataType() != TSDataType.BLOB) {
+      return new Pair<>(
+          seriesMeta.getMeasurementId(),
+          new TimeValuePair(
+              seriesMeta.getStatistics().getEndTime(),
+              seriesMeta.getTsDataType() == TSDataType.VECTOR
+                  ? TsPrimitiveType.getByType(
+                      TSDataType.INT64, seriesMeta.getStatistics().getEndTime())
+                  : TsPrimitiveType.getByType(
+                      seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue())));
+    } else {
+      return readLastPoint(seriesMeta, isAligned);
+    }
+  }
+
+  private Pair<String, TimeValuePair> readLastPoint(
+      TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException {
+    if (seriesMeta.getChunkMetadataList() == null) {
+      return new Pair<>(seriesMeta.getMeasurementId(), null);
+    }
+
+    ChunkMetadata lastNonEmptyChunkMetadata = null;
+    for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) {
+      ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i);
+      if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) {
+        // the chunk of a single chunk series must not be empty
+        lastNonEmptyChunkMetadata = chunkMetadata;
+        break;
+      }
+    }
+
+    if (lastNonEmptyChunkMetadata == null) {
+      return new Pair<>(seriesMeta.getMeasurementId(), null);
+    }
+
+    Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata);
+
+    if (!isAligned) {
+      return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk));
+    } else {
+      return new Pair<>(
+          seriesMeta.getMeasurementId(),
+          readAlignedLastPoint(
+              chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime()));
+    }
+  }
+
+  private void init() throws IOException {
+    timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false, !ignoreBlob);
+    if (asyncIO) {
+      int queueCapacity = 1024;
+      lastValueQueue = new ArrayBlockingQueue<>(queueCapacity);
+      asyncTask =
+          ForkJoinPool.commonPool()
+              .submit(
+                  () -> {
+                    try {
+                      while (timeseriesMetadataIter.hasNext()) {
+                        Pair<IDeviceID, List<TimeseriesMetadata>> deviceSeriesMetadata =
+                            timeseriesMetadataIter.next();
+                        lastValueQueue.put(
+                            new Pair<>(
+                                deviceSeriesMetadata.left,
+                                convertToLastPoints(deviceSeriesMetadata.right)));
+                      }
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                    } catch (Exception e) {
+                      LOGGER.error("Error while reading timeseries metadata", e);
+                    } finally {
+                      try {
+                        lastValueQueue.put(new Pair<>(null, null));
+                      } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                      }
+                    }
+                    return null;
+                  });
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (asyncIO) {
+      asyncTask.cancel(true);
+    }
+    sequenceReader.close();
+  }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
index 58e9ff51d..54e3e85f5 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java
@@ -611,4 +611,8 @@ public byte[] getBitmap() throws IOException {
     uncompressDataIfNecessary();
     return Arrays.copyOf(bitmap, bitmap.length);
   }
+
+  public int getSize() {
+    return size;
+  }
 }
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
new file mode 100644
index 000000000..b9211a56c
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.PlainDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"})
+public class TsFileLastReaderTest {
+
+  private static final List<TSDataType> dataTypes =
+      Arrays.asList(TSDataType.INT64, TSDataType.BLOB);
+
+  private final String filePath = "target/test.tsfile";
+  private final File file = new File(filePath);
+
+  private void createFile(int deviceNum, int measurementNum, int seriesPointNum)
+      throws IOException, WriteProcessException {
+    try (TsFileWriter writer = new TsFileWriter(file)) {
+      List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+      for (int j = 0; j < measurementNum; j++) {
+        TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+        measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType));
+      }
+      for (int i = 0; i < deviceNum; i++) {
+        writer.registerAlignedTimeseries(new Path("device" + i), measurementSchemaList);
+      }
+
+      for (int i = 0; i < deviceNum; i++) {
+        Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum);
+        for (int k = 0; k < seriesPointNum; k++) {
+          tablet.addTimestamp(k, k);
+        }
+        tablet.rowSize = seriesPointNum;
+        for (int j = 0; j < measurementNum; j++) {
+          TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+          for (int k = 0; k < seriesPointNum; k++) {
+            switch (tsDataType) {
+              case INT64:
+                tablet.addValue("s" + j, k, (long) k);
+                break;
+              case BLOB:
+                tablet.addValue(
+                    "s" + j, k, new Binary(Long.toBinaryString(k), StandardCharsets.UTF_8));
+                break;
+            }
+          }
+        }
+        writer.writeAligned(tablet);
+      }
+    }
+  }
+
+  // the second half of the measurements will each have an empty last chunk
+  private void createFileWithLastEmptyChunks(int deviceNum, int measurementNum, int seriesPointNum)
+      throws IOException, WriteProcessException {
+    try (TsFileWriter writer = new TsFileWriter(file)) {
+      List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+      for (int j = 0; j < measurementNum; j++) {
+        TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+        measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType));
+      }
+      for (int i = 0; i < deviceNum; i++) {
+        writer.registerAlignedTimeseries(new Path("device" + i), measurementSchemaList);
+      }
+
+      // the first half seriesPointNum points are not null for all series
+      int batchPointNum = seriesPointNum / 2;
+      for (int i = 0; i < deviceNum; i++) {
+        Tablet tablet = new Tablet("device" + i, measurementSchemaList, batchPointNum);
+        for (int k = 0; k < batchPointNum; k++) {
+          tablet.addTimestamp(k, k);
+        }
+        tablet.rowSize = batchPointNum;
+        for (int j = 0; j < measurementNum; j++) {
+          TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+          for (int k = 0; k < batchPointNum; k++) {
+            switch (tsDataType) {
+              case INT64:
+                tablet.addValue("s" + j, k, (long) k);
+                break;
+              case BLOB:
+                tablet.addValue(
+                    "s" + j, k, new Binary(Long.toBinaryString(k), StandardCharsets.UTF_8));
+                break;
+            }
+          }
+        }
+        writer.writeAligned(tablet);
+      }
+      writer.flushAllChunkGroups();
+
+      // the second half series have no value for the remaining points
+      batchPointNum = seriesPointNum - batchPointNum;
+      for (int i = 0; i < deviceNum; i++) {
+        Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum);
+        for (int k = 0; k < batchPointNum; k++) {
+          tablet.addTimestamp(k, k + seriesPointNum / 2);
+        }
+        tablet.rowSize = batchPointNum;
+        for (int j = 0; j < measurementNum / 2; j++) {
+          TSDataType tsDataType = dataTypes.get(j % dataTypes.size());
+          for (int k = 0; k < seriesPointNum; k++) {
+            switch (tsDataType) {
+              case INT64:
+                tablet.addValue("s" + j, k, (long) k + seriesPointNum / 2);
+                break;
+              case BLOB:
+                tablet.addValue(
+                    "s" + j,
+                    k,
+                    new Binary(
+                        Long.toBinaryString(k + seriesPointNum / 2)
+                            .getBytes(StandardCharsets.UTF_8)));
+                break;
+            }
+          }
+        }
+        for (int j = measurementNum / 2; j < measurementNum; j++) {
+          for (int k = 0; k < seriesPointNum; k++) {
+            tablet.addValue("s" + j, k, null);
+          }
+        }
+        writer.writeAligned(tablet);
+      }
+    }
+  }
+
+  private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPointNum)
+      throws Exception {
+    long startTime = System.currentTimeMillis();
+    Set<IDeviceID> devices = new HashSet<>();
+    try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, false)) {
+      while (lastReader.hasNext()) {
+        Set<String> measurements = new HashSet<>();
+        Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next();
+        assertFalse(devices.contains(next.left));
+        devices.add(next.left);
+
+        // time column included
+        assertEquals(measurementNum + 1, next.getRight().size());
+        next.right.forEach(
+            pair -> {
+              measurements.add(pair.getLeft());
+              // the time column is regarded as the first half
+              int measurementIndex =
+                  pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1));
+
+              if (measurementIndex < measurementNum / 2) {
+                assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp());
+                TsPrimitiveType value = pair.getRight().getValue();
+                if (value.getDataType() == TSDataType.INT64) {
+                  assertEquals(seriesPointNum - 1, value.getLong());
+                } else {
+                  assertEquals(
+                      new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8),
+                      value.getBinary());
+                }
+              } else {
+                assertEquals(seriesPointNum / 2 - 1, pair.getRight().getTimestamp());
+                TsPrimitiveType value = pair.getRight().getValue();
+                if (value.getDataType() == TSDataType.INT64) {
+                  assertEquals(seriesPointNum / 2 - 1, value.getLong());
+                } else {
+                  assertEquals(
+                      new Binary(
+                          Long.toBinaryString(seriesPointNum / 2 - 1), StandardCharsets.UTF_8),
+                      value.getBinary());
+                }
+              }
+            });
+        assertEquals(measurementNum + 1, measurements.size());
+      }
+    }
+    assertEquals(deviceNum, devices.size());
+    System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime);
+  }
+
+  private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum, boolean ignoreBlob)
+      throws Exception {
+    long startTime = System.currentTimeMillis();
+    Set<IDeviceID> devices = new HashSet<>();
+    try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, ignoreBlob)) {
+      while (lastReader.hasNext()) {
+        Set<String> measurements = new HashSet<>();
+        Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next();
+        assertFalse(devices.contains(next.left));
+        devices.add(next.left);
+
+        // time column included
+        assertEquals(measurementNum + 1, next.getRight().size());
+        next.right.forEach(
+            pair -> {
+              measurements.add(pair.getLeft());
+              // the time column has an empty measurement id
+              int measurementIndex =
+                  pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1));
+              TSDataType tsDataType =
+                  measurementIndex == -1
+                      ? TSDataType.INT64
+                      : dataTypes.get(measurementIndex % dataTypes.size());
+
+              if (tsDataType == TSDataType.BLOB && ignoreBlob) {
+                assertNull(pair.getRight());
+                return;
+              }
+
+              if (pair.getRight() == null) {
+                assertTrue(ignoreBlob);
+              } else {
+                assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp());
+                TsPrimitiveType value = pair.getRight().getValue();
+                if (value.getDataType() == TSDataType.INT64) {
+                  assertEquals(seriesPointNum - 1, value.getLong());
+                } else {
+                  assertEquals(
+                      new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8),
+                      value.getBinary());
+                }
+              }
+            });
+        assertEquals(measurementNum + 1, measurements.size());
+      }
+    }
+    assertEquals(deviceNum, devices.size());
+    System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime);
+  }
+
+  private void testReadLast(int deviceNum, int measurementNum, int seriesPointNum)
+      throws Exception {
+    createFile(deviceNum, measurementNum, seriesPointNum);
+    doReadLast(deviceNum, measurementNum, seriesPointNum, false);
+    file.delete();
+  }
+
+  @Test
+  public void testSmall() throws Exception {
+    testReadLast(10, 10, 10);
+  }
+
+  @Test
+  public void testManyDevices() throws Exception {
+    testReadLast(10000, 10, 10);
+  }
+
+  @Test
+  public void testManyMeasurement() throws Exception {
+    testReadLast(10, 10000, 10);
+  }
+
+  @Test
+  public void testManyPoints() throws Exception {
+    testReadLast(100, 10, 10000);
+  }
+
+  @Test
+  public void testManyMany() throws Exception {
+    testReadLast(100, 100, 100);
+  }
+
+  @Test
+  public void testLastEmptyChunks() throws Exception {
+    createFileWithLastEmptyChunks(100, 100, 100);
+    doReadLastWithEmpty(100, 100, 100);
+  }
+
+  @Test
+  public void testLastEmptyPage() throws Exception {
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(file)) {
+      ioWriter.startChunkGroup(new PlainDeviceID("root.db1.d1"));
+      List<IMeasurementSchema> measurementSchemaList =
+          Arrays.asList(
+              new MeasurementSchema("s1", TSDataType.INT64),
+              new MeasurementSchema("s2", TSDataType.BLOB));
+      AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemaList);
+      alignedChunkWriter.write(
+          0,
+          new TsPrimitiveType[] {
+            TsPrimitiveType.getByType(TSDataType.INT64, 0L),
+            TsPrimitiveType.getByType(
+                TSDataType.BLOB, new Binary("0".getBytes(StandardCharsets.UTF_8)))
+          });
+      alignedChunkWriter.sealCurrentPage();
+      alignedChunkWriter.write(
+          1, new TsPrimitiveType[] {TsPrimitiveType.getByType(TSDataType.INT64, 1L), null});
+      alignedChunkWriter.writeToFileWriter(ioWriter);
+      ioWriter.endChunkGroup();
+
+      ioWriter.endFile();
+    }
+
+    try (TsFileLastReader lastReader = new TsFileLastReader(filePath)) {
+      Pair<IDeviceID, List<Pair<String, TimeValuePair>>> next = lastReader.next();
+      assertEquals(new PlainDeviceID("root.db1.d1"), next.getLeft());
+      assertEquals(3, next.getRight().size());
+      assertEquals("s1", next.getRight().get(1).left);
+      assertEquals("s2", next.getRight().get(2).left);
+      assertEquals(1, next.getRight().get(1).right.getTimestamp());
+      assertEquals(1, next.getRight().get(1).right.getValue().getLong());
+      assertEquals(0, next.getRight().get(2).right.getTimestamp());
+      assertEquals("0", next.getRight().get(2).right.getValue().getStringValue());
+    }
+  }
+
+  @Test
+  public void testIgnoreBlob() throws Exception {
+    createFile(10, 10, 10);
+    doReadLast(10, 10, 10, true);
+    file.delete();
+  }
+
+  @Ignore("Performance")
+  @Test
+  public void testManyRead() throws Exception {
+    int deviceNum = 10000;
+    int measurementNum = 1000;
+    int seriesPointNum = 1;
+    createFile(deviceNum, measurementNum, seriesPointNum);
+    for (int i = 0; i < 10; i++) {
+      doReadLast(deviceNum, measurementNum, seriesPointNum, false);
+    }
+    file.delete();
+  }
+}
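
A minimal usage sketch for the new TsFileLastReader (illustrative, not part of the patch; "target/test.tsfile" is a placeholder path). It drains the last point of every series through the Iterator contract added above:

    // iterate devices; each entry is (deviceId, [(measurementId, last TimeValuePair), ...])
    try (TsFileLastReader lastReader = new TsFileLastReader("target/test.tsfile", true, false)) {
      while (lastReader.hasNext()) {
        Pair<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLast = lastReader.next();
        for (Pair<String, TimeValuePair> series : deviceLast.getRight()) {
          System.out.println(
              deviceLast.getLeft() + "." + series.getLeft() + " = " + series.getRight());
        }
      }
    }

Design note: for non-BLOB series the last point is answered from TimeseriesMetadata statistics alone; only BLOB series, whose last value is not kept in statistics, require deserializing chunk metadata and reading the last non-empty chunk. Passing ignoreBlob=true as the third constructor argument skips that extra IO and returns a null TimeValuePair for BLOB series.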