Skip to content

Commit 767c196

Browse files
authored
[HUDI-4303] Adding 4 to 5 upgrade handler to check for old deprecated "default" partition value (#6248)
- Added FourToFiveUpgradeHandler to detect Hudi tables with a "default" partition and throw an exception. - Added a new write config ("hoodie.skip.default.partition.validation") which, when enabled, bypasses the above validation. If users have a Hudi table where the "default" partition was created intentionally and not as a sentinel, they can enable this config to get past the validation.
1 parent a76f694 commit 767c196

9 files changed

Lines changed: 230 additions & 7 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,13 @@ public class HoodieWriteConfig extends HoodieConfig {
480480
.sinceVersion("0.11.0")
481481
.withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services.");
482482

483+
// Opt-out switch for the 4-to-5 upgrade check that fails when the deprecated
// "default" partition directory is present (see FourToFiveUpgradeHandler).
public static final ConfigProperty<Boolean> SKIP_DEFAULT_PARTITION_VALIDATION = ConfigProperty
    .key("hoodie.skip.default.partition.validation")
    .defaultValue(false)
    .sinceVersion("0.12.0")
    .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
        + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
489+
483490
private ConsistencyGuardConfig consistencyGuardConfig;
484491
private FileSystemRetryConfig fileSystemRetryConfig;
485492

@@ -2038,6 +2045,11 @@ public WriteConcurrencyMode getWriteConcurrencyMode() {
20382045
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
20392046
}
20402047

2048+
// misc configs

/**
 * Returns the value of {@code hoodie.skip.default.partition.validation}; when true,
 * the 4-to-5 table upgrade skips the deprecated "default" partition check.
 */
public Boolean doSkipDefaultPartitionValidation() {
  return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION);
}
2052+
20412053
/**
20422054
* Are any table services configured to run inline for both scheduling and execution?
20432055
*
@@ -2517,6 +2529,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
25172529
return this;
25182530
}
25192531

2532+
/**
 * Sets {@code hoodie.skip.default.partition.validation}. When enabled, the 4-to-5
 * table upgrade will not fail on a pre-existing deprecated "default" partition.
 *
 * @param skipDefaultPartitionValidation true to bypass the "default" partition validation
 * @return this builder for chaining
 */
public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionValidation) {
  writeConfig.setValue(SKIP_DEFAULT_PARTITION_VALIDATION, String.valueOf(skipDefaultPartitionValidation));
  return this;
}
2536+
25202537
protected void setDefaults() {
25212538
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
25222539
// Check for mandatory properties
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.table.upgrade;
21+
22+
import org.apache.hudi.common.config.ConfigProperty;
23+
import org.apache.hudi.common.engine.HoodieEngineContext;
24+
import org.apache.hudi.config.HoodieWriteConfig;
25+
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
public class FiveToFourDowngradeHandler implements DowngradeHandler {
30+
31+
@Override
32+
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
33+
return new HashMap<>();
34+
}
35+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hudi.table.upgrade;
21+
22+
import org.apache.hudi.common.config.ConfigProperty;
23+
import org.apache.hudi.common.engine.HoodieEngineContext;
24+
import org.apache.hudi.config.HoodieWriteConfig;
25+
import org.apache.hudi.exception.HoodieException;
26+
27+
import org.apache.hadoop.fs.FileSystem;
28+
import org.apache.hadoop.fs.Path;
29+
import org.apache.log4j.LogManager;
30+
import org.apache.log4j.Logger;
31+
32+
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
36+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
37+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH;
38+
39+
/**
 * Upgrade handler to upgrade Hudi's table version from 4 to 5.
 *
 * <p>From 0.12 onwards the default partition path sentinel changed from
 * "default" ({@code DEPRECATED_DEFAULT_PARTITION_PATH}) to
 * "__HIVE_DEFAULT_PARTITION__" ({@code DEFAULT_PARTITION_PATH}). If the table
 * still contains the deprecated "default" partition directory, the upgrade
 * fails with migration guidance, unless the user has explicitly opted out via
 * "hoodie.skip.default.partition.validation".
 */
public class FourToFiveUpgradeHandler implements UpgradeHandler {

  private static final Logger LOG = LogManager.getLogger(FourToFiveUpgradeHandler.class);

  @Override
  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
    try {
      FileSystem fs = new Path(config.getBasePath()).getFileSystem(context.getHadoopConf().get());
      // Short-circuit: the filesystem existence check is skipped entirely when
      // the user has opted out via the skip-validation config.
      if (!config.doSkipDefaultPartitionValidation() && fs.exists(new Path(config.getBasePath() + "/" + DEPRECATED_DEFAULT_PARTITION_PATH))) {
        // Log detailed, copy-pastable migration instructions before failing the
        // upgrade, so the user knows how to rewrite the legacy partition.
        LOG.error(String.format("\"%s\" partition detected. From 0.12, we are changing the default partition in hudi to %s "
            + " Please read and write back the data in \"%s\" partition in hudi to new partition path \"%s\". \"\n"
            + " Sample spark command to use to re-write the data: \n\n"
            + " val df = spark.read.format(\"hudi\").load(HUDI_TABLE_PATH).filter(col(\"PARTITION_PATH_COLUMN\") === \"%s\"); \t \n\n"
            + " df.drop(\"_hoodie_commit_time\").drop(\"_hoodie_commit_seqno\").drop(\"_hoodie_record_key\")\"\n"
            + " .drop(\"_hoodie_partition_path\").drop(\"_hoodie_file_name\").withColumn(PARTITION_PATH_COLUMN,\"%s\")\"\n"
            + " .write.options(writeOptions).mode(Append).save(HUDI_TABLE_PATH);\t\n\"\n"
            + " Please fix values for PARTITION_PATH_COLUMN, HUDI_TABLE_PATH and set all write configs in above command before running. "
            + " Also do delete the records in old partition once above command succeeds. "
            + " Sample spark command to delete old partition records: \n\n"
            + " val df = spark.read.format(\"hudi\").load(HUDI_TABLE_PATH).filter(col(\"PARTITION_PATH_COLUMN\") === \"%s\"); \t \n\n"
            + " df.write.option(\"hoodie.datasource.write.operation\",\"delete\").options(writeOptions).mode(Append).save(HUDI_TABLE_PATH);\t\n\"\n",
            DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH, DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH,
            DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH, DEPRECATED_DEFAULT_PARTITION_PATH));
        throw new HoodieException(String.format("Old deprecated \"%s\" partition found in hudi table. This needs a migration step before we can upgrade ",
            DEPRECATED_DEFAULT_PARTITION_PATH));
      }
    } catch (IOException e) {
      // Wrap and rethrow: without a FileSystem handle the validation cannot run,
      // so the upgrade must not proceed silently.
      LOG.error("Fetching file system instance failed", e);
      throw new HoodieException("Fetching FileSystem instance failed ", e);
    }
    // No table-config properties need to be added for the 4 -> 5 upgrade.
    return new HashMap<>();
  }
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, Ho
145145
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
146146
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.FOUR) {
147147
return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
148+
} else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.FIVE) {
149+
return new FourToFiveUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
148150
} else {
149151
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
150152
}
@@ -159,6 +161,8 @@ protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion,
159161
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
160162
} else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.THREE) {
161163
return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
164+
} else if (fromVersion == HoodieTableVersion.FIVE && toVersion == HoodieTableVersion.FOUR) {
165+
return new FiveToFourDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
162166
} else {
163167
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
164168
}

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2015,7 +2015,7 @@ public void testUpgradeDowngrade() throws IOException {
20152015
assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime());
20162016

20172017
initMetaClient();
2018-
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
2018+
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
20192019
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
20202020
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
20212021
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
@@ -2095,7 +2095,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
20952095
}
20962096

20972097
initMetaClient();
2098-
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
2098+
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
20992099
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
21002100
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
21012101
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hudi.common.util.Option;
3939
import org.apache.hudi.common.util.collection.Pair;
4040
import org.apache.hudi.config.HoodieWriteConfig;
41+
import org.apache.hudi.exception.HoodieException;
4142
import org.apache.hudi.keygen.SimpleKeyGenerator;
4243
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
4344
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -82,8 +83,10 @@
8283
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
8384
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
8485
import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
86+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH;
8587
import static org.junit.jupiter.api.Assertions.assertEquals;
8688
import static org.junit.jupiter.api.Assertions.assertFalse;
89+
import static org.junit.jupiter.api.Assertions.assertThrows;
8790
import static org.junit.jupiter.api.Assertions.assertTrue;
8891

8992
/**
@@ -326,10 +329,78 @@ public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOExcepti
326329
assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));
327330
}
328331

332+
@Test
public void testUpgradeFourtoFive() throws Exception {
  // No "default" partition present: upgrade should succeed.
  testUpgradeFourToFiveInternal(false, false);
}

@Test
public void testUpgradeFourtoFiveWithDefaultPartition() throws Exception {
  // Deprecated "default" partition present, validation on: upgrade should fail.
  testUpgradeFourToFiveInternal(true, false);
}

@Test
public void testUpgradeFourtoFiveWithDefaultPartitionWithSkipValidation() throws Exception {
  // Deprecated "default" partition present, but validation skipped: upgrade should succeed.
  testUpgradeFourToFiveInternal(true, true);
}
346+
347+
private void testUpgradeFourToFiveInternal(boolean assertDefaultPartition, boolean skipDefaultPartitionValidation) throws Exception {
348+
String tableName = metaClient.getTableConfig().getTableName();
349+
// clean up and re instantiate meta client w/ right table props
350+
cleanUp();
351+
initSparkContexts();
352+
initPath();
353+
initTestDataGenerator();
354+
355+
Map<String, String> params = new HashMap<>();
356+
addNewTableParamsToProps(params, tableName);
357+
Properties properties = new Properties();
358+
params.forEach((k,v) -> properties.setProperty(k, v));
359+
360+
initMetaClient(getTableType(), properties);
361+
// init config, table and client.
362+
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false)
363+
.doSkipDefaultPartitionValidation(skipDefaultPartitionValidation).withProps(params).build();
364+
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
365+
// Write inserts
366+
doInsert(client);
367+
368+
if (assertDefaultPartition) {
369+
doInsertWithDefaultPartition(client);
370+
}
371+
372+
// downgrade table props
373+
downgradeTableConfigsFromFiveToFour(cfg);
374+
375+
// perform upgrade
376+
if (assertDefaultPartition && !skipDefaultPartitionValidation) {
377+
// if "default" partition is present, upgrade should fail
378+
assertThrows(HoodieException.class, () -> new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
379+
.run(HoodieTableVersion.FIVE, null), "Upgrade from 4 to 5 is expected to fail if \"default\" partition is present.");
380+
} else {
381+
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
382+
.run(HoodieTableVersion.FIVE, null);
383+
384+
// verify hoodie.table.version got upgraded
385+
metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()).build();
386+
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
387+
assertTableVersionFromPropertyFile(HoodieTableVersion.FIVE);
388+
389+
// verify table props
390+
assertTableProps(cfg);
391+
}
392+
}
393+
329394
// Populates params with the current table's name plus the standard key-gen props.
private void addNewTableParamsToProps(Map<String, String> params) {
  addNewTableParamsToProps(params, metaClient.getTableConfig().getTableName());
}

// Populates params with record-key, partition-path, table-name and base-file-format
// properties for the given table name.
private void addNewTableParamsToProps(Map<String, String> params, String tableName) {
  params.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
  params.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "uuid");
  params.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
  params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
  params.put(HoodieTableConfig.NAME.key(), tableName);
  params.put(BASE_FILE_FORMAT.key(), BASE_FILE_FORMAT.defaultValue().name());
}
335406

@@ -342,6 +413,16 @@ private void doInsert(SparkRDDWriteClient client) {
342413
client.insert(writeRecords, commit1).collect();
343414
}
344415

416+
// Inserts 100 records into the deprecated "default" partition only, so upgrade
// tests can exercise the 4-to-5 "default" partition validation.
private void doInsertWithDefaultPartition(SparkRDDWriteClient client) {
  // Re-seed the data generator so every record lands in the deprecated partition.
  dataGen = new HoodieTestDataGenerator(new String[]{DEPRECATED_DEFAULT_PARTITION_PATH});
  String commit1 = "005";
  client.startCommitWithTime(commit1);
  List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
  JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
  client.insert(writeRecords, commit1).collect();
}
425+
345426
private void downgradeTableConfigsFromTwoToOne(HoodieWriteConfig cfg) throws IOException {
346427
Properties properties = new Properties(cfg.getProps());
347428
properties.remove(HoodieTableConfig.RECORDKEY_FIELDS.key());
@@ -368,6 +449,15 @@ private void downgradeTableConfigsFromThreeToTwo(HoodieWriteConfig cfg) throws I
368449
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO);
369450
}
370451

452+
// Re-initializes the table with hoodie.table.version forced back to 4, so tests
// can then run (and assert on) the 4 -> 5 upgrade.
private void downgradeTableConfigsFromFiveToFour(HoodieWriteConfig cfg) throws IOException {
  Properties properties = new Properties();
  // Copy props explicitly; Properties(defaults) would not expose them via forEach.
  cfg.getProps().forEach((k,v) -> properties.setProperty((String) k, (String) v));
  properties.setProperty(HoodieTableConfig.VERSION.key(), "4");
  metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties);
  // set hoodie.table.version to 4 in hoodie.properties file
  metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
}
460+
371461
private void assertTableProps(HoodieWriteConfig cfg) {
372462
HoodieTableConfig tableConfig = metaClient.getTableConfig();
373463
Properties originalProps = cfg.getProps();

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public enum HoodieTableVersion {
3636
// 0.10.0 onwards
3737
THREE(3),
3838
// 0.11.0 onwards
39-
FOUR(4);
39+
FOUR(4),
40+
// 0.12.0 onwards
41+
FIVE(5);
4042

4143
private final int versionCode;
4244

@@ -49,7 +51,7 @@ public int versionCode() {
4951
}
5052

5153
/**
 * Returns the latest table version; bumped to FIVE for release 0.12.
 */
public static HoodieTableVersion current() {
  return FIVE;
}
5456

5557
public static HoodieTableVersion versionFromCode(int versionCode) {

hudi-common/src/main/java/org/apache/hudi/common/util/PartitionPathEncodeUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
*/
2626
public class PartitionPathEncodeUtils {
2727

28+
// Pre-0.12 sentinel partition path; detected and rejected by the 4-to-5 upgrade.
public static final String DEPRECATED_DEFAULT_PARTITION_PATH = "default";
// Hive-compatible sentinel partition path used from 0.12 onwards.
public static final String DEFAULT_PARTITION_PATH = "__HIVE_DEFAULT_PARTITION__";
2930

3031
static BitSet charToEscape = new BitSet(128);

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
5555
.build
5656

5757
// verify hoodie.table.version of the original table
58-
assertResult(HoodieTableVersion.FOUR.versionCode) {
58+
assertResult(HoodieTableVersion.FIVE.versionCode) {
5959
metaClient.getTableConfig.getTableVersion.versionCode()
6060
}
61-
assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FOUR.versionCode)
61+
assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FIVE.versionCode)
6262

6363
// downgrade table to ZERO
6464
checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'ZERO')""")(Seq(true))

0 commit comments

Comments
 (0)