diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 5910de5f1d39e..0aa9d40e28e6d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -68,7 +68,6 @@
  *
  */
 public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
-  private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
   private final String[] partitionColumns;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9e407aa7660e5..9b5e8c1dd6f02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -153,6 +153,10 @@ public Stream<FileSlice> getAllFileSlices() {
     return Stream.empty();
   }
 
+  public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+    return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+  }
+
   /**
    * Gets the latest slice - this can contain either.
    *
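Note: the new HoodieFileGroup#getAllFileSlicesBeforeOn streams every file slice whose base instant is at or before maxInstantTime, not just the newest one the way the pre-existing getLatestFileSliceBeforeOrOn does; the file-system view changes below lean on this to fall back across slices. A minimal sketch of the semantics, assuming a file group with slices based at instants "001", "003" and "005" (the wrapper class is hypothetical; fileSlices is keyed by reverse commit time, so slices stream newest-first):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;

// Hypothetical sketch: list the base instants of all slices at or before an instant.
// For slices based at "001", "003", "005" and maxInstantTime "004" this yields
// ["003", "001"] (newest first), whereas getLatestFileSliceBeforeOrOn("004") would
// surface only the "003" slice.
class AllSlicesBeforeOnSketch {
  static List<String> baseInstantsBeforeOn(HoodieFileGroup fileGroup, String maxInstantTime) {
    return fileGroup.getAllFileSlicesBeforeOn(maxInstantTime)
        .map(FileSlice::getBaseInstantTime)
        .collect(Collectors.toList());
  }
}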

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 5818636caef2c..8cfd92d01fec8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -413,18 +413,21 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
    * base-files.
    *
    * @param fileSlice File Slice
+   * @param includeEmptyFileSlice include empty file-slice
    */
-  protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
+  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
       LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
       // Base file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
-      FileSlice transformed =
-          new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
       fileSlice.getLogFiles().forEach(transformed::addLogFile);
-      return transformed;
+      if (transformed.isEmpty() && !includeEmptyFileSlice) {
+        return Stream.of();
+      }
+      return Stream.of(transformed);
     }
-    return fileSlice;
+    return Stream.of(fileSlice);
   }
 
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
@@ -606,9 +609,9 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .map(this::filterBaseFileAfterPendingCompaction)
-          .map(this::addBootstrapBaseFileIfPresent);
+          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -627,7 +630,10 @@ public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fileId) {
         return Option.empty();
       } else {
         Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
-        return fs.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        if (!fs.isPresent()) {
+          return Option.empty();
+        }
+        return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
       }
     } finally {
       readLock.unlock();
@@ -665,13 +671,21 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+      Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
-        return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+            .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+            .map(this::addBootstrapBaseFileIfPresent);
       } else {
-        return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
-            .map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
@@ -893,7 +907,6 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingCompactionOperationWithInstant(HoodieFileGroupId fgId);
    */
   abstract Stream<BootstrapBaseFileMapping> fetchBootstrapBaseFiles();
 
-
   /**
    * Checks if partition is pre-loaded and available in store.
    *
@@ -967,7 +980,7 @@ Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
    */
   Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
     return fetchAllStoredFileGroups(partitionPath).map(this::addBootstrapBaseFileIfPresent)
-        .map(HoodieFileGroup::getAllFileSlices).flatMap(sliceList -> sliceList);
+        .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
   /**
@@ -1003,8 +1016,7 @@ private Stream<HoodieBaseFile> fetchLatestBaseFiles() {
    * @param partitionPath partition-path
    */
   Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
-    return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
-        .flatMap(baseFileList -> baseFileList);
+    return fetchAllStoredFileGroups(partitionPath).flatMap(HoodieFileGroup::getAllBaseFiles);
   }
 
   /**
@@ -1023,18 +1035,6 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
         .map(Option::get);
   }
 
-  /**
-   * Default implementation for fetching latest file-slices for a partition path as of instant.
-   *
-   * @param partitionPath Partition Path
-   * @param maxCommitTime Instant Time
-   */
-  Stream<FileSlice> fetchLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) {
-    return fetchAllStoredFileGroups(partitionPath)
-        .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)).filter(Option::isPresent)
-        .map(Option::get);
-  }
-
   /**
    * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
    *
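Note: the rewritten getLatestFileSlicesBeforeOrOn is the core of this change. Previously it started from each file group's single latest slice, so once filterBaseFileAfterPendingCompaction stripped a base file under pending compaction, a log-free slice came back empty and the file group vanished from read-optimized scans. Now it walks all slices at or before maxCommitTime and keeps the newest one that still has content. A standalone sketch of that per-file-group selection, assuming newest-first slice ordering and FileSlice#isEmpty meaning neither base file nor log files (the class name is hypothetical):

import java.util.Optional;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;

// Hypothetical distillation of the new per-file-group lookup: walk the slices at or
// before maxCommitTime, newest first, and return the first one that still has content
// (a base file or log files) once pending-compaction filtering has emptied the newest.
class LatestNonEmptySliceSketch {
  static Optional<FileSlice> latestNonEmptySliceBeforeOrOn(HoodieFileGroup fileGroup, String maxCommitTime) {
    return fileGroup.getAllFileSlicesBeforeOn(maxCommitTime)
        .filter(slice -> !slice.isEmpty()) // drop slices emptied by compaction filtering
        .findFirst();                      // newest-first stream, so this is the latest match
  }
}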
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
new file mode 100644
index 0000000000000..3f6934d973427
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
+  test("Test Query Merge_On_Read Read_Optimized table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql("set hoodie.parquet.max.file.size = 10000")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"update $tableName set price = 11 where id = 1")
+      spark.sql(s"update $tableName set price = 21 where id = 2")
+      spark.sql(s"update $tableName set price = 31 where id = 3")
+      spark.sql(s"update $tableName set price = 41 where id = 4")
+
+      // expect that all complete parquet files can be scanned
+      assertQueryResult(4, tablePath)
+
+      // async schedule compaction job
+      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+        .collect()
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(4, tablePath)
+
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(5, tablePath)
+
+      // async run compaction job
+      spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
+        .collect()
+
+      // assert that all complete parquet files can be scanned after compaction
+      assertQueryResult(5, tablePath)
+    }
+  }
+
+  def assertQueryResult(expected: Any,
+                        tablePath: String): Unit = {
+    val actual = spark.read.format("org.apache.hudi").option("hoodie.datasource.query.type", "read_optimized").load(tablePath).count()
+    assertResult(expected)(actual)
+  }
+}
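Note: the read-optimized count behind assertQueryResult corresponds to the following standalone usage (a sketch in Java for consistency with the main-code changes; the class name is hypothetical, and spark/tablePath are assumed to be supplied by the caller):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Count rows visible to a read-optimized query, i.e. data already written to base
// parquet files. With this patch, a file group whose newest slice is log-only or
// pending compaction contributes its most recent complete base file instead of
// disappearing from the scan.
class ReadOptimizedCountSketch {
  static long countReadOptimized(SparkSession spark, String tablePath) {
    Dataset<Row> df = spark.read()
        .format("org.apache.hudi")
        .option("hoodie.datasource.query.type", "read_optimized")
        .load(tablePath);
    return df.count();
  }
}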