@@ -28,6 +28,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;

import javax.annotation.Nonnull;
@@ -53,10 +54,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
public static final String SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
public static final String PLAN_PARTITION_FILTER_MODE =
@@ -589,18 +594,46 @@ public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
}

public HoodieClusteringConfig build() {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
clusteringConfig.setDefaultValue(
EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
setDefaults();
validate();

return clusteringConfig;
}

private void setDefaults() {
// Consistent hashing bucket index
if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&& clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
&& clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
&& clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())) {
clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME, SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME, SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
} else {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
clusteringConfig.setDefaultValue(
EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
}
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
}

private void validate() {
boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING);
ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or "
+ "schedule inline clustering (%s) can be enabled. Both can't be set to true at the same time. %s,%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(),
HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), inlineCluster, inlineClusterSchedule));
return clusteringConfig;

// Consistent hashing bucket index
if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&& clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
&& clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
&& clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())) {
ValidationUtils.checkArgument(clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
"Consistent hashing bucket index only supports clustering plan strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
ValidationUtils.checkArgument(clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
"Consistent hashing bucket index only supports clustering execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
}
}

private String getDefaultPlanStrategyClassName(EngineType engineType) {
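For context, a minimal usage sketch (not part of this diff) of how the new defaults are meant to resolve: once the index type is BUCKET with the CONSISTENT_HASHING engine, build() picks the consistent-bucket plan and execution strategies without the user naming them. The fromProperties call is assumed to behave as in the other Hudi config builders.

import java.util.Properties;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.index.HoodieIndex;

public class ConsistentBucketClusteringDefaultsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Select the consistent hashing flavour of the bucket index.
    props.setProperty(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    props.setProperty(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(),
        HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());

    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
        .fromProperties(props)
        .build();

    // Expected after setDefaults(): the Spark consistent-bucket plan/execution strategies.
    System.out.println(clusteringConfig.getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME));
    System.out.println(clusteringConfig.getString(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME));
  }
}

Explicitly overriding either strategy with anything other than the consistent-bucket classes would then fail the new checks in validate().
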
@@ -30,6 +30,9 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.log4j.LogManager;
Review comment thread:

Contributor: use

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

instead, as we moved to log4j2.

Contributor (author): OK, will check this across this PR.

Member: @YuweiXiao This still needs to be corrected.

Contributor (author, YuweiXiao, Jul 18, 2022): Yeah, but I found updating it would break the runtime dependency (ClassNotFoundException). So I guess it may be better to update all logging-related imports in another PR? WDYT?

import org.apache.log4j.Logger;

import javax.annotation.concurrent.Immutable;

import java.io.File;
@@ -62,6 +65,8 @@
+ "which tags incoming records as either inserts or updates to older records.")
public class HoodieIndexConfig extends HoodieConfig {

private static final Logger LOG = LogManager.getLogger(HoodieIndexConfig.class);

public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
.key("hoodie.index.type")
// Builder#getDefaultIndexType has already set it according to engine type
@@ -263,12 +268,37 @@ public class HoodieIndexConfig extends HoodieConfig {
.withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
+ "and each partition is divided to N buckets.");

public static final ConfigProperty<String> BUCKET_INDEX_MAX_NUM_BUCKETS = ConfigProperty
.key("hoodie.bucket.index.max.num.buckets")
.noDefaultValue()
.withDocumentation("Only applies if bucket index engine is consistent hashing. Determine the upper bound of "
+ "the number of buckets in the hudi table. Bucket resizing cannot be done higher than this max limit.");

public static final ConfigProperty<String> BUCKET_INDEX_MIN_NUM_BUCKETS = ConfigProperty
.key("hoodie.bucket.index.min.num.buckets")
.noDefaultValue()
.withDocumentation("Only applies if bucket index engine is consistent hashing. Determine the lower bound of "
+ "the number of buckets in the hudi table. Bucket resizing cannot be done lower than this min limit.");

public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
.key("hoodie.bucket.index.hash.field")
.noDefaultValue()
.withDocumentation("Index key. It is used to index the record and find its file group. "
+ "If not set, use record key field as default");

public static final ConfigProperty<Double> BUCKET_SPLIT_THRESHOLD = ConfigProperty
.key("hoodie.bucket.index.split.threshold")
.defaultValue(2.0)
.withDocumentation("Control if the bucket should be split when using consistent hashing bucket index."
+ "Specifically, if a file slice size reaches `hoodie.xxxx.max.file.size` * threshold, then split will be carried out.");

public static final ConfigProperty<Double> BUCKET_MERGE_THRESHOLD = ConfigProperty
.key("hoodie.bucket.index.merge.threshold")
.defaultValue(0.2)
.withDocumentation("Control if buckets should be merged when using consistent hashing bucket index"
+ "Specifically, if a file slice size is smaller than `hoodie.xxxx.max.file.size` * threshold, then it will be considered"
+ "as a merge candidate.");

/**
* Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
*/
@@ -600,6 +630,16 @@ public Builder withBucketNum(String bucketNum) {
return this;
}

public Builder withBucketMaxNum(int bucketMaxNum) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, String.valueOf(bucketMaxNum));
return this;
}

public Builder withBucketMinNum(int bucketMinNum) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, String.valueOf(bucketMinNum));
return this;
}

public Builder withIndexKeyField(String keyField) {
hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD, keyField);
return this;
@@ -650,6 +690,20 @@ private void validateBucketIndexConfig() {
if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) {
throw new HoodieIndexException("When using bucket index, hoodie.bucket.index.num.buckets cannot be negative.");
}
int bucketNum = hoodieIndexConfig.getInt(BUCKET_INDEX_NUM_BUCKETS);
if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MAX_NUM_BUCKETS))) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) < bucketNum) {
LOG.warn("Maximum bucket number is smaller than bucket number, maximum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MAX_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
hoodieIndexConfig.setValue(BUCKET_INDEX_MAX_NUM_BUCKETS, Integer.toString(bucketNum));
}

if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_MIN_NUM_BUCKETS))) {
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
} else if (hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) > bucketNum) {
LOG.warn("Minimum bucket number is larger than the bucket number, minimum: " + hoodieIndexConfig.getInt(BUCKET_INDEX_MIN_NUM_BUCKETS) + ", bucketNum: " + bucketNum);
hoodieIndexConfig.setValue(BUCKET_INDEX_MIN_NUM_BUCKETS, Integer.toString(bucketNum));
}
}
}
}
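A short builder sketch (not part of this diff) exercising the new options; withIndexType is assumed to be the existing builder helper for hoodie.index.type, and "uuid" is only an illustrative hash/record key.

import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.index.HoodieIndex;

public class BucketIndexBoundsSketch {
  public static void main(String[] args) {
    HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
        .withIndexType(HoodieIndex.IndexType.BUCKET)
        .withBucketNum("8")         // hoodie.bucket.index.num.buckets
        .withBucketMaxNum(32)       // hoodie.bucket.index.max.num.buckets
        .withBucketMinNum(4)        // hoodie.bucket.index.min.num.buckets
        .withIndexKeyField("uuid")  // hoodie.bucket.index.hash.field
        .build();

    // validateBucketIndexConfig() falls back to the bucket number (and warns) whenever
    // the max/min bounds are missing or inconsistent with it.
    System.out.println(indexConfig.getString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS));
    System.out.println(indexConfig.getString(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS));
  }
}
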
@@ -1644,13 +1644,42 @@ public int getBucketIndexNumBuckets() {
return getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
}

public int getBucketIndexMaxNumBuckets() {
return getInt(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS);
}

public int getBucketIndexMinNumBuckets() {
return getInt(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS);
}

public double getBucketSplitThreshold() {
return getDouble(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD);
}

public double getBucketMergeThreshold() {
return getDouble(HoodieIndexConfig.BUCKET_MERGE_THRESHOLD);
}

public String getBucketIndexHashField() {
return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
}

/**
* storage properties.
*/
public long getMaxFileSize(HoodieFileFormat format) {
switch (format) {
case PARQUET:
return getParquetMaxFileSize();
case HFILE:
return getHFileMaxFileSize();
case ORC:
return getOrcMaxFileSize();
default:
throw new HoodieNotSupportedException("Unknown file format: " + format);
}
}

public long getParquetMaxFileSize() {
return getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE);
}
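To illustrate how these getters are meant to be combined (a hypothetical helper, not part of the PR): with the consistent hashing bucket index, a file slice becomes a split candidate once it exceeds the format's max file size times the split threshold, and a merge candidate once it falls below the max file size times the merge threshold.

import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.config.HoodieWriteConfig;

final class BucketResizeBoundsSketch {
  // Upper bound: file slices at or above this size are split candidates.
  static long splitSize(HoodieWriteConfig config) {
    return (long) (config.getMaxFileSize(HoodieFileFormat.PARQUET) * config.getBucketSplitThreshold());
  }

  // Lower bound: file slices below this size are merge candidates.
  static long mergeSize(HoodieWriteConfig config) {
    return (long) (config.getMaxFileSize(HoodieFileFormat.PARQUET) * config.getBucketMergeThreshold());
  }
}
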
@@ -88,6 +88,18 @@ public abstract HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException;


/**
* Extracts the location of written records, and updates the index.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable, String instant) throws HoodieIndexException {
return updateLocation(writeStatuses, context, hoodieTable);
}


/**
* Rollback the effects of the commit made at instantTime.
*/
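A caller-side sketch (hypothetical helper, not part of the PR) of the new overload: write paths that know the commit instant can pass it through, while index implementations that do not need it simply inherit the three-argument behaviour via the default above.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;

final class UpdateLocationCallerSketch {
  static HoodieData<WriteStatus> updateIndex(HoodieIndex<?, ?> index,
                                             HoodieData<WriteStatus> writeStatuses,
                                             HoodieEngineContext context,
                                             HoodieTable table,
                                             String instantTime) {
    // An index that versions its metadata per commit (e.g. a consistent hashing bucket
    // index) can use instantTime; every other index ignores it and behaves as before.
    return index.updateLocation(writeStatuses, context, table, instantTime);
  }
}
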
@@ -28,8 +28,8 @@
public interface BucketIndexLocationMapper extends Serializable {

/**
* Get record location given hoodie key and partition path
* Get record location given hoodie key
*/
Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath);
Option<HoodieRecordLocation> getRecordLocation(HoodieKey key);
Review comment thread:

Member: +1. HoodieKey already has partitionPath.

}
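A sketch (not from this PR) of why the dropped parameter is redundant: HoodieKey already carries the partition path, so an implementation can recover it from the key itself. The class and lookup logic here are purely illustrative.

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;

class ExampleBucketLocationMapper implements BucketIndexLocationMapper {
  @Override
  public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
    String partitionPath = key.getPartitionPath(); // previously passed as a separate argument
    // ... hash the key, find its bucket within partitionPath, map the bucket to a file group ...
    return Option.empty(); // placeholder
  }
}
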
@@ -22,14 +22,22 @@
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.hash.HashID;
import org.apache.hudi.exception.HoodieClusteringException;

import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ConsistentBucketIdentifier extends BucketIdentifier {

@@ -68,7 +76,7 @@ public int getNumBuckets() {
/**
* Get bucket of the given file group
*
* @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
* @param fileId the file group id. NOTE: not filePrefix (i.e., uuid)
*/
public ConsistentHashingNode getBucketByFileId(String fileId) {
return fileIdToBucket.get(fileId);
@@ -88,17 +96,107 @@ protected ConsistentHashingNode getBucket(int hashValue) {
return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey());
}

/**
* Get the former node of the given node (inferred from file id).
*/
public ConsistentHashingNode getFormerBucket(String fileId) {
return getFormerBucket(getBucketByFileId(fileId).getValue());
}

/**
* Get the former node of the given node (inferred from hash value).
*/
public ConsistentHashingNode getFormerBucket(int hashValue) {
SortedMap<Integer, ConsistentHashingNode> headMap = ring.headMap(hashValue);
return headMap.isEmpty() ? ring.lastEntry().getValue() : headMap.get(headMap.lastKey());
}

public List<ConsistentHashingNode> mergeBucket(List<String> fileIds) {
ValidationUtils.checkArgument(fileIds.size() >= 2, "At least two file groups should be provided for merging");
// Get nodes using fileIds
List<ConsistentHashingNode> nodes = fileIds.stream().map(this::getBucketByFileId).collect(Collectors.toList());

// Validate the input
for (int i = 0; i < nodes.size() - 1; ++i) {
ValidationUtils.checkState(getFormerBucket(nodes.get(i + 1).getValue()).getValue() == nodes.get(i).getValue(), "Cannot merge discontinuous hash range");
}

// Create child nodes with proper tag (keep the last one and delete other nodes)
List<ConsistentHashingNode> childNodes = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size() - 1; ++i) {
childNodes.add(new ConsistentHashingNode(nodes.get(i).getValue(), null, ConsistentHashingNode.NodeTag.DELETE));
}
childNodes.add(new ConsistentHashingNode(nodes.get(nodes.size() - 1).getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE));
return childNodes;
}

public Option<List<ConsistentHashingNode>> splitBucket(String fileId) {
ConsistentHashingNode bucket = getBucketByFileId(fileId);
ValidationUtils.checkState(bucket != null, "FileId has no corresponding bucket");
return splitBucket(bucket);
}

/**
* Split a bucket at the midpoint of its hash range, and generate the corresponding file ids
*
* TODO support different split criteria, e.g., distribute records evenly using statistics
*
* @param bucket parent bucket
* @return list of child buckets, or empty if the bucket cannot be split further
*/
public Option<List<ConsistentHashingNode>> splitBucket(@NotNull ConsistentHashingNode bucket) {
ConsistentHashingNode formerBucket = getFormerBucket(bucket.getValue());

long mid = (long) formerBucket.getValue() + bucket.getValue()
+ (formerBucket.getValue() < bucket.getValue() ? 0 : (HoodieConsistentHashingMetadata.HASH_VALUE_MASK + 1L));
mid = (mid >> 1) & HoodieConsistentHashingMetadata.HASH_VALUE_MASK;

// Cannot split as it already is the smallest bucket range
if (mid == formerBucket.getValue() || mid == bucket.getValue()) {
return Option.empty();
}

return Option.of(Arrays.asList(
new ConsistentHashingNode((int) mid, FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE),
new ConsistentHashingNode(bucket.getValue(), FSUtils.createNewFileIdPfx(), ConsistentHashingNode.NodeTag.REPLACE))
);
}

/**
* Initialize necessary data structure to facilitate bucket identifying.
* Specifically, we construct:
* - An in-memory tree (ring) to speed up range mapping searching.
* - A hash table (fileIdToBucket) to allow lookup of bucket using fileId.
* <p>
* Children nodes are also considered, and will override the original nodes,
* which is used during bucket resizing (i.e., children nodes take the place
* of the original nodes)
*/
private void initialize() {
for (ConsistentHashingNode p : metadata.getNodes()) {
ring.put(p.getValue(), p);
// One bucket has only one file group, so append 0 directly
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
}

// Handle children nodes, i.e., replace or delete the original nodes
ConsistentHashingNode tmp;
for (ConsistentHashingNode p : metadata.getChildrenNodes()) {
switch (p.getTag()) {
case REPLACE:
tmp = ring.put(p.getValue(), p);
if (tmp != null) {
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
}
fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
break;
case DELETE:
tmp = ring.remove(p.getValue());
fileIdToBucket.remove(FSUtils.createNewFileId(tmp.getFileIdPrefix(), 0));
break;
default:
throw new HoodieClusteringException("Children node is tagged as NORMAL or unknown tag: " + p);
}
}
}
}
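To make the wrap-around arithmetic in splitBucket concrete, a small self-contained sketch (not part of the diff); it uses an 8-bit mask purely for readability, whereas the real code uses HoodieConsistentHashingMetadata.HASH_VALUE_MASK.

public class SplitMidpointSketch {
  // Same formula as splitBucket: average the two range boundaries, adding one full
  // ring turn (mask + 1) when the parent bucket's range wraps past the end of the ring.
  static long midpoint(int formerValue, int bucketValue, int mask) {
    long mid = (long) formerValue + bucketValue + (formerValue < bucketValue ? 0 : (mask + 1L));
    return (mid >> 1) & mask;
  }

  public static void main(String[] args) {
    int mask = 0xFF; // toy ring of 256 hash values
    System.out.println(midpoint(0x10, 0x20, mask)); // 24 (0x18): plain average, no wrap-around
    System.out.println(midpoint(0xF0, 0x10, mask)); // 0  (0x00): the range wraps, so the midpoint folds back onto the ring
  }
}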