Merged

Commits (28)
4763d2e  Rebasing MT Bloom Index partition listing to do bulk-request to MT li… (Jan 11, 2023)
66551ee  Revisit Bloom Index to avoid superfluous conversions (Jan 11, 2023)
bfa0e3c  Adding missing utils to `HoodieData` hierarchy (Jan 11, 2023)
efa73a6  Streamline Bloom Index to pass around `HoodiePairData` to avoid unnec… (Jan 11, 2023)
188d37d  Fixing `ClassCastException` (Jan 11, 2023)
f86bc5b  Fixed `BloomFilter` deserialization seq to avoid mutating source `Byt… (Jan 12, 2023)
bb6cf62  [WIP] Split Bloom Index lookup in MT and actual file probing to allow… (Jan 14, 2023)
e68e224  Rebased Bloom Index to rely on full `HoodieFileGroupId` instead of ju… (Jan 17, 2023)
37b34ec  Added static FS view as broadcasted value (to avoid the need to fire … (Jan 17, 2023)
e0bc340  De-duplicated `HoodieBloomIndexCheckFunction` to be re-used for List/… (Jan 18, 2023)
209fc66  Tidying up (Jan 18, 2023)
0bb0aea  Use method rewriting the record that skips the validation (Jan 18, 2023)
8dac4b5  Tidying up (Jan 18, 2023)
736ab71  Reverting inadvertent changes (Jan 18, 2023)
09012c6  Fixing compilation for Scala 2.11 (Jan 18, 2023)
25698d8  Scaffolded `HoodieRDDUtils` to handle incompatible change in Spark 3.… (Jan 18, 2023)
f4ecb9f  Fixed non-serializable lambdas (Jan 19, 2023)
c7bbc9a  Killing dead code (Jan 19, 2023)
71b9af0  Fixing compilation (Jan 19, 2023)
e3eb787  Introduced `Transient` to auto-reset after being serialized based on … (Jan 24, 2023)
c9ca811  Cleaned up `BaseTableMetadata` to avoid holding `SerializableConfigur… (Jan 24, 2023)
66ed5f7  Made `HoodieBackedTableMetadata` serializable even when reusing the u… (Jan 24, 2023)
fe8f935  Simplified and cleaned up `FlatteningIterator`; (Jan 25, 2023)
077909a  Added workaround to avoid re-processing whole partition for every fil… (Jan 25, 2023)
6fdd8b5  Adding java-docs (Jan 25, 2023)
0a80390  [XXX] Avoid repartitioning multiple times by using the same paralleli… (Jan 26, 2023)
c5489b7  Tidying up (Jan 26, 2023)
bb20aea  Tidying up (Jan 26, 2023)
@@ -62,9 +62,8 @@ public class HoodieIndexUtils {
* @param hoodieTable Instance of {@link HoodieTable} of interest
* @return the list of {@link HoodieBaseFile}
*/
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
final String partition,
final HoodieTable hoodieTable) {
public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String partition,
HoodieTable hoodieTable) {
Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
if (latestCommitTime.isPresent()) {
@@ -19,12 +19,11 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

@@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable {
public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
Map<String, Long> recordsPerPartition);
}
@@ -27,6 +27,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -35,7 +36,6 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
@@ -45,9 +45,9 @@
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.groupingBy;
@@ -127,7 +127,7 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(

// Step 3: For each incoming record that may already exist, obtain the candidate files
// (identified by file-group id) that could contain it.
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);

return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
@@ -210,34 +210,35 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());

final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
return context.flatMap(partitions, partitionName -> {
// Partition and file name pairs
List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
.sorted()
.collect(toList());
if (partitionFileNameList.isEmpty()) {
return Stream.empty();
}
try {
Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
// NOTE: Here we assume that the type of the primary key field is string
(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
)));
}
return result.stream();
} catch (MetadataNotFoundException me) {
throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
}
}, Math.max(partitions.size(), 1));
String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
Contributor Author comment: This makes sure we query Column Stats in MT in one batch request.
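
To illustrate the batching this comment describes, here is a minimal, self-contained Java sketch (not the PR's code; MetadataTable below is a hypothetical stand-in for the metadata table's column-stats API, with stats simplified to strings): instead of firing one column-stats request per partition, all files are gathered across partitions first and resolved in a single bulk request.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchColumnStatsSketch {

  // Hypothetical stand-in for the metadata table's column-stats lookup.
  interface MetadataTable {
    Map<String, String> getColumnStats(List<String> fileNames, String keyField);
  }

  // Before: one metadata-table request per partition (N round-trips).
  static Map<String, String> perPartition(MetadataTable mt,
                                          Map<String, List<String>> filesByPartition,
                                          String keyField) {
    Map<String, String> stats = new HashMap<>();
    for (List<String> files : filesByPartition.values()) {
      stats.putAll(mt.getColumnStats(files, keyField)); // one request per partition
    }
    return stats;
  }

  // After: gather all files across partitions, then issue a single bulk request.
  static Map<String, String> bulk(MetadataTable mt,
                                  Map<String, List<String>> filesByPartition,
                                  String keyField) {
    List<String> allFiles = new ArrayList<>();
    filesByPartition.values().forEach(allFiles::addAll);
    return mt.getColumnStats(allFiles, keyField); // one request total
  }
}

The per-partition variant costs one metadata-table round-trip per partition; the bulk variant amortizes that to a single request, which is what switching from the per-partition flatMap to getLatestBaseFilesForAllPartitions enables below.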


// Partition and file name pairs
List<Pair<String, String>> partitionFileNameList =
HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
.map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
.sorted()
.collect(toList());

if (partitionFileNameList.isEmpty()) {
return Collections.emptyList();
}

Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);

List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>(fileToColumnStatsMap.size());

for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
// NOTE: Here we assume that the type of the primary key field is string
(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
)));
}

return result;
}

@Override
@@ -278,7 +279,7 @@ public boolean isImplicitWithStorage() {
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
* recordKey ranges in the index info.
*/
HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {
IndexFileFilter indexFileFilter =
Expand All @@ -289,11 +290,13 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
String recordKey = partitionRecordKeyPair.getRight();
String partitionPath = partitionRecordKeyPair.getLeft();

return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionPath)))
.collect(Collectors.toList());
}).flatMap(List::iterator);
return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
.stream()
.map(partitionFileIdPair ->
new ImmutablePair<>(
new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
})
.flatMapToPair(Stream::iterator);
}

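To make the fan-out performed by explodeRecordsWithFileComparisons concrete, here is a plain-Java sketch under stated assumptions: candidate file ids per partition are precomputed, a "partition/fileId" string stands in for HoodieFileGroupId, and IndexFileFilter and HoodiePairData are elided.

import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ExplodeRecordsSketch {

  // Fan out each (partition, recordKey) pair into one (fileGroupId, recordKey) pair
  // per candidate file, the same shape the reworked flatMapToPair produces.
  static List<SimpleEntry<String, String>> explode(
      Map<String, List<String>> candidateFileIdsByPartition,
      List<SimpleEntry<String, String>> partitionRecordKeyPairs) {
    return partitionRecordKeyPairs.stream()
        .flatMap(pair -> candidateFileIdsByPartition
            .getOrDefault(pair.getKey(), Collections.<String>emptyList())
            .stream()
            // "partition/fileId" stands in for HoodieFileGroupId here
            .map(fileId -> new SimpleEntry<>(pair.getKey() + "/" + fileId, pair.getValue())))
        .collect(Collectors.toList());
  }
}

Each record key is emitted once per candidate file, so downstream lookups can be keyed by file group rather than carrying a full HoodieKey per comparison.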
@@ -19,7 +19,8 @@
package org.apache.hudi.index.bloom;

import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -28,53 +29,67 @@
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;

import java.util.function.Function;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/**
* Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
* Function accepting a tuple of {@link HoodieFileGroupId} and a record key and producing
* a list of {@link HoodieKeyLookupResult} for every file identified by the file-group ids.
*
* @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}. Note that this is
* parameterized as generic such that this code could be reused for Spark as well
*/
public class HoodieBaseBloomIndexCheckFunction
implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
public class HoodieBloomIndexCheckFunction<I>
implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>, Serializable {

private final HoodieTable hoodieTable;

private final HoodieWriteConfig config;

public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
private final SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor;
private final SerializableFunction<I, String> recordKeyExtractor;

public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable,
HoodieWriteConfig config,
SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor,
SerializableFunction<I, String> recordKeyExtractor) {
this.hoodieTable = hoodieTable;
this.config = config;
this.fileGroupIdExtractor = fileGroupIdExtractor;
this.recordKeyExtractor = recordKeyExtractor;
}

@Override
public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
}

class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupResult>> {
protected class LazyKeyCheckIterator extends LazyIterableIterator<I, List<HoodieKeyLookupResult>> {

private HoodieKeyLookupHandle keyLookupHandle;

LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
super(filePartitionRecordKeyTripletItr);
}

@Override
protected void start() {
}

@Override
protected List<HoodieKeyLookupResult> computeNext() {

List<HoodieKeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
Pair<String, HoodieKey> currentTuple = inputItr.next();
String fileId = currentTuple.getLeft();
String partitionPath = currentTuple.getRight().getPartitionPath();
String recordKey = currentTuple.getRight().getRecordKey();
I tuple = inputItr.next();

HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
String recordKey = recordKeyExtractor.apply(tuple);

String fileId = fileGroupId.getFileId();
String partitionPath = fileGroupId.getPartitionPath();

Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

// lazily init state
@@ -100,15 +115,13 @@ protected List<HoodieKeyLookupResult> computeNext() {
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw e;
throw (HoodieException) e;
}

throw new HoodieIndexException("Error checking bloom filter index. ", e);
}
return ret;
}

@Override
protected void end() {
return ret;
}
}
}
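
For orientation, a hedged usage sketch of the now-generic check function, mirroring how ListBasedHoodieBloomIndexHelper binds it further down (the Spark path would bind I to its own tuple type); this is illustration of the API introduced in this diff, not new API.

import java.util.Iterator;
import java.util.List;

import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.bloom.HoodieBloomIndexCheckFunction;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;

class BloomIndexCheckFunctionUsage {

  // Binds the generic tuple type I to Pair<HoodieFileGroupId, String>; the two
  // extractors simply project the pair's sides.
  static Iterator<List<HoodieKeyLookupResult>> check(
      HoodieTable hoodieTable,
      HoodieWriteConfig config,
      Iterator<Pair<HoodieFileGroupId, String>> fileComparisonPairs) {
    return new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(
        hoodieTable, config, Pair::getLeft, Pair::getRight)
        .apply(fileComparisonPairs);
  }
}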
@@ -25,6 +25,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -41,7 +42,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* This filter will only work with hoodie table since it will only load partitions
@@ -74,7 +75,7 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> pa
*/

@Override
HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
HoodiePairData<String, String> partitionRecordKeyPairs) {

@@ -87,10 +88,11 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
String partitionPath = partitionRecordKeyPair.getLeft();

return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
.map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
.collect(Collectors.toList());
}).flatMap(List::iterator);
.map(partitionFileIdPair ->
new ImmutablePair<>(
new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
})
.flatMapToPair(Stream::iterator);
}

@@ -19,20 +19,20 @@

package org.apache.hudi.index.bloom;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieKeyLookupResult;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -56,21 +56,21 @@ public static ListBasedHoodieBloomIndexHelper getInstance() {
public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
HoodiePairData<String, String> partitionRecordKeyPairs,
HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
List<Pair<String, HoodieKey>> fileComparisonPairList =
List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
fileComparisonPairs.collectAsList().stream()
.sorted(Comparator.comparing(Pair::getLeft)).collect(toList());

List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
hoodieTable, config).apply(fileComparisonPairList.iterator());
while (iterator.hasNext()) {
keyLookupResults.addAll(iterator.next());
}
List<HoodieKeyLookupResult> keyLookupResults =
CollectionUtils.toStream(
new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
.apply(fileComparisonPairList.iterator())
)
.flatMap(Collection::stream)
.filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.collect(toList());

keyLookupResults = keyLookupResults.stream().filter(
lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
return context.parallelize(keyLookupResults).flatMap(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
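The replaced while-loop drained the iterator by hand; the new version collapses it with CollectionUtils.toStream. A small illustrative sketch of that collapse, assuming toStream wraps an Iterator into a sequential Stream (the helper below is an assumption for illustration, not Hudi's implementation):

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorFlattenSketch {

  // Assumed shape of CollectionUtils.toStream: wrap an Iterator in a sequential Stream.
  static <T> Stream<T> toStream(Iterator<T> iterator) {
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
  }

  // Flatten an Iterator<List<T>> into a single List<T>. In the actual change the
  // flattened stream is additionally filtered to drop lookup results with no
  // matching record keys.
  static <T> List<T> flatten(Iterator<List<T>> batches) {
    return toStream(batches)
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
  }
}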
@@ -24,6 +24,7 @@
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -186,11 +187,14 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea
partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});

List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();

assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
.collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));
.collect(
java.util.stream.Collectors.groupingBy(t -> t.getRight(),
java.util.stream.Collectors.mapping(t -> t.getLeft().getFileId(), java.util.stream.Collectors.toList())));

assertEquals(4, recordKeyToFileComps.size());
assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));