Commit 30f489e

microbearz, zhanshaoxiong, and codope authored
[HUDI-4729] Fix file group pending compaction cannot be queried when query _ro table (#6516)
A file group in pending compaction could not be queried when querying the _ro table with Spark. This commit fixes that. Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
1 parent 6dbe296 commit 30f489e
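
The failing scenario is a read-optimized query against a merge-on-read (MOR) table. As a minimal sketch, assuming a SparkSession in scope as spark and an illustrative table path (the format and the hoodie.datasource.query.type option are exactly the ones exercised by the new test below):

// Read-optimized (_ro) query on a MOR table: only compacted base files are scanned.
// Before this fix, a file group whose latest file slice was in pending compaction
// could be dropped from the result set entirely.
val df = spark.read
  .format("org.apache.hudi")
  .option("hoodie.datasource.query.type", "read_optimized")
  .load("/path/to/mor_table") // illustrative path
df.count()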

4 files changed

Lines changed: 116 additions & 31 deletions

hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java

Lines changed: 0 additions & 1 deletion
@@ -68,7 +68,6 @@
  * </ul>
  */
 public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
-
   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
   private final String[] partitionColumns;

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java

Lines changed: 4 additions & 0 deletions
@@ -153,6 +153,10 @@ public Stream<FileSlice> getAllFileSlices() {
     return Stream.empty();
   }
 
+  public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+    return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+  }
+
   /**
    * Gets the latest slice - this can contain either.
    * <p>

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 30 additions & 30 deletions
@@ -413,18 +413,21 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
    * base-files.
    *
    * @param fileSlice File Slice
+   * @param includeEmptyFileSlice include empty file-slice
    */
-  protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
+  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
       LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
       // Base file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
-      FileSlice transformed =
-          new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
       fileSlice.getLogFiles().forEach(transformed::addLogFile);
-      return transformed;
+      if (transformed.isEmpty() && !includeEmptyFileSlice) {
+        return Stream.of();
+      }
+      return Stream.of(transformed);
     }
-    return fileSlice;
+    return Stream.of(fileSlice);
   }
 
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
@@ -606,9 +609,9 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .map(this::filterBaseFileAfterPendingCompaction)
-          .map(this::addBootstrapBaseFileIfPresent);
+          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -627,7 +630,10 @@ public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fi
         return Option.empty();
       } else {
         Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
-        return fs.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        if (!fs.isPresent()) {
+          return Option.empty();
+        }
+        return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
       }
     } finally {
       readLock.unlock();
@@ -665,13 +671,21 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+      Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
-        return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+            .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+            .map(this::addBootstrapBaseFileIfPresent);
       } else {
-        return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
-            .map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
@@ -893,7 +907,6 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingCompactio
    */
   abstract Stream<BootstrapBaseFileMapping> fetchBootstrapBaseFiles();
 
-
   /**
    * Checks if partition is pre-loaded and available in store.
    *
@@ -967,7 +980,7 @@ Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
    */
   Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
     return fetchAllStoredFileGroups(partitionPath).map(this::addBootstrapBaseFileIfPresent)
-        .map(HoodieFileGroup::getAllFileSlices).flatMap(sliceList -> sliceList);
+        .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
   /**
@@ -1003,8 +1016,7 @@ private Stream<HoodieBaseFile> fetchLatestBaseFiles() {
    * @param partitionPath partition-path
    */
   Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
-    return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
-        .flatMap(baseFileList -> baseFileList);
+    return fetchAllStoredFileGroups(partitionPath).flatMap(HoodieFileGroup::getAllBaseFiles);
   }
 
   /**
@@ -1023,18 +1035,6 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
         .map(Option::get);
   }
 
-  /**
-   * Default implementation for fetching latest file-slices for a partition path as of instant.
-   *
-   * @param partitionPath Partition Path
-   * @param maxCommitTime Instant Time
-   */
-  Stream<FileSlice> fetchLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) {
-    return fetchAllStoredFileGroups(partitionPath)
-        .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)).filter(Option::isPresent)
-        .map(Option::get);
-  }
-
   /**
    * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
    *
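
Taken together, the changes above rework how the read-optimized view picks a file slice per file group. Instead of fetching only the newest slice before maxCommitTime (the removed fetchLatestFileSlicesBeforeOrOn), the view now walks every slice at or before that instant, strips the base file from any slice still under pending compaction, skips slices that end up empty, and keeps the first slice that survives. A minimal standalone sketch of that selection order, modeling the includeFileSlicesInPendingCompaction branch with simplified stand-in types rather than Hudi's actual classes (Hudi instant times are timestamp strings, so plain string comparison stands in for compareTimestamps):

// Simplified stand-in for org.apache.hudi.common.model.FileSlice.
case class SliceModel(baseInstant: String, hasBaseFile: Boolean, logFiles: List[String]) {
  def isEmpty: Boolean = !hasBaseFile && logFiles.isEmpty
}

// Slices of one file group, newest first, as getAllFileSlicesBeforeOn would yield them.
def pickReadOptimizedSlice(slices: List[SliceModel],
                           maxCommitTime: String,
                           inPendingCompaction: SliceModel => Boolean): Option[SliceModel] =
  slices
    .filter(_.baseInstant <= maxCommitTime) // getAllFileSlicesBeforeOn
    .map { slice =>
      // filterBaseFileAfterPendingCompaction: hide the not-yet-committed base file
      if (inPendingCompaction(slice)) slice.copy(hasBaseFile = false) else slice
    }
    .find(slice => !slice.isEmpty) // first non-empty slice wins

Previously only the newest slice was inspected; when it sat in pending compaction with no log files written yet, the filtering left it empty and the whole file group vanished from _ro results. Falling back across older slices recovers the last compacted base file.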
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryMergeOnReadOptimizedTable.scala

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
+  test("Test Query Merge_On_Read Read_Optimized table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql("set hoodie.parquet.max.file.size = 10000")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      spark.sql(s"update $tableName set price = 11 where id = 1")
+      spark.sql(s"update $tableName set price = 21 where id = 2")
+      spark.sql(s"update $tableName set price = 31 where id = 3")
+      spark.sql(s"update $tableName set price = 41 where id = 4")
+
+      // expect that all complete parquet files can be scanned
+      assertQueryResult(4, tablePath)
+
+      // async schedule compaction job
+      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
+        .collect()
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(4, tablePath)
+
+      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
+
+      // expect that all complete parquet files can be scanned with a pending compaction job
+      assertQueryResult(5, tablePath)
+
+      // async run compaction job
+      spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
+        .collect()
+
+      // assert that all complete parquet files can be scanned after compaction
+      assertQueryResult(5, tablePath)
+    }
+  }
+
+  def assertQueryResult(expected: Any,
+                        tablePath: String): Unit = {
+    val actual = spark.read.format("org.apache.hudi").option("hoodie.datasource.query.type", "read_optimized").load(tablePath).count()
+    assertResult(expected)(actual)
+  }
+}
