Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncException;
Expand Down Expand Up @@ -47,12 +49,14 @@
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
Expand Down Expand Up @@ -191,19 +195,27 @@ public void addPartitionsToTable(String tableName, List<String> partitionsToAdd)
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
try {
// checkArgument throws when the condition evaluates to FALSE, so the condition
// must express the VALID state: a strictly positive batch size. The original
// `<= 0` comparison was inverted — it rejected every valid configuration and
// accepted every invalid one, contradicting its own error message.
ValidationUtils.checkArgument(syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM) > 0,
    "batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
StorageDescriptor sd = client.getTable(databaseName, tableName).getSd();
List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
client.add_partitions(partitionList, true, false);
List<Partition> partitionList = new ArrayList<>();
int batchSyncPartitionNum = syncConfig.getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
for (List<String> batch : CollectionUtils.batches(partitionsToAdd, batchSyncPartitionNum)) {
batch.forEach(x -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.getString(META_SYNC_BASE_PATH), x).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(x);
partitionSd.setLocation(fullPartitionPath);
partitionList.add(new Partition(partitionValues, databaseName, tableName, 0, 0, partitionSd, null));
});
client.add_partitions(partitionList, true, false);
LOG.info("HMSDDLExecutor add a batch partitions done: " + partitionList.size());
partitionList.clear();
}
} catch (TException e) {
LOG.error(databaseName + "." + tableName + " add partition failed", e);
throw new HoodieHiveSyncException(databaseName + "." + tableName + " add partition failed", e);
Expand Down