From 62aeaf9d920917a53041bc84593a3a39d8df11b2 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Wed, 20 Oct 2021 08:18:55 -0700 Subject: [PATCH 1/4] [HUDI-2591] Bootstrap metadata table only if upgrade / downgrade is not required. --- .../apache/hudi/client/SparkRDDWriteClient.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index b8437d39f8995..361386ee9c58f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -96,10 +96,19 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService); + bootstrapMetadataTable(); + } + + private void bootstrapMetadataTable() { if (config.isMetadataTableEnabled()) { - // If the metadata table does not exist, it should be bootstrapped here - // TODO: Check if we can remove this requirement - auto bootstrap on commit - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context); + // Defer bootstrap if upgrade / downgrade is pending + HoodieTableMetaClient metaClient = createMetaClient(true); + UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( + metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); + if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { + // TODO: Check if we can remove this requirement - auto bootstrap on commit + SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context); + } } } @@ -213,7 +222,6 @@ public HoodieWriteResult insertOverwrite(JavaRDD> 
records, final return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } - /** * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table. From 12596e6dfd6246818cd86d2f1134f12c919dea22 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Fri, 5 Nov 2021 14:23:00 -0700 Subject: [PATCH 2/4] Fixed the constructor calls to ensure that the metadata table bootstrap call is not missed out in any code path. --- .../main/java/org/apache/hudi/client/SparkRDDWriteClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 361386ee9c58f..d2dd1d7fbec34 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -84,13 +84,13 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig client @Deprecated public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) { - super(context, writeConfig); + this(context, writeConfig, Option.empty()); } @Deprecated public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option timelineService) { - super(context, writeConfig, timelineService); + this(context, writeConfig, timelineService); } public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, From d3051325eabfe3dc00075ff2c22fd2b587cb4c93 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 8 Nov 2021 08:38:10 -0800 Subject: [PATCH 3/4] Enabled auto-bootstrapping of table post upgrade/downgrade. 
The timestamp of the current instant (in progress) is passed to the metadata bootstrap code which ignores this specific in-progress instant. --- .../HoodieBackedTableMetadataWriter.java | 59 +++++++++++++------ .../FlinkHoodieBackedTableMetadataWriter.java | 7 ++- .../hudi/client/SparkRDDWriteClient.java | 12 ++-- .../SparkHoodieBackedTableMetadataWriter.java | 41 +++++++++---- .../apache/hudi/table/HoodieSparkTable.java | 2 +- .../functional/TestHoodieBackedMetadata.java | 16 ++--- 6 files changed, 88 insertions(+), 49 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index eb0c6ea899bcc..726592fc999ca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -104,16 +104,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Hudi backed table metadata writer. 
* - * @param hadoopConf - Hadoop configuration to use for the metadata writer - * @param writeConfig - Writer config - * @param engineContext - Engine context - * @param actionMetadata - Optional action metadata to help decide bootstrap operations - * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param hadoopConf - Hadoop configuration to use for the metadata writer + * @param writeConfig - Writer config + * @param engineContext - Engine context + * @param actionMetadata - Optional action metadata to help decide bootstrap operations + * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param instantInProgressTimestamp - Timestamp of any instant in progress */ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option instantInProgressTimestamp) { this.dataWriteConfig = writeConfig; this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(hadoopConf); @@ -137,7 +139,7 @@ protected HoodieBackedTableMetadataWriter(Configu initRegistry(); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); - initialize(engineContext, actionMetadata); + initialize(engineContext, actionMetadata, instantInProgressTimestamp); initTableMetadata(); } else { enabled = false; @@ -145,6 +147,11 @@ protected HoodieBackedTableMetadataWriter(Configu } } + public HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, + HoodieEngineContext engineContext) { + this(hadoopConf, writeConfig, engineContext, Option.empty(), Option.empty()); + } + protected abstract void initRegistry(); /** @@ -234,11 +241,17 @@ public HoodieBackedTableMetadata metadata() { /** * Initialize the metadata table if it does not exist. - *

+ * * If the metadata table does not exist, then file and partition listing is used to bootstrap the table. + * + * @param engineContext + * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase + * @param instantInProgressTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored + * while deciding to bootstrap the metadata table. */ protected abstract void initialize(HoodieEngineContext engineContext, - Option actionMetadata); + Option actionMetadata, + Option instantInProgressTimestamp); public void initTableMetadata() { try { @@ -260,11 +273,13 @@ public void initTableMetadata() { * @param dataMetaClient - Meta client for the data table * @param actionMetadata - Optional action metadata * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param instantInProgressTimestamp - Timestamp of an instant in progress on the dataset. This instant is ignored * @throws IOException */ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, - Option actionMetadata) throws IOException { + Option actionMetadata, + Option instantInProgressTimestamp) throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), @@ -291,7 +306,7 @@ protected void bootstrapIfNeeded(HoodieEngineCont if (!exists) { // Initialize for the first time by listing partitions and files directly from the file system - if (bootstrapFromFilesystem(engineContext, dataMetaClient)) { + if (bootstrapFromFilesystem(engineContext, dataMetaClient, instantInProgressTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } } @@ -347,23 +362,29 @@ private boolean isBootstrapNeeded(Option instantInProgressTimestamp) throws IOException { ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not 
enabled"); // We can only bootstrap if there are no pending operations on the dataset - Option pendingDataInstant = Option.fromJavaOptional(dataMetaClient.getActiveTimeline() - .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst()); - if (pendingDataInstant.isPresent()) { + List pendingDataInstant = dataMetaClient.getActiveTimeline() + .getInstants().filter(i -> !i.isCompleted()) + .filter(i -> !instantInProgressTimestamp.isPresent() || !i.getTimestamp().equals(instantInProgressTimestamp.get())) + .collect(Collectors.toList()); + + if (!pendingDataInstant.isEmpty()) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); - LOG.warn("Cannot bootstrap metadata table as operation is in progress in dataset: " + pendingDataInstant.get()); + LOG.warn("Cannot bootstrap metadata table as operation(s) are in progress on the dataset: " + + Arrays.toString(pendingDataInstant.toArray())); return false; } // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the latest commit timestamp. - String createInstantTime = dataMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst() - .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + // Otherwise, we use the timestamp of the latest completed action. 
+ String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() + .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); HoodieTableMetaClient.withPropertyBuilder() diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9ae3e622d35da..c9a70d8a0c75f 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -62,7 +62,7 @@ FlinkHoodieBackedTableMetadataWriter(Configuratio HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option actionMetadata) { - super(hadoopConf, writeConfig, engineContext, actionMetadata); + super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty()); } @Override @@ -78,10 +78,11 @@ protected void initRegistry() { @Override protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option instantInProgressTimestamp) { try { if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, instantInProgressTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index d2dd1d7fbec34..c1f45f13f547a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -96,10 +96,10 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService); - bootstrapMetadataTable(); + bootstrapMetadataTable(Option.empty()); } - private void bootstrapMetadataTable() { + private void bootstrapMetadataTable(Option instantInProgressTimestamp) { if (config.isMetadataTableEnabled()) { // Defer bootstrap if upgrade / downgrade is pending HoodieTableMetaClient metaClient = createMetaClient(true); @@ -107,7 +107,8 @@ private void bootstrapMetadataTable() { metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { // TODO: Check if we can remove this requirement - auto bootstrap on commit - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context); + SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(), + instantInProgressTimestamp); } } } @@ -377,7 +378,7 @@ public HoodieWriteMetadata> cluster(String clusteringInstan private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD writeStatuses, HoodieTable>, JavaRDD, JavaRDD> table, String clusteringCommitTime) { - + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()).collect(Collectors.toList()); @@ -447,6 +448,9 
@@ protected HoodieTable>, JavaRDD, JavaRDD HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, - Option actionMetadata) { - return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata); + Option actionMetadata, + Option instantInProgressTimestamp) { + return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, + instantInProgressTimestamp); + } + + public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, + HoodieEngineContext context) { + return create(conf, writeConfig, context, Option.empty(), Option.empty()); } SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, - Option actionMetadata) { - super(hadoopConf, writeConfig, engineContext, actionMetadata); + Option actionMetadata, + Option instantInProgressTimestamp) { + super(hadoopConf, writeConfig, engineContext, actionMetadata, instantInProgressTimestamp); } @Override @@ -84,7 +102,8 @@ protected void initRegistry() { @Override protected void initialize(HoodieEngineContext engineContext, - Option actionMetadata) { + Option actionMetadata, + Option instantInProgressTimestamp) { try { metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { if (registry instanceof DistributedRegistry) { @@ -94,7 +113,7 @@ protected void initialize(HoodieEngineContext eng }); if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, instantInProgressTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index abbfd316741a2..f14d39c700643 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -128,7 +128,7 @@ public Option getMetad } if (isMetadataTableAvailable) { return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, - actionMetadata)); + actionMetadata, Option.empty())); } else { return Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index de757a0800905..3f2fe44bb6789 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -935,25 +935,19 @@ public void testUpgradeDowngrade() throws IOException { // set hoodie.table.version to 2 in hoodie.properties file changeTableVersion(HoodieTableVersion.TWO); - // With next commit the table should be deleted (as part of upgrade) + // With next commit the table should be deleted (as part of upgrade) and then re-bootstrapped automatically commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); metaClient.reloadActiveTimeline(); + FileStatus prevStatus = fs.getFileStatus(new Path(metadataTableBasePath)); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { records = dataGen.generateInserts(commitTimestamp, 5); client.startCommitWithTime(commitTimestamp); writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect(); assertNoWriteErrors(writeStatuses); } - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - - // With next commit the table should be re-bootstrapped (currently in the constructor. To be changed) - commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) { - records = dataGen.generateInserts(commitTimestamp, 5); - client.startCommitWithTime(commitTimestamp); - writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), commitTimestamp).collect(); - assertNoWriteErrors(writeStatuses); - } + assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); + FileStatus currentStatus = fs.getFileStatus(new Path(metadataTableBasePath)); + assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime()); initMetaClient(); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode()); From efc802dc91182396b87ebba70baaa503b6955054 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Tue, 9 Nov 2021 00:35:51 -0800 Subject: [PATCH 4/4] Addressed review comments. 
--- .../HoodieBackedTableMetadataWriter.java | 32 +++++++++---------- .../FlinkHoodieBackedTableMetadataWriter.java | 4 +-- .../hudi/client/SparkRDDWriteClient.java | 8 ++--- .../SparkHoodieBackedTableMetadataWriter.java | 16 +++++----- .../functional/TestHoodieBackedMetadata.java | 13 +------- 5 files changed, 31 insertions(+), 42 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 726592fc999ca..562988fde3fe8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -104,18 +104,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta /** * Hudi backed table metadata writer. * - * @param hadoopConf - Hadoop configuration to use for the metadata writer - * @param writeConfig - Writer config - * @param engineContext - Engine context - * @param actionMetadata - Optional action metadata to help decide bootstrap operations - * @param - Action metadata types extending Avro generated SpecificRecordBase - * @param instantInProgressTimestamp - Timestamp of any instant in progress + * @param hadoopConf - Hadoop configuration to use for the metadata writer + * @param writeConfig - Writer config + * @param engineContext - Engine context + * @param actionMetadata - Optional action metadata to help decide bootstrap operations + * @param - Action metadata types extending Avro generated SpecificRecordBase + * @param inflightInstantTimestamp - Timestamp of any instant in progress */ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option actionMetadata, - Option instantInProgressTimestamp) { + Option 
inflightInstantTimestamp) { this.dataWriteConfig = writeConfig; this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(hadoopConf); @@ -139,7 +139,7 @@ protected HoodieBackedTableMetadataWriter(Configu initRegistry(); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); - initialize(engineContext, actionMetadata, instantInProgressTimestamp); + initialize(engineContext, actionMetadata, inflightInstantTimestamp); initTableMetadata(); } else { enabled = false; @@ -246,12 +246,12 @@ public HoodieBackedTableMetadata metadata() { * * @param engineContext * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase - * @param instantInProgressTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored + * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored * while deciding to bootstrap the metadata table. */ protected abstract void initialize(HoodieEngineContext engineContext, Option actionMetadata, - Option instantInProgressTimestamp); + Option inflightInstantTimestamp); public void initTableMetadata() { try { @@ -273,13 +273,13 @@ public void initTableMetadata() { * @param dataMetaClient - Meta client for the data table * @param actionMetadata - Optional action metadata * @param - Action metadata types extending Avro generated SpecificRecordBase - * @param instantInProgressTimestamp - Timestamp of an instant in progress on the dataset. This instant is ignored + * @param inflightInstantTimestamp - Timestamp of an instant in progress on the dataset. 
This instant is ignored * @throws IOException */ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient, Option actionMetadata, - Option instantInProgressTimestamp) throws IOException { + Option inflightInstantTimestamp) throws IOException { HoodieTimer timer = new HoodieTimer().startTimer(); boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), @@ -306,7 +306,7 @@ protected void bootstrapIfNeeded(HoodieEngineCont if (!exists) { // Initialize for the first time by listing partitions and files directly from the file system - if (bootstrapFromFilesystem(engineContext, dataMetaClient, instantInProgressTimestamp)) { + if (bootstrapFromFilesystem(engineContext, dataMetaClient, inflightInstantTimestamp)) { metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); } } @@ -362,16 +362,16 @@ private boolean isBootstrapNeeded(Option instantInProgressTimestamp) throws IOException { + Option inflightInstantTimestamp) throws IOException { ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); // We can only bootstrap if there are no pending operations on the dataset List pendingDataInstant = dataMetaClient.getActiveTimeline() .getInstants().filter(i -> !i.isCompleted()) - .filter(i -> !instantInProgressTimestamp.isPresent() || !i.getTimestamp().equals(instantInProgressTimestamp.get())) + .filter(i -> !inflightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inflightInstantTimestamp.get())) .collect(Collectors.toList()); if (!pendingDataInstant.isEmpty()) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index c9a70d8a0c75f..d30455f47f29a 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -79,10 +79,10 @@ protected void initRegistry() { @Override protected void initialize(HoodieEngineContext engineContext, Option actionMetadata, - Option instantInProgressTimestamp) { + Option inflightInstantTimestamp) { try { if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, instantInProgressTimestamp); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index c1f45f13f547a..9aee143c1a617 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -96,10 +96,10 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService); - bootstrapMetadataTable(Option.empty()); + initializeMetadataTable(Option.empty()); } - private void bootstrapMetadataTable(Option instantInProgressTimestamp) { + private void initializeMetadataTable(Option inflightInstantTimestamp) { if (config.isMetadataTableEnabled()) { // Defer bootstrap if upgrade / downgrade is pending HoodieTableMetaClient metaClient = createMetaClient(true); @@ -108,7 +108,7 @@ private void bootstrapMetadataTable(Option instantInProgressTimestamp) { if 
(!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { // TODO: Check if we can remove this requirement - auto bootstrap on commit SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(), - instantInProgressTimestamp); + inflightInstantTimestamp); } } } @@ -450,7 +450,7 @@ protected HoodieTable>, JavaRDD, JavaRDD HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context, Option actionMetadata, - Option instantInProgressTimestamp) { + Option inflightInstantTimestamp) { return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, - instantInProgressTimestamp); + inflightInstantTimestamp); } public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, @@ -81,8 +81,8 @@ SparkHoodieBackedTableMetadataWriter(Configuratio HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, Option actionMetadata, - Option instantInProgressTimestamp) { - super(hadoopConf, writeConfig, engineContext, actionMetadata, instantInProgressTimestamp); + Option inflightInstantTimestamp) { + super(hadoopConf, writeConfig, engineContext, actionMetadata, inflightInstantTimestamp); } @Override @@ -103,7 +103,7 @@ protected void initRegistry() { @Override protected void initialize(HoodieEngineContext engineContext, Option actionMetadata, - Option instantInProgressTimestamp) { + Option inflightInstantTimestamp) { try { metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> { if (registry instanceof DistributedRegistry) { @@ -113,7 +113,7 @@ protected void initialize(HoodieEngineContext eng }); if (enabled) { - bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, instantInProgressTimestamp); + bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata, inflightInstantTimestamp); } } catch (IOException e) { LOG.error("Failed to initialize metadata table. 
Disabling the writer.", e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 3f2fe44bb6789..6a1bdfc1e0ff6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1021,7 +1021,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte .withProperties(properties) .build(); - // With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back. + // With next commit the table should be re-bootstrapped and partial commit should be rolled back. metaClient.reloadActiveTimeline(); commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { @@ -1030,17 +1030,6 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); assertNoWriteErrors(writeStatuses.collect()); } - assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist"); - - // With next commit the table should be re-bootstrapped (currently in the constructor. 
To be changed) - commitTimestamp = HoodieActiveTimeline.createNewInstantTime(); - try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { - records = dataGen.generateInserts(commitTimestamp, 5); - client.startCommitWithTime(commitTimestamp); - writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp); - assertNoWriteErrors(writeStatuses.collect()); - } - assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist"); initMetaClient(); assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());