Skip to content

Commit 09a4a81

Browse files
committed
[HUDI-3015] Implement #reset and #sync for metadata filesystem view
1 parent f5b07a7 commit 09a4a81

9 files changed

Lines changed: 63 additions & 40 deletions

File tree

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -100,38 +100,33 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
100100
.build()).withAutoCommit(false).withProperties(properties).build();
101101
// Create the first commit
102102
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
103-
try {
104-
ExecutorService executors = Executors.newFixedThreadPool(2);
105-
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
106-
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
107-
Future future1 = executors.submit(() -> {
108-
String newCommitTime = "004";
109-
int numRecords = 100;
110-
String commitTimeBetweenPrevAndNew = "002";
111-
try {
112-
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
113-
} catch (Exception e1) {
114-
assertTrue(e1 instanceof HoodieWriteConflictException);
115-
throw new RuntimeException(e1);
116-
}
117-
});
118-
Future future2 = executors.submit(() -> {
119-
String newCommitTime = "005";
120-
int numRecords = 100;
121-
String commitTimeBetweenPrevAndNew = "002";
122-
try {
123-
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
124-
} catch (Exception e2) {
125-
assertTrue(e2 instanceof HoodieWriteConflictException);
126-
throw new RuntimeException(e2);
127-
}
128-
});
129-
future1.get();
130-
future2.get();
131-
fail("Should not reach here, this means concurrent writes were handled incorrectly");
132-
} catch (Exception e) {
133-
// Expected to fail due to overlapping commits
134-
}
103+
ExecutorService executors = Executors.newFixedThreadPool(2);
104+
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
105+
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
106+
Future future1 = executors.submit(() -> {
107+
String newCommitTime = "004";
108+
int numRecords = 100;
109+
String commitTimeBetweenPrevAndNew = "002";
110+
try {
111+
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
112+
} catch (Exception e1) {
113+
assertTrue(e1 instanceof HoodieWriteConflictException);
114+
throw new RuntimeException(e1);
115+
}
116+
});
117+
Future future2 = executors.submit(() -> {
118+
String newCommitTime = "005";
119+
int numRecords = 100;
120+
String commitTimeBetweenPrevAndNew = "002";
121+
try {
122+
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
123+
} catch (Exception e2) {
124+
assertTrue(e2 instanceof HoodieWriteConflictException);
125+
throw new RuntimeException(e2);
126+
}
127+
});
128+
future1.get();
129+
future2.get();
135130
}
136131

137132
@Test

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ public void close() {
257257
* Clears the partition Map and reset view states.
258258
*/
259259
@Override
260-
public final void reset() {
260+
public void reset() {
261261
try {
262262
writeLock.lock();
263263
clear();
@@ -1135,8 +1135,7 @@ public void sync() {
11351135
*/
11361136
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
11371137
refreshTimeline(newTimeline);
1138-
addedPartitions.clear();
1139-
resetViewState();
1138+
clear();
11401139
// Initialize with new Hoodie timeline.
11411140
init(metaClient, newTimeline);
11421141
}

hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ public HoodieTimeline getTimeline() {
253253

254254
@Override
255255
public void sync() {
256-
preferredView.reset();
257-
secondaryView.reset();
256+
preferredView.sync();
257+
secondaryView.sync();
258258
}
259259

260260
@Override

hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,8 @@ public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
413413
public boolean refresh() {
414414
Map<String, String> paramsMap = getParams();
415415
try {
416+
// refresh the local timeline first.
417+
this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
416418
return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
417419
} catch (IOException e) {
418420
throw new HoodieRemoteException(e);
@@ -450,7 +452,6 @@ public void close() {
450452

451453
@Override
452454
public void reset() {
453-
timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
454455
refresh();
455456
}
456457

hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,9 @@ public Option<String> getLatestCompactionTime() {
139139
public void close() throws Exception {
140140
// no-op
141141
}
142+
143+
@Override
144+
public void reset() {
145+
// no-op
146+
}
142147
}

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,4 +441,10 @@ public Option<String> getLatestCompactionTime() {
441441
}
442442
return Option.empty();
443443
}
444+
445+
@Override
446+
public void reset() {
447+
initIfNeeded();
448+
dataMetaClient.reloadActiveTimeline();
449+
}
444450
}

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,18 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException {
6565
return tableMetadata.getAllFilesInPartition(partitionPath);
6666
}
6767

68+
@Override
69+
public void reset() {
70+
super.reset();
71+
tableMetadata.reset();
72+
}
73+
74+
@Override
75+
public void sync() {
76+
super.sync();
77+
tableMetadata.reset();
78+
}
79+
6880
@Override
6981
public void close() {
7082
try {

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,9 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad
113113
* Returns the timestamp of the latest compaction.
114114
*/
115115
Option<String> getLatestCompactionTime();
116+
117+
/**
118+
* Clear the states of the table metadata.
119+
*/
120+
void reset();
116121
}

hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,8 @@ public void testGetTimeline() {
622622
@Test
623623
public void testSync() {
624624
fsView.sync();
625-
verify(primary, times(1)).reset();
626-
verify(secondary, times(1)).reset();
625+
verify(primary, times(1)).sync();
626+
verify(secondary, times(1)).sync();
627627
}
628628

629629
@Test

0 commit comments

Comments
 (0)