Commit 34427d0

Fixing reading from metadata table when there are no completed commits
1 parent 15ca7a3 commit 34427d0

File tree

2 files changed (+35 -8 lines changed)


hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

Lines changed: 29 additions & 6 deletions
@@ -344,8 +344,8 @@ public void testTurnOffMetadataTableAfterEnable() throws Exception {
     HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);

     // Simulate the complete data directory including ".hoodie_partition_metadata" file
-    File metaForP1 = new File(metaClient.getBasePath() + "/p1",".hoodie_partition_metadata");
-    File metaForP2 = new File(metaClient.getBasePath() + "/p2",".hoodie_partition_metadata");
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
     metaForP1.createNewFile();
     metaForP2.createNewFile();

@@ -1716,8 +1716,8 @@ public void testMultiWriterForDoubleLocking() throws Exception {

     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
-        .build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
+            .build())
         .withAutoCommit(false)
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())

@@ -1818,6 +1818,29 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
     validateMetadata(client);
   }

+  @Test
+  public void testMetadataReadWithNoCompletedCommits() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    List<HoodieRecord> records;
+    List<WriteStatus> writeStatuses;
+    String[] commitTimestamps = {HoodieActiveTimeline.createNewInstantTime(), HoodieActiveTimeline.createNewInstantTime()};
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+      records = dataGen.generateInserts(commitTimestamps[0], 5);
+      client.startCommitWithTime(commitTimestamps[0]);
+      writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamps[0]).collect();
+      assertNoWriteErrors(writeStatuses);
+
+      // Make all commits in the metadata table inflight. The read should still go through; it just may not return any data.
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", commitTimestamps[0]);
+      FileCreateUtils.deleteDeltaCommit(basePath + "/.hoodie/metadata/", HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP);
+      assertEquals(getAllFiles(metadata(client)).stream().map(p -> p.getName()).map(n -> FSUtils.getCommitTime(n)).collect(Collectors.toSet()).size(), 0);
+    }
+  }
+
   /**
    * Ensure that the reader only reads completed instants.
    *
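
Deleting the deltacommit files is the point of this test: it leaves the metadata table's timeline with inflight instants only, which is exactly the state that used to break readers, because lastInstant() on the completed-instants timeline returns an empty Option and the old code called get() on it unconditionally. Below is a minimal standalone sketch of that failure mode, assuming Hudi's org.apache.hudi.common.util.Option API; the class and values are hypothetical and not part of the commit.

import org.apache.hudi.common.util.Option;

public class EmptyTimelineSketch {
  public static void main(String[] args) {
    // A timeline with no completed instants yields an empty Option from lastInstant().
    Option<String> lastCompleted = Option.empty();

    // Pre-fix behavior: an unconditional get() on the empty Option throws,
    // failing the read instead of returning no data.
    try {
      System.out.println("last completed instant: " + lastCompleted.get());
    } catch (Exception e) {
      System.out.println("metadata read failed: " + e);
    }
  }
}

The assertion at the end of the new test pins down the fixed behavior: the read succeeds and simply reports zero files.
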
@@ -2050,7 +2073,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
         .withCleanConfig(HoodieCleanConfig.newBuilder()
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)

@@ -2078,7 +2101,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     // set hoodie.table.version to 2 in hoodie.properties file
     changeTableVersion(HoodieTableVersion.TWO);
     writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).withCleanConfig(HoodieCleanConfig.newBuilder()
-        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

Lines changed: 6 additions & 2 deletions
@@ -1052,8 +1052,12 @@ private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient meta
     HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
     Stream<FileSlice> fileSliceStream;
     if (mergeFileSlices) {
-      fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
-          partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      if (metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
+        fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+            partition, metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+      } else {
+        return Collections.EMPTY_LIST;
+      }
     } else {
       fileSliceStream = fsView.getLatestFileSlices(partition);
     }
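
The guarded branch above fixes the crash, though it walks the timeline chain twice and returns the raw Collections.EMPTY_LIST. Here is a minimal equivalent sketch that resolves lastInstant() once and returns the typed Collections.emptyList(); the class and helper names are hypothetical, while the Hudi types are the ones the patched method already uses.

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;

public class PartitionFileSliceSketch {
  // Same guard as the patch, with the last completed instant resolved once.
  static List<FileSlice> latestMergedFileSlices(
      HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, String partition) {
    Option<HoodieInstant> lastCompleted =
        metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
    if (!lastCompleted.isPresent()) {
      // No completed instants yet: nothing to merge against, so report no file slices.
      return Collections.emptyList();
    }
    return fsView
        .getLatestMergedFileSlicesBeforeOrOn(partition, lastCompleted.get().getTimestamp())
        .collect(Collectors.toList());
  }
}

Returning an empty list is the right contract here: with no completed instants there are no file slices a reader could legally see, so the metadata table looks empty instead of throwing.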
