
Commit db8e1c3

[HUDI-4167] Remove the timeline refresh with initializing hoodie table
The timeline refresh invokes the fs view #sync, which currently has two actions:

1. reload the timeline of the fs view, so that the next fs view request is based on this timeline metadata;
2. if this is a local fs view, clear all the local states; if this is a remote fs view, send a request to sync the remote fs view.

But consider the construction: the meta client is instantiated freshly, so its timeline is already the latest; the table is also constructed freshly, so the fs view has no local states. That means the #sync is entirely unnecessary.

In this patch, the metadata lifecycle and the data set fs view are kept in sync: when the fs view is refreshed, the underlying metadata is refreshed synchronously as well. The freshness of the metadata follows the same rules as the data fs view:

1. if the fs view is local, visibility is based on the table meta client's latest commit;
2. if the fs view is remote, the timeline server #syncs the fs view and the metadata together, based on the lagging server-local timeline.

From the client's perspective, there is no need to care about the refresh action anymore, no matter whether the metadata table is enabled or not.

Removing the timeline refresh has a further benefit: it avoids unnecessary #refresh of the remote fs view when all the clients send requests to sync it.
1 parent 329da34 · commit db8e1c3
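To make the construction argument concrete, below is a minimal sketch of the call-site change (a hypothetical helper, not code from this commit; it assumes a HoodieWriteConfig and a Spark engine context are available). Before the patch, callers passed a refreshTimeline flag that triggered table.getHoodieView().sync() immediately after construction; since a freshly built meta client already holds the latest timeline and a fresh fs view has no local state to clear, the flag is simply dropped:

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;

final class TableCreationSketch {
  // Old call site (removed by this patch):
  //   HoodieSparkTable.create(config, context, /* refreshTimeline */ true);
  // New call site: the freshly constructed table needs no explicit fs view #sync.
  static <T extends HoodieRecordPayload> HoodieSparkTable<T> createFresh(HoodieWriteConfig config,
                                                                         HoodieSparkEngineContext context) {
    return HoodieSparkTable.create(config, context);
  }
}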

13 files changed: 40 additions & 69 deletions

hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java

Lines changed: 1 addition & 1 deletion
@@ -296,7 +296,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri
     SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
     HoodieWriteConfig config = client.getConfig();
     HoodieEngineContext context = client.getEngineContext();
-    HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+    HoodieSparkTable table = HoodieSparkTable.create(config, context);
     WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
     return 0;

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 12 additions & 19 deletions
@@ -296,11 +296,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     }
   }
 
-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
-    return createTable(config, hadoopConf, false);
-  }
-
-  protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
 
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     try {
@@ -365,7 +361,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
    */
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option<String> instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -542,9 +538,6 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
       return;
     }
     if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
-      if (config.isMetadataTableEnabled()) {
-        table.getHoodieView().sync();
-      }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
@@ -634,7 +627,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch
    * Run any pending compactions.
    */
   public void runAnyPendingCompactions() {
-    runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
+    runAnyPendingCompactions(createTable(config, hadoopConf));
   }
 
   /**
@@ -644,7 +637,7 @@ public void runAnyPendingCompactions() {
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -668,7 +661,7 @@ public void savepoint(String user, String comment) {
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     table.savepoint(context, instantTime, user, comment);
   }
 
@@ -680,7 +673,7 @@ public void savepoint(String instantTime, String user, String comment) {
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
@@ -1012,7 +1005,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
    */
   public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+    Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
         .scheduleIndexing(context, instantTime, partitionTypes);
     return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
   }
@@ -1024,7 +1017,7 @@ public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionType
    * @return {@link Option<HoodieIndexCommitMetadata>} after successful indexing.
    */
   public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
-    return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
+    return createTable(config, hadoopConf).index(context, indexInstantTime);
   }
 
   /**
@@ -1339,17 +1332,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
         return Option.empty();
       case CLUSTER:
         LOG.info("Scheduling clustering at instant time :" + instantTime);
-        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
             .scheduleClustering(context, instantTime, extraMetadata);
         return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case COMPACT:
         LOG.info("Scheduling compaction at instant time :" + instantTime);
-        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
             .scheduleCompaction(context, instantTime, extraMetadata);
         return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case CLEAN:
         LOG.info("Scheduling cleaning at instant time :" + instantTime);
-        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
             .scheduleCleaning(context, instantTime, extraMetadata);
         return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       default:
@@ -1702,6 +1695,6 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
 }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java

Lines changed: 0 additions & 3 deletions
@@ -261,9 +261,6 @@ public HoodieCleanMetadata execute() {
         }
       }
       table.getMetaClient().reloadActiveTimeline();
-      if (config.isMetadataTableEnabled()) {
-        table.getHoodieView().sync();
-      }
     });
   }

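As a usage note on the clean path above: after this patch a caller performs a single timeline reload and nothing else, whether or not the metadata table is enabled. A hedged sketch (helper name hypothetical):

import org.apache.hudi.table.HoodieTable;

final class CleanRefreshSketch {
  // Reloading the active timeline is the only explicit refresh left; the fs
  // view, and the metadata when enabled, are kept in sync underneath.
  static void refreshAfterClean(HoodieTable<?, ?, ?, ?> table) {
    table.getMetaClient().reloadActiveTimeline();
  }
}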
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 1 addition & 2 deletions
@@ -117,8 +117,7 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java

Lines changed: 0 additions & 10 deletions
@@ -62,13 +62,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
   public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
                                                                            HoodieFlinkEngineContext context,
                                                                            HoodieTableMetaClient metaClient) {
-    return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
-                                                                           HoodieFlinkEngineContext context,
-                                                                           HoodieTableMetaClient metaClient,
-                                                                           boolean refreshTimeline) {
     final HoodieFlinkTable<T> hoodieFlinkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -80,9 +73,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieFlinkTable.getHoodieView().sync();
-    }
     return hoodieFlinkTable;
   }

hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java

Lines changed: 1 addition & 1 deletion
@@ -104,7 +104,7 @@ private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering
   public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
-    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
+    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
     HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Create some partitions, and put some files

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java

Lines changed: 1 addition & 3 deletions
@@ -89,9 +89,7 @@ public boolean commit(String instantTime,
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieJavaTable.create(config, context);
   }

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

Lines changed: 5 additions & 7 deletions
@@ -123,10 +123,8 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
-    return HoodieSparkTable.create(config, context, refreshTimeline);
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
   }
 
   @Override
@@ -333,7 +331,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -352,7 +350,7 @@ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionIns
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
-    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
     HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -434,7 +432,7 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
     }
 
     // Create a Hoodie table which encapsulated the commits and files visible
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
+    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
   /**

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java

Lines changed: 1 addition & 16 deletions
@@ -54,30 +54,18 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    return create(config, context, false);
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
-                                                                           boolean refreshTimeline) {
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
             .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
             .setFileSystemRetryConfig(config.getFileSystemRetryConfig())
             .setProperties(config.getProps()).build();
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
+    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                            HoodieSparkEngineContext context,
                                                                            HoodieTableMetaClient metaClient) {
-    return create(config, context, metaClient, false);
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
-                                                                           HoodieSparkEngineContext context,
-                                                                           HoodieTableMetaClient metaClient,
-                                                                           boolean refreshTimeline) {
     HoodieSparkTable<T> hoodieSparkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -89,9 +77,6 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
       default:
        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieSparkTable.getHoodieView().sync();
-    }
     return hoodieSparkTable;
   }

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ private void verifyBaseMetadataTable() throws IOException {
     assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
 
     // Files within each partition should match
-    HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
     TableFileSystemView tableView = table.getHoodieView();
     List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
     Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
