Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_IGNORE_EXCEPTIONS;
Expand Down Expand Up @@ -166,29 +168,35 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
"The last commit that was synced should be updated in the TBLPROPERTIES");

// Add new partitions
List<String> newPartition = Collections.singletonList("2050/01/01");
List<String> newPartition = Arrays.asList("2050/01/01", "2040/02/01");
hiveClient.addPartitionsToTable(HiveTestUtil.TABLE_NAME, Collections.emptyList());
assertEquals(5, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"No new partition should be added");
hiveClient.addPartitionsToTable(HiveTestUtil.TABLE_NAME, newPartition);
assertEquals(6, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
assertEquals(7, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"New partition should be added");

// Update partitions
hiveClient.updatePartitionsToTable(HiveTestUtil.TABLE_NAME, Collections.emptyList());
assertEquals(6, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
assertEquals(7, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"Partition count should remain the same");
hiveClient.updatePartitionsToTable(HiveTestUtil.TABLE_NAME, newPartition);
assertEquals(6, hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"Partition count should remain the same");
List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
Set<String> relativePartitionPaths = hivePartitions.stream()
.map(p -> getRelativePartitionPath(new Path(basePath), new Path(p.getStorageLocation())))
.collect(Collectors.toSet());
// partition paths from the storage descriptor should be unique and contain the updated partitions
assertEquals(7, hivePartitions.size(), "Partition count should remain the same");
assertEquals(hivePartitions.size(), relativePartitionPaths.size());
assertTrue(relativePartitionPaths.containsAll(newPartition));

// Alter partitions
// Manually change a hive partition location to check if the sync will detect
// it and generate a partition update event for it.
ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");

List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
Expand All @@ -200,7 +208,7 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)

// Sync should update the changed partition to correct path
List<Partition> tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
assertEquals(7, tablePartitions.size(), "The one partition we wrote should be added to hive");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be 100");
}
Expand Down