Skip to content

Commit c7576f7

Browse files
authored
[HUDI-4130] Remove the upgrade/downgrade for flink #initTable (#5642)
1 parent 1da0b21 commit c7576f7

3 files changed

Lines changed: 10 additions & 4 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1551,7 +1551,7 @@ private void setWriteTimer(HoodieTable table) {
15511551
}
15521552
}
15531553

1554-
private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
1554+
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
15551555
UpgradeDowngrade upgradeDowngrade =
15561556
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);
15571557

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,14 +407,20 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
407407
return getHoodieTable();
408408
}
409409

410+
@Override
411+
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
412+
// do nothing.
413+
// flink executes the upgrade/downgrade once when initializing the first instant on start up,
414+
// no need to execute the upgrade/downgrade on each write in streaming.
415+
}
416+
410417
/**
411418
* Upgrade downgrade the Hoodie table.
412419
*
413420
* <p>This action should only be executed once for each commit.
414421
* The modification of the table properties is not thread safe.
415422
*/
416-
public void upgradeDowngrade(String instantTime) {
417-
HoodieTableMetaClient metaClient = createMetaClient(true);
423+
public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
418424
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
419425
.run(HoodieTableVersion.current(), instantTime);
420426
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ private void initInstant(String instant) {
394394
// starts a new instant
395395
startInstant();
396396
// upgrade downgrade
397-
this.writeClient.upgradeDowngrade(this.instant);
397+
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
398398
}, "initialize instant %s", instant);
399399
}
400400

0 commit comments

Comments
 (0)