Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ private void initialize() throws QueryProcessException {
QueryDataSource queryDataSource =
new QueryDataSource(dataSource.getSeqResources(), dataSource.getUnseqResources());

queryDataSource.setSingleDevice(dataSource.isSingleDevice());

queryDataSource.setDataTTL(dataSource.getDataTTL());

sourceOperator.initQueryDataSource(queryDataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProce
closedFilePaths = new HashSet<>();
unClosedFilePaths = new HashSet<>();
addUsedFilesForQuery(sharedQueryDataSource);
sharedQueryDataSource.setSingleDevice(selectedDeviceIdSet.size() == 1);
}
} finally {
setInitQueryDataSourceCost(System.nanoTime() - startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.IMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -68,6 +69,8 @@ public class SeriesScanUtil {

// The path of the target series which will be scanned.
protected final PartialPath seriesPath;

private final IDeviceID deviceID;
protected boolean isAligned = false;
private final TSDataType dataType;

Expand Down Expand Up @@ -113,6 +116,7 @@ public SeriesScanUtil(
SeriesScanOptions scanOptions,
FragmentInstanceContext context) {
this.seriesPath = seriesPath;
this.deviceID = seriesPath.getIDeviceID();
this.dataType = seriesPath.getSeriesType();

this.scanOptions = scanOptions;
Expand Down Expand Up @@ -155,7 +159,7 @@ public SeriesScanUtil(
* @param dataSource the query data source
*/
public void initQueryDataSource(QueryDataSource dataSource) {
dataSource.fillOrderIndexes(seriesPath.getDevice(), orderUtils.getAscending());
dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending());
this.dataSource = dataSource;

// updated filter concerning TTL
Expand Down Expand Up @@ -1116,12 +1120,10 @@ && timeAllSelected(firstTimeSeriesMetadata)) {

private void unpackAllOverlappedTsFilesToTimeSeriesMetadata(long endpointTime)
throws IOException {
while (orderUtils.hasNextUnseqResource()
&& orderUtils.isOverlapped(endpointTime, orderUtils.getNextUnseqFileResource(false))) {
while (orderUtils.hasNextUnseqResource() && orderUtils.isCurUnSeqOverlappedWith(endpointTime)) {
unpackUnseqTsFileResource();
}
while (orderUtils.hasNextSeqResource()
&& orderUtils.isOverlapped(endpointTime, orderUtils.getNextSeqFileResource(false))) {
while (orderUtils.hasNextSeqResource() && orderUtils.isCurSeqOverlappedWith(endpointTime)) {
unpackSeqTsFileResource();
}
}
Expand Down Expand Up @@ -1256,15 +1258,15 @@ public interface TimeOrderUtils {

long getOrderTime(Statistics<? extends Object> statistics);

long getOrderTime(TsFileResource fileResource);

long getOverlapCheckTime(Statistics<? extends Object> range);

boolean isOverlapped(Statistics<? extends Object> left, Statistics<? extends Object> right);

boolean isOverlapped(long time, Statistics<? extends Object> right);

boolean isOverlapped(long time, TsFileResource right);
boolean isCurSeqOverlappedWith(long time);

boolean isCurUnSeqOverlappedWith(long time);

<T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);

Expand Down Expand Up @@ -1300,12 +1302,6 @@ public long getOrderTime(Statistics statistics) {
return statistics.getEndTime();
}

@SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(TsFileResource fileResource) {
return fileResource.getEndTime(seriesPath.getIDeviceID());
}

@SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
Expand All @@ -1325,8 +1321,13 @@ public boolean isOverlapped(long time, Statistics right) {
}

@Override
public boolean isOverlapped(long time, TsFileResource right) {
return time <= right.getEndTime(seriesPath.getIDeviceID());
public boolean isCurSeqOverlappedWith(long time) {
return time <= dataSource.getCurrentSeqOrderTime(curSeqFileIndex);
}

@Override
public boolean isCurUnSeqOverlappedWith(long time) {
return time <= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex);
}

@Override
Expand Down Expand Up @@ -1365,30 +1366,26 @@ public boolean getAscending() {

@Override
public boolean hasNextSeqResource() {
while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(), true, false)) {
while (dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID)) {
if (dataSource.isSeqSatisfied(
deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curSeqFileIndex--;
}
return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
return dataSource.hasNextSeqResource(curSeqFileIndex, false, deviceID);
}

@Override
public boolean hasNextUnseqResource() {
while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(), false, false)) {
while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID)) {
if (dataSource.isUnSeqSatisfied(
deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curUnseqFileIndex++;
}
return dataSource.hasNextUnseqResource(curUnseqFileIndex);
return dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID);
}

@Override
Expand Down Expand Up @@ -1423,12 +1420,6 @@ public long getOrderTime(Statistics statistics) {
return statistics.getStartTime();
}

@SuppressWarnings("squid:S3740")
@Override
public long getOrderTime(TsFileResource fileResource) {
return fileResource.getStartTime(seriesPath.getIDeviceID());
}

@SuppressWarnings("squid:S3740")
@Override
public long getOverlapCheckTime(Statistics range) {
Expand All @@ -1448,8 +1439,13 @@ public boolean isOverlapped(long time, Statistics right) {
}

@Override
public boolean isOverlapped(long time, TsFileResource right) {
return time >= right.getStartTime(seriesPath.getIDeviceID());
public boolean isCurSeqOverlappedWith(long time) {
return time >= dataSource.getCurrentSeqOrderTime(curSeqFileIndex);
}

@Override
public boolean isCurUnSeqOverlappedWith(long time) {
return time >= dataSource.getCurrentUnSeqOrderTime(curUnseqFileIndex);
}

@Override
Expand Down Expand Up @@ -1488,30 +1484,26 @@ public boolean getAscending() {

@Override
public boolean hasNextSeqResource() {
while (dataSource.hasNextSeqResource(curSeqFileIndex, getAscending())) {
TsFileResource tsFileResource = dataSource.getSeqResourceByIndex(curSeqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(), true, false)) {
while (dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID)) {
if (dataSource.isSeqSatisfied(
deviceID, curSeqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curSeqFileIndex++;
}
return dataSource.hasNextSeqResource(curSeqFileIndex, getAscending());
return dataSource.hasNextSeqResource(curSeqFileIndex, true, deviceID);
}

@Override
public boolean hasNextUnseqResource() {
while (dataSource.hasNextUnseqResource(curUnseqFileIndex)) {
TsFileResource tsFileResource = dataSource.getUnseqResourceByIndex(curUnseqFileIndex);
if (tsFileResource != null
&& tsFileResource.isSatisfied(
seriesPath.getIDeviceID(), scanOptions.getGlobalTimeFilter(), false, false)) {
while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID)) {
if (dataSource.isUnSeqSatisfied(
deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) {
break;
}
curUnseqFileIndex++;
}
return dataSource.hasNextUnseqResource(curUnseqFileIndex);
return dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public static class TimeSeriesMetadataCacheKey {

private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TimeSeriesMetadataCacheKey.class)
+ 2 * RamUsageEstimator.shallowSizeOfInstance(String.class);
+ RamUsageEstimator.shallowSizeOfInstance(String.class);

private final int regionId;
private final long timePartitionId;
Expand Down Expand Up @@ -296,9 +296,7 @@ public TimeSeriesMetadataCacheKey(
}

public long getRetainedSizeInBytes() {
return INSTANCE_SIZE
+ sizeOfCharArray(((PlainDeviceID) device).toStringID().length())
+ sizeOfCharArray(measurement.length());
return INSTANCE_SIZE + device.ramBytesUsed() + sizeOfCharArray(measurement.length());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void perform() throws Exception {
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
IDeviceID device = deviceInfo.left;
boolean isAligned = deviceInfo.right;
queryDataSource.fillOrderIndexes(((PlainDeviceID) device).toStringID(), true);
queryDataSource.fillOrderIndexes(device, true);

if (isAligned) {
compactAlignedSeries(
Expand Down Expand Up @@ -220,7 +220,7 @@ private void compactNonAlignedSeries(
device,
measurementListArray[i],
fragmentInstanceContext,
queryDataSource,
new QueryDataSource(queryDataSource),
compactionWriter,
schemaMap,
i)));
Expand Down
Loading