Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ public class HoodieIndexConfig extends HoodieConfig {
+ "When true, the index lookup uses bloom filters and column stats from metadata "
+ "table when available to speed up the process.");

/**
 * Parallelism for reading the index from the metadata table during bloom index lookup.
 * Per its documentation string, it only applies when the index type is BLOOM and
 * {@code hoodie.bloom.index.use.metadata=true}. The declared default is 10.
 */
public static final ConfigProperty<Integer> BLOOM_INDEX_METADATA_READ_PARALLELISM = ConfigProperty
.key("hoodie.bloom.index.metadata.read.parallelism")
.defaultValue(10)
.sinceVersion("0.13.0")
.withDocumentation("Only applies if index type is BLOOM and metadata table is enabled "
+ "for index lookup (hoodie.bloom.index.use.metadata=true). "
+ "Determines the parallelism for reading the index from metadata table.");

/**
 * Batch size for reading bloom filters from the metadata table during bloom index lookup.
 * Per its documentation string, it only applies when the index type is BLOOM and
 * {@code hoodie.bloom.index.use.metadata=true}; a smaller value puts less pressure on
 * executor memory. The declared default is 128.
 */
public static final ConfigProperty<Integer> BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE = ConfigProperty
.key("hoodie.bloom.index.metadata.bloom.filter.read.batch.size")
.defaultValue(128)
.sinceVersion("0.13.0")
.withDocumentation("Only applies if index type is BLOOM and metadata table is enabled "
+ "for index lookup (hoodie.bloom.index.use.metadata=true). "
+ "Determines the batch size for reading bloom filters from metadata table. "
+ "Smaller value puts less pressure on the executor memory.");

public static final ConfigProperty<String> BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty
.key("hoodie.bloom.index.use.treebased.filter")
.defaultValue("true")
Expand Down Expand Up @@ -540,6 +557,16 @@ public Builder bloomIndexUseMetadata(boolean useMetadata) {
return this;
}

/**
 * Records the parallelism to use when reading the index from the metadata table.
 *
 * @param parallelism value stored, in its decimal string form, under
 *                    {@code BLOOM_INDEX_METADATA_READ_PARALLELISM}
 * @return this builder, to allow call chaining
 */
public Builder bloomIndexMetadataReadParallelism(int parallelism) {
  // The config holder is written with string values, so render the int first.
  final String parallelismValue = Integer.toString(parallelism);
  hoodieIndexConfig.setValue(BLOOM_INDEX_METADATA_READ_PARALLELISM, parallelismValue);
  return this;
}

/**
 * Records the batch size to use when reading bloom filters from the metadata table.
 *
 * @param batchSize value stored, in its decimal string form, under
 *                  {@code BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE}
 * @return this builder, to allow call chaining
 */
public Builder bloomIndexMetadataBloomFilterReadBatchSize(int batchSize) {
  // The config holder is written with string values, so render the int first.
  final String batchSizeValue = Integer.toString(batchSize);
  hoodieIndexConfig.setValue(BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE, batchSizeValue);
  return this;
}

public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) {
hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,14 @@ public boolean getBloomIndexUseMetadata() {
return getBooleanOrDefault(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA);
}

/**
 * Looks up {@code HoodieIndexConfig.BLOOM_INDEX_METADATA_READ_PARALLELISM} via
 * {@code getIntOrDefault}.
 *
 * @return the parallelism for reading the index from the metadata table
 */
public int getBloomIndexMetadataReadParallelism() {
  final int metadataReadParallelism =
      getIntOrDefault(HoodieIndexConfig.BLOOM_INDEX_METADATA_READ_PARALLELISM);
  return metadataReadParallelism;
}

/**
 * Looks up {@code HoodieIndexConfig.BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE} via
 * {@code getIntOrDefault}.
 *
 * @return the batch size for reading bloom filters from the metadata table
 */
public int getBloomIndexMetadataBloomFilterReadBatchSize() {
  final int bloomFilterReadBatchSize =
      getIntOrDefault(HoodieIndexConfig.BLOOM_INDEX_METADATA_BLOOM_FILTER_READ_BATCH_SIZE);
  return bloomFilterReadBatchSize;
}

/**
 * Looks up {@code HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER} via {@code getBoolean}.
 *
 * @return the boolean value of the tree-based filter setting
 */
public boolean useBloomIndexTreebasedFilter() {
  // NOTE(review): the neighbouring bloom-index getters call get*OrDefault, while this one
  // calls getBoolean; confirm an unset key still resolves to the declared default here.
  final boolean treeBasedFilterEnabled = getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER);
  return treeBasedFilterEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hudi.table.HoodieTable;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -43,15 +42,15 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable {
* @param context {@link HoodieEngineContext} instance to use.
* @param hoodieTable {@link HoodieTable} instance to use.
* @param partitionRecordKeyPairs Pairs of partition path and record key.
* @param fileComparisonPairs Pairs of filename and record key based on file comparisons.
* @param fileComparisonPairs Pairs of (file ID, filename) pair and record key based on file comparisons.
* @param partitionToFileInfo Partition path to {@link BloomIndexFileInfo} map.
* @param recordsPerPartition Number of records per partition in a map.
* @return {@link HoodiePairData} of {@link HoodieKey} and {@link HoodieRecordLocation} pairs.
*/
public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs,
Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@
public class BloomIndexFileInfo implements Serializable {

private final String fileId;

private final String filename;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we store the filename to avoid recomputation of the same piece of information across stages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to store both file-id and file-name? I think we can just store the file-name and convert it to file-id wherever necessary.

private final String minRecordKey;

private final String maxRecordKey;

public BloomIndexFileInfo(String fileId, String minRecordKey, String maxRecordKey) {
public BloomIndexFileInfo(String fileId, String filename, String minRecordKey, String maxRecordKey) {
this.fileId = fileId;
this.filename = filename;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fileName

this.minRecordKey = minRecordKey;
this.maxRecordKey = maxRecordKey;
}

public BloomIndexFileInfo(String fileId) {
public BloomIndexFileInfo(String fileId, String filename) {
this.fileId = fileId;
this.filename = filename;
this.minRecordKey = null;
this.maxRecordKey = null;
}
Expand All @@ -48,6 +49,10 @@ public String getFileId() {
return fileId;
}

/**
 * @return the filename this instance was constructed with
 */
public String getFilename() {
  return this.filename;
}

/**
 * @return the minimum record key this instance was constructed with; {@code null} when the
 *         instance was created through the constructor that takes no key range
 */
public String getMinRecordKey() {
  return this.minRecordKey;
}
Expand Down Expand Up @@ -78,20 +83,23 @@ public boolean equals(Object o) {
}

BloomIndexFileInfo that = (BloomIndexFileInfo) o;
return Objects.equals(that.fileId, fileId) && Objects.equals(that.minRecordKey, minRecordKey)
return Objects.equals(that.fileId, fileId)
&& Objects.equals(that.filename, filename)
&& Objects.equals(that.minRecordKey, minRecordKey)
&& Objects.equals(that.maxRecordKey, maxRecordKey);

}

@Override
public int hashCode() {
return Objects.hash(fileId, minRecordKey, maxRecordKey);
return Objects.hash(fileId, filename, minRecordKey, maxRecordKey);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {");
sb.append(" fileId=").append(fileId);
sb.append(" filename=").append(filename);
sb.append(" minRecordKey=").append(minRecordKey);
sb.append(" maxRecordKey=").append(maxRecordKey);
sb.append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
Expand Down Expand Up @@ -121,12 +121,14 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(

// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
// partition -> {file ID -> BloomIndexFileInfo instance}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the "File Id" naming might be confusing; "file-group ID" would be more accurate.

final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching from List to Map for each value so that the filename can be retrieved with file ID in O(1) time.

.collect(groupingBy(Pair::getLeft, toMap(entry -> entry.getRight().getFileId(), Pair::getRight)));

// Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
// that contains it.
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
// Each entry: ((File ID, Filename), HoodieKey instance)
HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Along the DAG, passing down the file names so they don't need to be recomputed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies here: I don't think we need to propagate both file-id and file-name, since the latter contains the former (propagating both increases the complexity and the amount of data we need to shuffle).

explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);

return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
Expand All @@ -142,7 +144,8 @@ private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartition
// load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
if (config.getBloomIndexUseMetadata()
&& hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
fileInfoList = loadColumnRangesFromMetaIndex(
affectedPartitionPathList, context, hoodieTable, config.getBloomIndexMetadataReadParallelism());
}
// fallback to loading column ranges from files
if (isNullOrEmpty(fileInfoList)) {
Expand All @@ -161,21 +164,30 @@ private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartition
List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
// Obtain the latest data files from all the partitions.
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
// Each entry: (relative partition path, (File ID, Filename))
List<Pair<String, Pair<String, String>>> partitionPathFileIdNameList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(
pair.getKey(),
Pair.of(pair.getValue().getFileId(), pair.getValue().getFileName())))
.collect(toList());

context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
return context.map(partitionPathFileIDList, pf -> {
return context.map(partitionPathFileIdNameList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(
config, hoodieTable, Pair.of(pf.getLeft(), pf.getRight().getLeft()));
String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
return Pair.of(
pf.getKey(),
new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue(), minMaxKeys[0], minMaxKeys[1]));
} catch (MetadataNotFoundException me) {
LOG.warn("Unable to find range metadata in file :" + pf);
return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
return Pair.of(
pf.getKey(),
new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue()));
}
}, Math.max(partitionPathFileIDList.size(), 1));
}, Math.max(partitionPathFileIdNameList.size(), 1));
}

/**
Expand All @@ -188,12 +200,17 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
*/
private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context,
hoodieTable).stream()
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());
List<Pair<String, Pair<String, String>>> partitionPathFileIDList =
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(pair -> Pair.of(
pair.getKey(),
Pair.of(pair.getValue().getFileId(), pair.getValue().getFileName())))
.collect(toList());
return partitionPathFileIDList.stream()
.map(pf -> Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList());
.map(pf -> Pair.of(
pf.getKey(),
new BloomIndexFileInfo(pf.getValue().getKey(), pf.getValue().getValue())))
.collect(toList());
}

/**
Expand All @@ -202,41 +219,57 @@ private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
* @param partitions - List of partitions for which column stats need to be loaded
* @param context - Engine context
* @param hoodieTable - Hoodie table
* @param parallelism - Parallelism for reading column stats from metadata table
* @return List of partition and file column range info pairs
*/
protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
List<String> partitions, final HoodieEngineContext context,
final HoodieTable<?, ?, ?, ?> hoodieTable, int parallelism) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());

final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
return context.flatMap(partitions, partitionName -> {
// Partition and file name pairs
List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
.sorted()
.collect(toList());
if (partitionFileNameList.isEmpty()) {
return Stream.empty();
}
try {
Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
// NOTE: Here we assume that the type of the primary key field is string
(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
)));
}
return result.stream();
} catch (MetadataNotFoundException me) {
throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
}
}, Math.max(partitions.size(), 1));
return context.parallelize(partitions, Math.max(Math.min(partitions.size(), parallelism), 1))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This limits the parallelism of column stats fetching from the metadata table.

.mapPartitions(partitionIterator -> {
List<String> partitionNameList = new ArrayList<>();
List<Pair<String, String>> partitionFileNameList = new ArrayList<>();
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();

// For the list of partitions, get all file names to fetch column stats
while (partitionIterator.hasNext()) {
String partitionName = partitionIterator.next();
partitionNameList.add(partitionName);
partitionFileNameList.addAll(
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, hoodieTable).stream()
.map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
.collect(Collectors.toList()));
}
if (partitionFileNameList.isEmpty()) {
return result.iterator();
}

// Sort the file name list and do lookup of column stats from metadata table
List<Pair<String, String>> sortedPartitionFileNameList =
partitionFileNameList.stream().sorted().collect(toList());
try {
Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
hoodieTable.getMetadataTable().getColumnStats(sortedPartitionFileNameList, keyField);
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
entry.getKey().getRight(),
// NOTE: Here we assume that the type of the primary key field is string
(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
)));
}
return result.iterator();
} catch (MetadataNotFoundException me) {
throw new HoodieMetadataException("Unable to find column range metadata for partitions:" + partitionNameList, me);
}
}, true)
.collectAsList();
}

@Override
Expand Down Expand Up @@ -277,8 +310,8 @@ public boolean isImplicitWithStorage() {
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodieData<Pair<Pair<String, String>, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
IndexFileFilter indexFileFilter =
config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
Expand All @@ -289,8 +322,9 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
String partitionPath = partitionRecordKeyPair.getLeft();

return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.map(partitionFileIdPair -> (Pair<Pair<String, String>, HoodieKey>)
new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> pa
*/

@Override
HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodieData<Pair<Pair<String, String>, HoodieKey>> explodeRecordsWithFileComparisons(
final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {

IndexFileFilter indexFileFilter =
Expand All @@ -87,7 +87,7 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
String partitionPath = partitionRecordKeyPair.getLeft();

return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
.map(partitionFileIdPair -> (Pair<Pair<String, String>, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
Expand Down
Loading