Commit e44ae01

codopeneverdizzy authored and committed

[HUDI-4648] Support rename partition through CLI (apache#6569)
(cherry picked from commit d2c46fb)

1 parent 46047c6; commit e44ae01

3 files changed

Lines changed: 146 additions & 21 deletions

hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java

Lines changed: 31 additions & 1 deletion
@@ -33,8 +33,8 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;

 import org.apache.avro.AvroRuntimeException;
@@ -289,4 +289,34 @@ public String repairDeprecatePartition(
     }
     return "Repair succeeded";
   }
+
+  @CliCommand(value = "rename partition",
+      help = "Rename partition. Usage: rename partition --oldPartition <oldPartition> --newPartition <newPartition>")
+  public String renamePartition(
+      @CliOption(key = {"oldPartition"}, help = "Partition value to be renamed", mandatory = true,
+          unspecifiedDefaultValue = "") String oldPartition,
+      @CliOption(key = {"newPartition"}, help = "New partition value after rename", mandatory = true,
+          unspecifiedDefaultValue = "") String newPartition,
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory) throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.RENAME_PARTITION.toString(), master, sparkMemory,
+        HoodieCLI.getTableMetaClient().getBasePathV2().toString(), oldPartition, newPartition);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+
+    if (exitCode != 0) {
+      return "rename partition failed!";
+    }
+    return "rename partition succeeded";
+  }
 }
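
For reference, the new command is run from a hudi-cli session after connecting to a table. A hypothetical session, where the table path and partition values are illustrative and not taken from this commit:

  hudi->connect --path /tmp/hoodie/sample-table
  hudi:sample-table->rename partition --oldPartition 2016/03/15 --newPartition 2016/03/18
  rename partition succeeded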

hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java

Lines changed: 67 additions & 20 deletions
@@ -94,7 +94,8 @@ public class SparkMain {
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE, REPAIR_DEPRECATED_PARTITION
+    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
+    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
   }

   public static void main(String[] args) throws Exception {
@@ -282,6 +283,10 @@ public static void main(String[] args) throws Exception {
         assert (args.length == 4);
         returnCode = repairDeprecatedPartition(jsc, args[3]);
         break;
+      case RENAME_PARTITION:
+        assert (args.length == 6);
+        returnCode = renamePartition(jsc, args[3], args[4], args[5]);
+        break;
       default:
         break;
     }
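
Because the CLI command above simply launches SparkMain with these positional arguments, the same operation could in principle be driven by hand. A rough, untested sketch of an equivalent direct invocation, where the bundle jar name, master, table path, and partition values are all assumptions for illustration:

  spark-submit --class org.apache.hudi.cli.commands.SparkMain /path/to/hudi-cli-jar-with-dependencies.jar \
    RENAME_PARTITION local[2] 4G /tmp/hoodie/sample-table 2016/03/15 2016/03/18
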
@@ -428,35 +433,77 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplica

   public static int repairDeprecatedPartition(JavaSparkContext jsc, String basePath) {
     SQLContext sqlContext = new SQLContext(jsc);
-    Dataset<Row> recordsToRewrite = sqlContext.read().option("hoodie.datasource.read.extract.partition.values.from.path","false").format("hudi").load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH + "'")
-        .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
-        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+    Dataset<Row> recordsToRewrite = getRecordsToRewrite(basePath, PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH, sqlContext);

     if (!recordsToRewrite.isEmpty()) {
       recordsToRewrite.cache();
-      HoodieTableMetaClient metaClient =
-          HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
-
-      Map<String, String> propsMap = new HashMap<>();
-      metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
-      propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
-      propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
-      propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
-      propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+      Map<String, String> propsMap = getPropsForRewrite(metaClient);
+      rewriteRecordsToNewPartition(basePath, PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH, recordsToRewrite, metaClient, propsMap);
+      // after re-writing, we can safely delete older data.
+      deleteOlderPartition(basePath, PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH, recordsToRewrite, propsMap);
+    }
+    return 0;
+  }

-      recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(),
-          functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)).write().options(propsMap)
-          .option("hoodie.datasource.write.operation","insert").format("hudi").mode("Append").save(basePath);
+  public static int renamePartition(JavaSparkContext jsc, String basePath, String oldPartition, String newPartition) {
+    SQLContext sqlContext = new SQLContext(jsc);
+    Dataset<Row> recordsToRewrite = getRecordsToRewrite(basePath, oldPartition, sqlContext);

+    if (!recordsToRewrite.isEmpty()) {
+      recordsToRewrite.cache();
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+      Map<String, String> propsMap = getPropsForRewrite(metaClient);
+      rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
       // after re-writing, we can safely delete older data.
-      propsMap.put("hoodie.datasource.write.partitions.to.delete", PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH);
-      recordsToRewrite.write().options(propsMap).option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value()).format("hudi")
-          .mode("Append").save(basePath);
+      deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
     }
     return 0;
   }

+  private static void deleteOlderPartition(String basePath, String oldPartition, Dataset<Row> recordsToRewrite, Map<String, String> propsMap) {
+    propsMap.put("hoodie.datasource.write.partitions.to.delete", oldPartition);
+    recordsToRewrite.write()
+        .options(propsMap)
+        .option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
+        .format("hudi")
+        .mode("Append")
+        .save(basePath);
+  }
+
+  private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
+    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+        .write()
+        .options(propsMap)
+        .option("hoodie.datasource.write.operation", "insert")
+        .format("hudi")
+        .mode("Append")
+        .save(basePath);
+  }
+
+  private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
+    return sqlContext.read()
+        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
+        .format("hudi")
+        .load(basePath)
+        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)
+        .drop(HoodieRecord.FILENAME_METADATA_FIELD)
+        .drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+  }
+
+  private static Map<String, String> getPropsForRewrite(HoodieTableMetaClient metaClient) {
+    Map<String, String> propsMap = new HashMap<>();
+    metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
+    propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
+    propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
+    propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
+    propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
+    return propsMap;
+  }
+
   private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
       String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass,
       String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
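
The extracted helpers make the mechanics explicit: a rename is a read of the old partition, an insert of the same rows under the new partition value, and a DELETE_PARTITION write against the old one. For readers who want to trace the flow outside the CLI, below is a condensed, self-contained sketch of that sequence against the Spark datasource; the table path, table name, and field names are assumptions for illustration, not values from this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class RenamePartitionSketch {
  public static void main(String[] args) {
    // All literals below are illustrative assumptions.
    String basePath = "/tmp/hoodie/sample-table";
    String oldPartition = "2016/03/15";
    String newPartition = "2016/03/18";

    SparkSession spark = SparkSession.builder().appName("rename-partition-sketch").getOrCreate();

    // Read the rows of the old partition, dropping Hudi's meta columns so they can be re-inserted.
    Dataset<Row> records = spark.read()
        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
        .format("hudi")
        .load(basePath)
        .filter("_hoodie_partition_path == '" + oldPartition + "'")
        .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
            "_hoodie_partition_path", "_hoodie_file_name");

    // Step 1: insert the same rows under the new partition value
    // ("partition_path" and "_row_key" are assumed field names).
    records.withColumn("partition_path", functions.lit(newPartition))
        .write()
        .format("hudi")
        .option("hoodie.table.name", "sample_table")
        .option("hoodie.datasource.write.operation", "insert")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "partition_path")
        .mode("Append")
        .save(basePath);

    // Step 2: only after the rewrite has committed, drop the old partition.
    records.write()
        .format("hudi")
        .option("hoodie.table.name", "sample_table")
        .option("hoodie.datasource.write.operation", "delete_partition")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "partition_path")
        .option("hoodie.datasource.write.partitions.to.delete", oldPartition)
        .mode("Append")
        .save(basePath);

    spark.stop();
  }
}

Note that the two writes are separate commits on the timeline, so the rename is not atomic; the code above, like the commit, orders them so the old partition is deleted only after the rewrite succeeds.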

hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java

Lines changed: 48 additions & 0 deletions
@@ -73,6 +73,7 @@
 import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
 import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum;
 import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -319,4 +320,51 @@ public void testRepairDeprecatedPartition() throws IOException {
     }
   }

+  @Test
+  public void testRenamePartition() throws IOException {
+    tablePath = tablePath + "/rename_partition_test/";
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName(tableName())
+        .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue())
+        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
+        .setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_1)
+        .setPartitionFields("partition_path")
+        .setRecordKeyFields("_row_key")
+        .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName())
+        .initTable(HoodieCLI.conf, tablePath);
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(TRIP_EXAMPLE_SCHEMA).build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), config)) {
+      String newCommitTime = "001";
+      int numRecords = 20;
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
+      JavaRDD<HoodieRecord> writeRecords = context().getJavaSparkContext().parallelize(records, 1);
+      List<WriteStatus> result = client.upsert(writeRecords, newCommitTime).collect();
+      Assertions.assertNoWriteErrors(result);
+
+      SQLContext sqlContext = context().getSqlContext();
+      long totalRecs = sqlContext.read().format("hudi").load(tablePath).count();
+      assertEquals(totalRecs, 20);
+      long totalRecsInOldPartition = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + DEFAULT_FIRST_PARTITION_PATH + "'").count();
+
+      // Execute rename partition command
+      assertEquals(0, SparkMain.renamePartition(jsc(), tablePath, DEFAULT_FIRST_PARTITION_PATH, "2016/03/18"));
+
+      // there should not be any records in old partition
+      totalRecs = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + DEFAULT_FIRST_PARTITION_PATH + "'").count();
+      assertEquals(totalRecs, 0);
+
+      // all records from old partition should have been migrated to new partition
+      totalRecs = sqlContext.read().format("hudi").load(tablePath)
+          .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + "2016/03/18" + "'").count();
+      assertEquals(totalRecs, totalRecsInOldPartition);
+    }
+  }
 }
