From feb7bcf7b71ecc65d796a460c433a431357abd9d Mon Sep 17 00:00:00 2001 From: Xingjun Wang Date: Sat, 24 Sep 2022 11:55:24 +0800 Subject: [PATCH] [HUDI-4433] rebase last upstream master [HUDI-4433] fix after rebase --- .../hudi/cli/commands/RepairsCommand.java | 2 +- .../hudi/cli/integ/ITTestRepairsCommand.java | 52 +++++++++++++++++++ .../testutils/HoodieTestDataGenerator.java | 1 + 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index f0ff924e227ff..2b11e20a10d42 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -69,7 +69,7 @@ public class RepairsCommand { @ShellMethod(key = "repair deduplicate", value = "De-duplicate a partition path contains duplicates & produce repaired files to replace with") public String deduplicate( - @ShellOption(value = {"--duplicatedPartitionPath"}, help = "Partition Path containing the duplicates") + @ShellOption(value = {"--duplicatedPartitionPath"}, defaultValue = "", help = "Partition Path containing the duplicates") final String duplicatedPartitionPath, @ShellOption(value = {"--repairedOutputPath"}, help = "Location to place the repaired files") final String repairedOutputPath, diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java index 5938a8ffe243e..69db47136e918 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java @@ -69,6 +69,7 @@ public class ITTestRepairsCommand extends HoodieCLIIntegrationTestBase { private String duplicatedPartitionPath; private String duplicatedPartitionPathWithUpdates; private String 
duplicatedPartitionPathWithUpserts; + private String duplicatedNoPartitionPath; private String repairedOutputPath; private HoodieFileFormat fileFormat; @@ -78,6 +79,7 @@ public void init() throws Exception { duplicatedPartitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; duplicatedPartitionPathWithUpdates = HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH; duplicatedPartitionPathWithUpserts = HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH; + duplicatedNoPartitionPath = HoodieTestDataGenerator.NO_PARTITION_PATH; repairedOutputPath = Paths.get(basePath, "tmp").toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); @@ -135,6 +137,23 @@ public void init() throws Exception { .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "7", dupRecords) .withInserts(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "8", dupRecords); + // init cow table for non-partitioned table tests + String cowNonPartitionedTablePath = Paths.get(basePath, "cow_table_non_partitioned").toString(); + + // Create cow table and connect + new TableCommand().createTable( + cowNonPartitionedTablePath, "cow_table_non_partitioned", HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + + HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema); + + cowNonPartitionedTable.addCommit("20160401010101") + .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1) + .getFileIdWithLogFile(HoodieTestDataGenerator.NO_PARTITION_PATH); + + cowNonPartitionedTable.addCommit("20160401010202") + .withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "2", dupRecords); + fileFormat = metaClient.getTableConfig().getBaseFileFormat(); } @@ -232,6 +251,39 @@ public void testDeduplicateWithUpserts(HoodieTableType tableType) throws IOExcep assertEquals(100, result.count()); } + /** + * Test case dry run deduplicate for 
non-partitioned dataset. + */ + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + public void testDeduplicateNoPartitionWithInserts(HoodieTableType tableType) throws IOException { + String tablePath = Paths.get(basePath, "cow_table_non_partitioned").toString(); + connectTableAndReloadMetaClient(tablePath); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), + fs.listStatus(new Path(Paths.get(tablePath, duplicatedNoPartitionPath).toString()))); + List<String> filteredStatuses = fsView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); + assertEquals(2, filteredStatuses.size(), "There should be 2 files."); + + // Before deduplicate, all files contain 110 records + String[] files = filteredStatuses.toArray(new String[0]); + Dataset df = readFiles(files); + assertEquals(110, df.count()); + + // use default value without specifying duplicatedPartitionPath + String cmdStr = String.format("repair deduplicate --repairedOutputPath %s --sparkMaster %s", + repairedOutputPath, "local"); + Object resultForCmd = shell.evaluate(() -> cmdStr); + assertTrue(ShellEvaluationResultUtil.isSuccess(resultForCmd)); + assertEquals(RepairsCommand.DEDUPLICATE_RETURN_PREFIX + repairedOutputPath, resultForCmd.toString()); + + // After deduplicate, there are 100 records + FileStatus[] fileStatus = fs.listStatus(new Path(repairedOutputPath)); + files = Arrays.stream(fileStatus).map(status -> status.getPath().toString()).toArray(String[]::new); + Dataset result = readFiles(files); + assertEquals(100, result.count()); + } + /** * Test case for real run deduplicate. 
*/ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 81a1f32ca2bf2..8614060126dfa 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -89,6 +89,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { // with default bloom filter with 60,000 entries and 0.000000001 FPRate public static final int BLOOM_FILTER_BYTES = 323495; private static Logger logger = LogManager.getLogger(HoodieTestDataGenerator.class); + public static final String NO_PARTITION_PATH = ""; public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15"; public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16"; public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";