diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index acca48537d4c..6264d2fc1a01 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -232,11 +232,11 @@ public static HoodieDefaultTimeline getTimeline(HoodieTableMetaClient metaClient
    */
   public static HoodieTimeline getCommitsTimelineAfter(
       HoodieTableMetaClient metaClient, String exclusiveStartInstantTime, Option<String> lastMaxCompletionTime) {
-    HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieDefaultTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
 
-    HoodieDefaultTimeline timeline = activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
-        ? metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(activeTimeline)
-        : activeTimeline;
+    HoodieDefaultTimeline timeline = writeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+        ? metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(writeTimeline)
+        : writeTimeline;
 
     HoodieDefaultTimeline timelineSinceLastSync = (HoodieDefaultTimeline) timeline.getCommitsTimeline()
         .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 6b48e748b503..a1f00b8eaf09 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -59,6 +59,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -67,6 +68,9 @@
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
@@ -77,6 +81,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -323,6 +328,33 @@ public void testGetCommitsTimelineAfter() throws IOException {
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
         TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, Option.of(startTs)));
     verify(mockMetaClient, times(1)).getArchivedTimeline(any());
+
+    // Should load both archived and active timeline
+    startTs = "005";
+    mockMetaClient = prepareMetaClient(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003", "003"),
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "007", "007"),
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009", "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001", "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "005", "005"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008")),
+        startTs
+    );
+    verifyTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "006", "006"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "008", "008"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, Option.of(startTs)));
+    verify(mockMetaClient, times(1)).getArchivedTimeline(any());
   }
 
   private HoodieTableMetaClient prepareMetaClient(
@@ -337,6 +369,8 @@ private HoodieTableMetaClient prepareMetaClient(
     HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(mockMetaClient);
     when(mockMetaClient.getActiveTimeline())
         .thenReturn(activeTimeline);
+    Set<String> validWriteActions = CollectionUtils.createSet(
+        COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
     when(mockMetaClient.getArchivedTimeline(any()))
         .thenReturn(mockArchivedTimeline);
     HoodieDefaultTimeline mergedTimeline = new HoodieDefaultTimeline(
@@ -346,6 +380,14 @@ private HoodieTableMetaClient prepareMetaClient(
         .mergeTimeline(activeTimeline);
     when(mockArchivedTimeline.mergeTimeline(eq(activeTimeline)))
         .thenReturn(mergedTimeline);
+    HoodieDefaultTimeline mergedWriteTimeline = new HoodieDefaultTimeline(
+        archivedInstants.stream()
+            .filter(instant -> instant.getTimestamp().compareTo(startTs) >= 0),
+        i -> Option.empty())
+        .mergeTimeline(activeTimeline.getWriteTimeline());
+    when(mockArchivedTimeline.mergeTimeline(argThat(timeline -> timeline.filter(
+        instant -> instant.getAction().equals(ROLLBACK_ACTION)).countInstants() == 0)))
+        .thenReturn(mergedWriteTimeline);
     return mockMetaClient;
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index da23d8876c78..c51ceadd7ff3 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -390,6 +391,41 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode,
     tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     assertEquals(Option.of("300"), hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
     assertEquals(7, tablePartitions.size());
+
+    // Add the following instants to the active timeline and sync: "400" (rollback), "500" (commit)
+    // Last commit time sync is "500" after Hive sync
+    HiveTestUtil.addRollbackInstantToTable("400", "350");
+    HiveTestUtil.commitToTable("500", 7, useSchemaFromCommitMetadata);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("500"), hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(8, tablePartitions.size());
+
+    // Add more instants with adding a partition and simulate the case where the commit adding
+    // the new partition is archived.
+    // Before simulated archival: "300", "400" (rollback), "500", "600" (adding new partition), "700", "800"
+    // After simulated archival: "400" (rollback), "700", "800"
+    // In this case, listing all partitions should be triggered to catch up.
+    HiveTestUtil.commitToTable("600", 8, useSchemaFromCommitMetadata);
+    HiveTestUtil.commitToTable("700", 1, useSchemaFromCommitMetadata);
+    HiveTestUtil.commitToTable("800", 1, useSchemaFromCommitMetadata);
+    HiveTestUtil.removeCommitFromActiveTimeline("300", COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline("500", COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline("600", COMMIT_ACTION);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(hiveClient.config.getHadoopConf()).setBasePath(basePath).build();
+    assertEquals(
+        Arrays.asList("400", "700", "800"),
+        metaClient.getActiveTimeline().getInstants().stream()
+            .map(HoodieInstant::getTimestamp).sorted()
+            .collect(Collectors.toList()));
+
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("800"), hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(9, tablePartitions.size());
   }
 
   @ParameterizedTest
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d63c026eb6d7..85a5789317de 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hive.testutils;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
@@ -81,6 +82,7 @@
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -92,6 +94,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRollbackMetadata;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -263,6 +266,32 @@ public static void createReplaceCommit(String instantTime, String partitions, Wr
     createReplaceCommitFile(replaceCommitMetadata, instantTime);
   }
 
+  public static void addRollbackInstantToTable(String instantTime, String commitToRollback)
+      throws IOException {
+    HoodieRollbackMetadata rollbackMetadata = HoodieRollbackMetadata.newBuilder()
+        .setVersion(1)
+        .setStartRollbackTime(instantTime)
+        .setTotalFilesDeleted(1)
+        .setTimeTakenInMillis(1000)
+        .setCommitsRollback(Collections.singletonList(commitToRollback))
+        .setPartitionMetadata(Collections.emptyMap())
+        .setInstantsRollback(Collections.emptyList())
+        .build();
+
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeRequestedRollbackFileName(instantTime),
+        "".getBytes());
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeInflightRollbackFileName(instantTime),
+        "".getBytes());
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeRollbackFileName(instantTime),
+        serializeRollbackMetadata(rollbackMetadata).get());
+  }
+
   public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
       throws IOException, URISyntaxException {
     Path path = new Path(basePath);
@@ -521,21 +550,17 @@ private static void checkResult(boolean result) {
   }
 
   public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime, String basePath) throws IOException {
-    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeCommitFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeCommitFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException {
-    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeReplaceFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeReplaceFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
@@ -545,19 +570,23 @@ public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadat
 
   private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime)
       throws IOException {
-    byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeCommitFileName(instantTime));
-    FSDataOutputStream fsout = fileSystem.create(fullPath, true);
-    fsout.write(bytes);
-    fsout.close();
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeCommitFileName(instantTime),
+        commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
   private static void createDeltaCommitFile(HoodieCommitMetadata deltaCommitMetadata, String deltaCommitTime)
       throws IOException {
-    byte[] bytes = deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
-        + HoodieTimeline.makeDeltaFileName(deltaCommitTime));
+    createMetaFile(
+        basePath,
+        HoodieTimeline.makeDeltaFileName(deltaCommitTime),
+        deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+  }
+
+  private static void createMetaFile(String basePath, String fileName, byte[] bytes)
+      throws IOException {
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + fileName);
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
     fsout.close();