diff --git a/cpp/pom.xml b/cpp/pom.xml index d39c67c0d..cc62aa8c5 100644 --- a/cpp/pom.xml +++ b/cpp/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-cpp pom diff --git a/java/common/pom.xml b/java/common/pom.xml index 0eb4066d3..bcd54f5e1 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT common TsFile: Java: Common diff --git a/java/examples/pom.xml b/java/examples/pom.xml index 9223f5960..5a484cfc1 100644 --- a/java/examples/pom.xml +++ b/java/examples/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT examples TsFile: Java: Examples @@ -36,7 +36,7 @@ org.apache.tsfile tsfile - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT diff --git a/java/pom.xml b/java/pom.xml index 1d99dbba4..df7cec5a8 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -24,10 +24,10 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT pom TsFile: Java diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 0bc2c89a8..8cc58d1ad 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tools TsFile: Java: Tools @@ -32,7 +32,7 @@ org.apache.tsfile common - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT commons-cli @@ -50,7 +50,7 @@ org.apache.tsfile tsfile - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT ch.qos.logback diff --git a/java/tsfile/pom.xml b/java/tsfile/pom.xml index 5b22ea2ea..8625f0cfd 100644 --- a/java/tsfile/pom.xml +++ b/java/tsfile/pom.xml @@ -24,7 +24,7 @@ org.apache.tsfile tsfile-java - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile TsFile: Java: TsFile @@ -38,7 +38,7 @@ org.apache.tsfile common - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT com.github.luben diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java index 657f183c2..25b1a1276 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/TimeseriesMetadata.java @@ -121,6 +121,11 @@ public TimeseriesMetadata(TimeseriesMetadata timeseriesMetadata) { } public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean needChunkMetadata) { + return deserializeFrom(buffer, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom( + ByteBuffer buffer, boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) { TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(buffer)); timeseriesMetaData.setMeasurementId(ReadWriteIOUtils.readVarIntString(buffer)); @@ -128,7 +133,8 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need int chunkMetaDataListDataSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(Statistics.deserialize(buffer, timeseriesMetaData.dataType)); - if (needChunkMetadata) { + if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); timeseriesMetaData.chunkMetadataList = new ArrayList<>(); @@ -145,6 +151,14 @@ public static TimeseriesMetadata deserializeFrom(ByteBuffer buffer, boolean need public static TimeseriesMetadata deserializeFrom( TsFileInput tsFileInput, boolean needChunkMetadata) throws IOException { + return deserializeFrom(tsFileInput, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom( + TsFileInput tsFileInput, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) + throws IOException { InputStream inputStream = tsFileInput.wrapAsInputStream(); TimeseriesMetadata timeseriesMetaData = new TimeseriesMetadata(); timeseriesMetaData.setTimeSeriesMetadataType(ReadWriteIOUtils.readByte(inputStream)); @@ -155,7 +169,8 @@ public static TimeseriesMetadata deserializeFrom( timeseriesMetaData.setStatistics( Statistics.deserialize(inputStream, timeseriesMetaData.dataType)); long startOffset = tsFileInput.position(); - if (needChunkMetadata) { + if ((timeseriesMetaData.getTsDataType() != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (timeseriesMetaData.getTsDataType() == TSDataType.BLOB && needChunkMetadataForBlob)) { timeseriesMetaData.chunkMetadataList = new ArrayList<>(); while (tsFileInput.position() < startOffset + chunkMetaDataListDataSize) { timeseriesMetaData.chunkMetadataList.add( @@ -175,6 +190,14 @@ public static TimeseriesMetadata deserializeFrom( */ public static TimeseriesMetadata deserializeFrom( ByteBuffer buffer, Set excludedMeasurements, boolean needChunkMetadata) { + return deserializeFrom(buffer, excludedMeasurements, needChunkMetadata, needChunkMetadata); + } + + public static TimeseriesMetadata deserializeFrom( + ByteBuffer buffer, + Set excludedMeasurements, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) { byte timeseriesType = ReadWriteIOUtils.readByte(buffer); String measurementID = ReadWriteIOUtils.readVarIntString(buffer); TSDataType tsDataType = ReadWriteIOUtils.readDataType(buffer); @@ -188,7 +211,9 @@ public static TimeseriesMetadata deserializeFrom( timeseriesMetaData.setDataSizeOfChunkMetaDataList(chunkMetaDataListDataSize); timeseriesMetaData.setStatistics(statistics); - if (!excludedMeasurements.contains(measurementID) && needChunkMetadata) { + if (!excludedMeasurements.contains(measurementID) + && ((tsDataType != TSDataType.BLOB && needChunkMetadataForNonBlob) + || (tsDataType == TSDataType.BLOB && needChunkMetadataForBlob))) { // measurement is not in the excluded set and need chunk metadata ByteBuffer byteBuffer = buffer.slice(); byteBuffer.limit(chunkMetaDataListDataSize); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java index db12ac4bc..cb8af6eab 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/statistics/TimeStatistics.java @@ -76,22 +76,22 @@ public void update(long[] time, int batchSize, int arrayOffset) { @Override public Long getMinValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "min value")); + return getStartTime(); } @Override public Long getMaxValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "max value")); + return getEndTime(); } @Override public Long getFirstValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "first value")); + return getStartTime(); } @Override public Long getLastValue() { - throw new StatisticsClassException(String.format(STATS_UNSUPPORTED_MSG, TIME, "last value")); + return getEndTime(); } @Override diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index dc07bfc2e..033e3d044 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -87,9 +87,11 @@ import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -1342,11 +1344,32 @@ private void generateMetadataIndex( Map> timeseriesMetadataMap, boolean needChunkMetadata) throws IOException { + generateMetadataIndex( + metadataIndex, + buffer, + deviceId, + type, + timeseriesMetadataMap, + needChunkMetadata, + needChunkMetadata); + } + + private void generateMetadataIndex( + IMetadataIndexEntry metadataIndex, + ByteBuffer buffer, + IDeviceID deviceId, + MetadataIndexNodeType type, + Map> timeseriesMetadataMap, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) + throws IOException { try { if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { List timeseriesMetadataList = new ArrayList<>(); while (buffer.hasRemaining()) { - timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata)); + timeseriesMetadataList.add( + TimeseriesMetadata.deserializeFrom( + buffer, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -1375,7 +1398,8 @@ private void generateMetadataIndex( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } else { // when the buffer length is over than Integer.MAX_VALUE, // using tsFileInput to get timeseriesMetadataList @@ -1386,7 +1410,8 @@ private void generateMetadataIndex( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } } } @@ -1407,13 +1432,35 @@ private void generateMetadataIndexUsingTsFileInput( Map> timeseriesMetadataMap, boolean needChunkMetadata) throws IOException { + generateMetadataIndexUsingTsFileInput( + metadataIndex, + start, + end, + deviceId, + type, + timeseriesMetadataMap, + needChunkMetadata, + needChunkMetadata); + } + + private void generateMetadataIndexUsingTsFileInput( + IMetadataIndexEntry metadataIndex, + long start, + long end, + IDeviceID deviceId, + MetadataIndexNodeType type, + Map> timeseriesMetadataMap, + boolean needChunkMetadataForNonBlob, + boolean needChunkMetadataForBlob) + throws IOException { try { tsFileInput.position(start); if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { List timeseriesMetadataList = new ArrayList<>(); while (tsFileInput.position() < end) { timeseriesMetadataList.add( - TimeseriesMetadata.deserializeFrom(tsFileInput, needChunkMetadata)); + TimeseriesMetadata.deserializeFrom( + tsFileInput, needChunkMetadataForNonBlob, needChunkMetadataForBlob)); } timeseriesMetadataMap .computeIfAbsent(deviceId, k -> new ArrayList<>()) @@ -1440,7 +1487,8 @@ private void generateMetadataIndexUsingTsFileInput( deviceId, metadataIndexNode.getNodeType(), timeseriesMetadataMap, - needChunkMetadata); + needChunkMetadataForNonBlob, + needChunkMetadataForBlob); } } } catch (StopReadTsFileByInterruptException e) { @@ -1492,6 +1540,11 @@ public Map> getAllTimeseriesMetadata( return timeseriesMetadataMap; } + public Iterator>> iterAllTimeseriesMetadata( + boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + return new TimeseriesMetadataIterator(needChunkMetadataForNonBlob, needChunkMetadataForBlob); + } + /* This method will only deserialize the TimeseriesMetadata, not including chunk metadata list */ public List getDeviceTimeseriesMetadataWithoutChunkMetadata(IDeviceID device) throws IOException { @@ -2973,4 +3026,134 @@ public int hashCode() { public DeserializeConfig getDeserializeContext() { return deserializeConfig; } + + private class TimeseriesMetadataIterator + implements Iterator>> { + + private final Deque nodeStack = new ArrayDeque<>(); + private final boolean needChunkMetadataForNonBlob; + private final boolean needCHunkMetadataForBlob; + private Pair> nextValue; + private MetadataIndexNode currentLeafDeviceNode; + private int currentLeafDeviceNodeIndex; + + public TimeseriesMetadataIterator( + boolean needChunkMetadataForNonBlob, boolean needChunkMetadataForBlob) throws IOException { + this.needChunkMetadataForNonBlob = needChunkMetadataForNonBlob; + this.needCHunkMetadataForBlob = needChunkMetadataForBlob; + if (tsFileMetaData == null) { + readFileMetadata(); + } + + nodeStack.addAll(tsFileMetaData.getTableMetadataIndexNodeMap().values()); + } + + @Override + public boolean hasNext() { + if (nextValue != null) { + return true; + } + + try { + loadNextValue(); + } catch (IOException e) { + logger.warn("Cannot read timeseries metadata from {},", file, e); + return false; + } + return nextValue != null; + } + + @Override + public Pair> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Pair> ret = nextValue; + nextValue = null; + return ret; + } + + private void loadNextLeafDeviceNode() throws IOException { + while (!nodeStack.isEmpty()) { + MetadataIndexNode node = nodeStack.pop(); + MetadataIndexNodeType nodeType = node.getNodeType(); + if (nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE)) { + currentLeafDeviceNode = node; + currentLeafDeviceNodeIndex = 0; + return; + } + + List childrenIndex = node.getChildren(); + for (int i = 0; i < childrenIndex.size(); i++) { + long endOffset; + IMetadataIndexEntry childIndex = childrenIndex.get(i); + endOffset = node.getEndOffset(); + if (i != childrenIndex.size() - 1) { + endOffset = childrenIndex.get(i + 1).getOffset(); + } + + MetadataIndexNode child; + if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) { + ByteBuffer buffer = readData(childIndex.getOffset(), endOffset); + child = deserializeConfig.deserializeMetadataIndexNode(buffer, true); + } else { + tsFileInput.position(childIndex.getOffset()); + child = + deserializeConfig.deserializeMetadataIndexNode( + tsFileInput.wrapAsInputStream(), true); + } + nodeStack.push(child); + } + } + } + + private void loadNextValue() throws IOException { + if (currentLeafDeviceNode == null + || currentLeafDeviceNodeIndex >= currentLeafDeviceNode.getChildren().size()) { + currentLeafDeviceNode = null; + loadNextLeafDeviceNode(); + } + if (currentLeafDeviceNode == null) { + return; + } + + IMetadataIndexEntry childIndex = + currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex); + int childNum = currentLeafDeviceNode.getChildren().size(); + IDeviceID deviceId = ((DeviceMetadataIndexEntry) childIndex).getDeviceID(); + + Map> nextValueMap = new HashMap<>(1); + long endOffset = currentLeafDeviceNode.getEndOffset(); + if (currentLeafDeviceNodeIndex != childNum - 1) { + endOffset = + currentLeafDeviceNode.getChildren().get(currentLeafDeviceNodeIndex + 1).getOffset(); + } + if (endOffset - childIndex.getOffset() < Integer.MAX_VALUE) { + ByteBuffer nextBuffer = readData(childIndex.getOffset(), endOffset); + generateMetadataIndex( + childIndex, + nextBuffer, + deviceId, + currentLeafDeviceNode.getNodeType(), + nextValueMap, + needChunkMetadataForNonBlob, + needCHunkMetadataForBlob); + } else { + // when the buffer length is over than Integer.MAX_VALUE, + // using tsFileInput to get timeseriesMetadataList + generateMetadataIndexUsingTsFileInput( + childIndex, + childIndex.getOffset(), + endOffset, + deviceId, + currentLeafDeviceNode.getNodeType(), + nextValueMap, + needChunkMetadataForNonBlob, + needCHunkMetadataForBlob); + } + currentLeafDeviceNodeIndex++; + Entry> entry = nextValueMap.entrySet().iterator().next(); + nextValue = new Pair<>(entry.getKey(), entry.getValue()); + } + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java index dd7791971..a7e2e9eaf 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/common/Chunk.java @@ -383,4 +383,8 @@ public Chunk rewrite(TSDataType newType) throws IOException { chunkWriter.getStatistics(), encryptParam); } + + public boolean isSinglePageChunk() { + return (getHeader().getChunkType() & 0x3F) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java new file mode 100644 index 000000000..f89b30df6 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/TsFileLastReader.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader; + +import org.apache.tsfile.compress.IUnCompressor; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.reader.chunk.ChunkReader; +import org.apache.tsfile.read.reader.page.ValuePageReader; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +/** Conveniently retrieve last points of all timeseries from a TsFile. */ +public class TsFileLastReader + implements AutoCloseable, Iterator>>> { + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileLastReader.class); + + private final TsFileSequenceReader sequenceReader; + private boolean asyncIO = true; + // when true, blob series will return a null TimeValuePair + private boolean ignoreBlob = false; + private Iterator>> timeseriesMetadataIter; + private Pair>> nextValue; + + private BlockingQueue>>> lastValueQueue; + private ForkJoinTask asyncTask; + + public TsFileLastReader(String filePath) throws IOException { + sequenceReader = new TsFileSequenceReader(filePath); + } + + /** + * @param filePath path of the TsFile + * @param asyncIO use asynchronous IO or not + * @param ignoreBlob whether to ignore series with blob type (the returned TimeValuePair will be + * null) + */ + public TsFileLastReader(String filePath, boolean asyncIO, boolean ignoreBlob) throws IOException { + this(filePath); + this.asyncIO = asyncIO; + this.ignoreBlob = ignoreBlob; + } + + @Override + public boolean hasNext() { + if (timeseriesMetadataIter == null) { + try { + init(); + } catch (IOException e) { + LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e); + return false; + } + } + + // already meet the terminator + if (nextValue != null) { + return nextValue.getLeft() != null; + } + + if (asyncIO) { + return hasNextAsync(); + } else { + return hasNextSync(); + } + } + + private boolean hasNextSync() { + if (!timeseriesMetadataIter.hasNext()) { + nextValue = new Pair<>(null, null); + } else { + Pair> next = timeseriesMetadataIter.next(); + try { + nextValue = new Pair<>(next.left, convertToLastPoints(next.right)); + } catch (IOException e) { + LOGGER.error("Cannot read timeseries metadata from {}", sequenceReader.getFileName(), e); + return false; + } + } + return nextValue.left != null; + } + + private boolean hasNextAsync() { + try { + nextValue = lastValueQueue.take(); + if (nextValue.getLeft() == null) { + // the terminator + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + return nextValue.left != null; + } + + /** + * @return (deviceId, measurementId, lastPoint) + */ + @Override + public Pair>> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Pair>> ret = nextValue; + nextValue = null; + return ret; + } + + private List> convertToLastPoints( + List timeseriesMetadataList) throws IOException { + boolean isAligned = timeseriesMetadataList.get(0).getTsDataType() == TSDataType.VECTOR; + List> list = new ArrayList<>(); + for (TimeseriesMetadata meta : timeseriesMetadataList) { + Pair stringTimeValuePairPair = convertToLastPoint(meta, isAligned); + list.add(stringTimeValuePairPair); + } + return list; + } + + private TimeValuePair readNonAlignedLastPoint(Chunk chunk) throws IOException { + ChunkReader chunkReader = new ChunkReader(chunk); + BatchData batchData = null; + while (chunkReader.hasNextSatisfiedPage()) { + batchData = chunkReader.nextPageData(); + } + if (batchData != null) { + return batchData.getLastPairBeforeOrEqualTimestamp(Long.MAX_VALUE); + } else { + return null; + } + } + + private TimeValuePair readAlignedLastPoint(Chunk chunk, ChunkMetadata chunkMetadata, long endTime) + throws IOException { + ByteBuffer chunkData = chunk.getData(); + PageHeader lastPageHeader = null; + ByteBuffer lastPageData = null; + while (chunkData.hasRemaining()) { + PageHeader pageHeader; + if (chunk.isSinglePageChunk()) { + pageHeader = PageHeader.deserializeFrom(chunkData, chunkMetadata.getStatistics()); + } else { + pageHeader = PageHeader.deserializeFrom(chunkData, TSDataType.BLOB); + } + ByteBuffer pageData = chunkData.slice(); + pageData.limit(pageData.position() + pageHeader.getCompressedSize()); + chunkData.position(chunkData.position() + pageHeader.getCompressedSize()); + + if ((pageHeader.getStatistics() == null && pageHeader.getUncompressedSize() != 0) + || (pageHeader.getStatistics() != null && pageHeader.getStatistics().getCount() > 0)) { + lastPageHeader = pageHeader; + lastPageData = pageData; + } + } + + if (lastPageHeader != null) { + CompressionType compressionType = chunk.getHeader().getCompressionType(); + if (compressionType != CompressionType.UNCOMPRESSED) { + ByteBuffer uncompressedPage = ByteBuffer.allocate(lastPageHeader.getUncompressedSize()); + IUnCompressor.getUnCompressor(compressionType).uncompress(lastPageData, uncompressedPage); + lastPageData = uncompressedPage; + lastPageData.flip(); + } + + ValuePageReader valuePageReader = + new ValuePageReader( + lastPageHeader, + lastPageData, + TSDataType.BLOB, + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.BLOB)); + TsPrimitiveType lastValue = null; + for (int i = 0; i < valuePageReader.getSize(); i++) { + // the timestamp here is not necessary + lastValue = valuePageReader.nextValue(0, i); + } + return new TimeValuePair(endTime, lastValue); + } else { + return null; + } + } + + private Pair convertToLastPoint( + TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException { + if (seriesMeta.getTsDataType() != TSDataType.BLOB) { + return new Pair<>( + seriesMeta.getMeasurementId(), + new TimeValuePair( + seriesMeta.getStatistics().getEndTime(), + seriesMeta.getTsDataType() == TSDataType.VECTOR + ? TsPrimitiveType.getByType( + TSDataType.INT64, seriesMeta.getStatistics().getEndTime()) + : TsPrimitiveType.getByType( + seriesMeta.getTsDataType(), seriesMeta.getStatistics().getLastValue()))); + } else { + return readLastPoint(seriesMeta, isAligned); + } + } + + private Pair readLastPoint( + TimeseriesMetadata seriesMeta, boolean isAligned) throws IOException { + if (seriesMeta.getChunkMetadataList() == null) { + return new Pair<>(seriesMeta.getMeasurementId(), null); + } + + ChunkMetadata lastNonEmptyChunkMetadata = null; + for (int i = seriesMeta.getChunkMetadataList().size() - 1; i >= 0; i--) { + ChunkMetadata chunkMetadata = (ChunkMetadata) seriesMeta.getChunkMetadataList().get(i); + if (chunkMetadata.getStatistics() == null || chunkMetadata.getStatistics().getCount() > 0) { + // the chunk of a single chunk series must not be empty + lastNonEmptyChunkMetadata = chunkMetadata; + break; + } + } + + if (lastNonEmptyChunkMetadata == null) { + return new Pair<>(seriesMeta.getMeasurementId(), null); + } + + Chunk chunk = sequenceReader.readMemChunk(lastNonEmptyChunkMetadata); + + if (!isAligned) { + return new Pair<>(seriesMeta.getMeasurementId(), readNonAlignedLastPoint(chunk)); + } else { + return new Pair<>( + seriesMeta.getMeasurementId(), + readAlignedLastPoint( + chunk, lastNonEmptyChunkMetadata, seriesMeta.getStatistics().getEndTime())); + } + } + + private void init() throws IOException { + timeseriesMetadataIter = sequenceReader.iterAllTimeseriesMetadata(false, !ignoreBlob); + if (asyncIO) { + int queueCapacity = 1024; + lastValueQueue = new ArrayBlockingQueue<>(queueCapacity); + asyncTask = + ForkJoinPool.commonPool() + .submit( + () -> { + try { + while (timeseriesMetadataIter.hasNext()) { + Pair> deviceSeriesMetadata = + timeseriesMetadataIter.next(); + lastValueQueue.put( + new Pair<>( + deviceSeriesMetadata.left, + convertToLastPoints(deviceSeriesMetadata.right))); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER.error("Error while reading timeseries metadata", e); + } finally { + try { + lastValueQueue.put(new Pair<>(null, null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return null; + }); + } + } + + @Override + public void close() throws Exception { + if (asyncIO) { + asyncTask.cancel(true); + } + sequenceReader.close(); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java index 317b493c5..fc9dd3755 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java @@ -619,4 +619,8 @@ public byte[] getBitmap() throws IOException { uncompressDataIfNecessary(); return Arrays.copyOf(bitmap, bitmap.length); } + + public int getSize() { + return size; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java b/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java index cbd9e7014..844dcfd37 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/WriteUtils.java @@ -61,4 +61,9 @@ public static int compareStrings(String a, String b) { } return a.compareTo(b); } + + @FunctionalInterface + public interface TabletAddValueFunction { + void addValue(Tablet tablet, int row, int column); + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java index f64dc059d..d84cd0577 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java @@ -412,12 +412,18 @@ public void addValue(int rowIndex, String measurement, int val) { @TsFileApi public void addValue(int rowIndex, int columnIndex, int val) { - if (!(values[columnIndex] instanceof int[])) { + if (!(values[columnIndex] instanceof int[]) && !(values[columnIndex] instanceof long[])) { throw new IllegalArgumentException( - "The data type of column index " + columnIndex + " is not INT32"); + "The data type of column index " + columnIndex + " is not INT32 or INT64"); } - final int[] sensor = (int[]) values[columnIndex]; - sensor[rowIndex] = val; + if (values[columnIndex] instanceof int[]) { + final int[] sensor = (int[]) values[columnIndex]; + sensor[rowIndex] = val; + } else if (values[columnIndex] instanceof long[]) { + final long[] sensor = (long[]) values[columnIndex]; + sensor[rowIndex] = val; + } + updateBitMap(rowIndex, columnIndex, false); } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java new file mode 100644 index 000000000..40b1cc8f0 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/TsFileLastReaderTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.utils.WriteUtils.TabletAddValueFunction; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; + +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@SuppressWarnings({"ResultOfMethodCallIgnored", "SameParameterValue"}) +public class TsFileLastReaderTest { + + private static final List dataTypes = + Arrays.asList(TSDataType.INT64, TSDataType.BLOB); + private static final Map typeAddValueFunctions = + new HashMap<>(); + + static { + typeAddValueFunctions.put( + TSDataType.INT64, ((tablet, row, column) -> tablet.addValue(row, column, (long) row))); + typeAddValueFunctions.put( + TSDataType.BLOB, + ((tablet, row, column) -> + tablet.addValue( + row, column, Long.toBinaryString(row).getBytes(StandardCharsets.UTF_8)))); + } + + private final String filePath = "target/test.tsfile"; + private final File file = new File(filePath); + + private void createFile(int deviceNum, int measurementNum, int seriesPointNum) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { + List measurementSchemaList = new ArrayList<>(); + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType)); + } + for (int i = 0; i < deviceNum; i++) { + writer.registerAlignedTimeseries("device" + i, measurementSchemaList); + } + + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum); + for (int k = 0; k < seriesPointNum; k++) { + tablet.addTimestamp(k, k); + } + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < seriesPointNum; k++) { + typeAddValueFunctions.get(tsDataType).addValue(tablet, k, j); + } + } + writer.writeTree(tablet); + } + } + } + + // the second half measurements will have an emtpy last chunk each + private void createFileWithLastEmptyChunks(int deviceNum, int measurementNum, int seriesPointNum) + throws IOException, WriteProcessException { + try (TsFileWriter writer = new TsFileWriter(file)) { + List measurementSchemaList = new ArrayList<>(); + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + measurementSchemaList.add(new MeasurementSchema("s" + j, tsDataType)); + } + for (int i = 0; i < deviceNum; i++) { + writer.registerAlignedTimeseries("device" + i, measurementSchemaList); + } + + // the first half seriesPointNum points are not null for all series + int batchPointNum = seriesPointNum / 2; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, batchPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k); + } + for (int j = 0; j < measurementNum; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < batchPointNum; k++) { + typeAddValueFunctions.get(tsDataType).addValue(tablet, k, j); + } + } + writer.writeTree(tablet); + } + writer.flush(); + + // the second half series have no value for the remaining points + batchPointNum = seriesPointNum - batchPointNum; + for (int i = 0; i < deviceNum; i++) { + Tablet tablet = new Tablet("device" + i, measurementSchemaList, seriesPointNum); + for (int k = 0; k < batchPointNum; k++) { + tablet.addTimestamp(k, k + seriesPointNum / 2); + } + for (int j = 0; j < measurementNum / 2; j++) { + TSDataType tsDataType = dataTypes.get(j % dataTypes.size()); + for (int k = 0; k < seriesPointNum; k++) { + switch (tsDataType) { + case INT64: + tablet.addValue(k, j, (long) k + seriesPointNum / 2); + break; + case BLOB: + tablet.addValue( + k, + j, + Long.toBinaryString(k + seriesPointNum / 2).getBytes(StandardCharsets.UTF_8)); + break; + default: + throw new IllegalArgumentException("Unsupported TSDataType " + tsDataType); + } + } + } + writer.writeTree(tablet); + } + } + } + + private void doReadLastWithEmpty(int deviceNum, int measurementNum, int seriesPointNum) + throws Exception { + long startTime = System.currentTimeMillis(); + Set devices = new HashSet<>(); + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, false)) { + while (lastReader.hasNext()) { + Set measurements = new HashSet<>(); + Pair>> next = lastReader.next(); + assertFalse(devices.contains(next.left)); + devices.add(next.left); + + // time column included + assertEquals(measurementNum + 1, next.getRight().size()); + next.right.forEach( + pair -> { + measurements.add(pair.getLeft()); + // the time column is regarded as the first half + int measurementIndex = + pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1)); + + if (measurementIndex < measurementNum / 2) { + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } else { + assertEquals(seriesPointNum / 2 - 1, pair.getRight().getTimestamp()); + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum / 2 - 1, value.getLong()); + } else { + assertEquals( + new Binary( + Long.toBinaryString(seriesPointNum / 2 - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } + }); + assertEquals(measurementNum + 1, measurements.size()); + } + } + assertEquals(deviceNum, devices.size()); + System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); + } + + private void doReadLast(int deviceNum, int measurementNum, int seriesPointNum, boolean ignoreBlob) + throws Exception { + long startTime = System.currentTimeMillis(); + Set devices = new HashSet<>(); + try (TsFileLastReader lastReader = new TsFileLastReader(filePath, true, ignoreBlob)) { + while (lastReader.hasNext()) { + Set measurements = new HashSet<>(); + Pair>> next = lastReader.next(); + assertFalse(devices.contains(next.left)); + devices.add(next.left); + + // time column included + assertEquals(measurementNum + 1, next.getRight().size()); + next.right.forEach( + pair -> { + measurements.add(pair.getLeft()); + // the time column is regarded as the first half + int measurementIndex = + pair.left.isEmpty() ? -1 : Integer.parseInt(pair.getLeft().substring(1)); + TSDataType tsDataType = + measurementIndex == -1 + ? TSDataType.INT64 + : dataTypes.get(measurementIndex % dataTypes.size()); + + if (tsDataType == TSDataType.BLOB && ignoreBlob) { + assertNull(pair.getRight()); + return; + } + + assertEquals(seriesPointNum - 1, pair.getRight().getTimestamp()); + if (pair.getRight() == null) { + assertTrue(ignoreBlob); + } else { + TsPrimitiveType value = pair.getRight().getValue(); + if (value.getDataType() == TSDataType.INT64) { + assertEquals(seriesPointNum - 1, value.getLong()); + } else { + assertEquals( + new Binary(Long.toBinaryString(seriesPointNum - 1), StandardCharsets.UTF_8), + value.getBinary()); + } + } + }); + assertEquals(measurementNum + 1, measurements.size()); + } + } + assertEquals(deviceNum, devices.size()); + System.out.printf("Last point iteration takes %dms%n", System.currentTimeMillis() - startTime); + } + + private void testReadLast(int deviceNum, int measurementNum, int seriesPointNum) + throws Exception { + createFile(deviceNum, measurementNum, seriesPointNum); + doReadLast(deviceNum, measurementNum, seriesPointNum, false); + file.delete(); + } + + @Test + public void testSmall() throws Exception { + testReadLast(10, 10, 10); + } + + @Test + public void testManyDevices() throws Exception { + testReadLast(10000, 10, 10); + } + + @Test + public void testManyMeasurement() throws Exception { + testReadLast(10, 10000, 10); + } + + @Test + public void testManyPoints() throws Exception { + testReadLast(100, 10, 10000); + } + + @Test + public void testManyMany() throws Exception { + testReadLast(100, 100, 100); + } + + @Test + public void testLastEmptyChunks() throws Exception { + createFileWithLastEmptyChunks(100, 100, 100); + doReadLastWithEmpty(100, 100, 100); + } + + @Test + public void testLastEmptyPage() throws Exception { + try (TsFileIOWriter ioWriter = new TsFileIOWriter(file)) { + ioWriter.startChunkGroup(Factory.DEFAULT_FACTORY.create("root.db1.d1")); + List measurementSchemaList = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.BLOB)); + AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(measurementSchemaList); + alignedChunkWriter.write( + 0, + new TsPrimitiveType[] { + TsPrimitiveType.getByType(TSDataType.INT64, 0L), + TsPrimitiveType.getByType( + TSDataType.BLOB, new Binary("0".getBytes(StandardCharsets.UTF_8))) + }); + alignedChunkWriter.sealCurrentPage(); + alignedChunkWriter.write( + 1, new TsPrimitiveType[] {TsPrimitiveType.getByType(TSDataType.INT64, 1L), null}); + alignedChunkWriter.writeToFileWriter(ioWriter); + ioWriter.endChunkGroup(); + + ioWriter.endFile(); + } + + try (TsFileLastReader lastReader = new TsFileLastReader(filePath)) { + Pair>> next = lastReader.next(); + assertEquals(Factory.DEFAULT_FACTORY.create("root.db1.d1"), next.getLeft()); + assertEquals(3, next.getRight().size()); + assertEquals("s1", next.getRight().get(1).left); + assertEquals("s2", next.getRight().get(2).left); + assertEquals(1, next.getRight().get(1).right.getTimestamp()); + assertEquals(1, next.getRight().get(1).right.getValue().getLong()); + assertEquals(0, next.getRight().get(2).right.getTimestamp()); + assertEquals("0", next.getRight().get(2).right.getValue().getStringValue()); + } + } + + @Test + public void testIgnoreBlob() throws Exception { + createFile(10, 10, 10); + doReadLast(10, 10, 10, true); + file.delete(); + } + + @Ignore("Performance") + @Test + public void testManyRead() throws Exception { + int deviceNum = 10000; + int measurementNum = 1000; + int seriesPointNum = 1; + createFile(deviceNum, measurementNum, seriesPointNum); + for (int i = 0; i < 10; i++) { + doReadLast(deviceNum, measurementNum, seriesPointNum, false); + } + file.delete(); + } +} diff --git a/pom.xml b/pom.xml index 2e1d55335..08fb36557 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT pom Apache TsFile Project Parent POM @@ -68,7 +68,7 @@ maven-surefire-plugin 3.5.0 - ${argLine} -Xmx1024m + ${argLine} diff --git a/python/pom.xml b/python/pom.xml index 051e77f0e..88e48818e 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -22,7 +22,7 @@ org.apache.tsfile tsfile-parent - 2.1.0-250325-SNAPSHOT + 2.1.0-SNAPSHOT tsfile-python pom