
Commit 5078e72

alexeykudinkin authored and jian.feng committed
[HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table (apache#7642)
Most recently, attempts to use the Metadata Table (MT) in the Bloom Index failed because the S3 connection pool was exhausted, no matter how large (within reason) we made the pool; we tested up to 3k connections.

This PR focuses on optimizing the Bloom Index lookup sequence when it leverages the Bloom Filter partition in the Metadata Table. The change is based on the following observations:

- Increasing the size of the batch of requests to MT amortizes the cost of processing it (the bigger the batch, the lower the per-request cost).
- Having too few partitions in the Bloom Index path, however, starts to hurt parallelism when we actually probe individual files to check whether they contain the target keys. The solution is to split these two into different stages with drastically different parallelism levels: constrain parallelism when reading from MT (10s of tasks) and keep it at the current level for probing individual files (100s of tasks).
- The current way of partitioning records (relying on Spark's default partitioner) meant that every Spark executor would, with high likelihood, open (and process) every file-group of the MT Bloom Filter partition. To alleviate that, the same hashing algorithm used by MT should be used to partition records into Spark's individual partitions, so that every task opens no more than 1 file-group in the Bloom Filter partition of MT.

To achieve that, the following changes to the Bloom Index sequence (leveraging MT) are implemented:

- Bloom Filter probing and actual file probing are split into 2 separate operations (so that the parallelism of each can be controlled individually).
- Requests to MT are replaced with batch API invocations.
- A custom partitioner, AffineBloomIndexFileGroupPartitioner, is introduced; it repartitions the dataset of file names with corresponding record keys in a way that is affine with MT Bloom Filters' partitioning (allowing us to open no more than a single file-group per Spark task).

Additionally, this PR addresses some low-hanging performance optimizations that could considerably improve the Bloom Index lookup sequence, like mapping file-comparison pairs to PairRDD (where the key is the file name and the value is the record key) instead of RDD, so that we can:

- Sort by file name within a single Spark partition instead of globally (making sure we check all records within a file at once, and reducing shuffling as well).
- Avoid re-shuffling (caused by re-mapping from RDD to PairRDD later).
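The "affine" partitioning idea described above can be illustrated with a minimal, self-contained sketch. It is not the code of AffineBloomIndexFileGroupPartitioner (which is not shown in this part of the diff): the class name `AffinePartitionerSketch`, the method names, and the stand-in hash function are all hypothetical. The only point it demonstrates is that when lookup keys are bucketed with the same function the Metadata Table uses to pick a file-group, each bucket (one task) touches exactly one file-group.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; NOT Hudi's AffineBloomIndexFileGroupPartitioner.
final class AffinePartitionerSketch {

  // Stand-in for the hash the Metadata Table uses to map a key to a file-group index.
  // The real function may differ; what matters is that both sides use the same one.
  static int fileGroupIndexFor(String bloomFilterKey, int numFileGroups) {
    int h = 0;
    for (byte b : bloomFilterKey.getBytes(StandardCharsets.UTF_8)) {
      h = 31 * h + (b & 0xff);
    }
    return Math.floorMod(h, numFileGroups);
  }

  // Buckets lookup keys so that every bucket only needs a single file-group.
  static Map<Integer, List<String>> partition(List<String> keys, int numFileGroups) {
    Map<Integer, List<String>> buckets = new TreeMap<>();
    for (String key : keys) {
      buckets.computeIfAbsent(fileGroupIndexFor(key, numFileGroups), i -> new ArrayList<>()).add(key);
    }
    return buckets;
  }
}
```

With a default hash partitioner, keys that live in the same file-group would be scattered across tasks, so each task could end up opening every file-group; with the bucketing above, the number of file-groups opened per task is bounded by one.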
1 parent 2e6d29b commit 5078e72

35 files changed

Lines changed: 1059 additions & 484 deletions

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 2 additions & 3 deletions
@@ -62,9 +62,8 @@ public class HoodieIndexUtils {
    * @param hoodieTable Instance of {@link HoodieTable} of interest
    * @return the list of {@link HoodieBaseFile}
    */
-  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
-      final String partition,
-      final HoodieTable hoodieTable) {
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String partition,
+                                                                    HoodieTable hoodieTable) {
     Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
         .filterCompletedInstants().lastInstant();
     if (latestCommitTime.isPresent()) {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java

Lines changed: 2 additions & 3 deletions
@@ -19,12 +19,11 @@

 package org.apache.hudi.index.bloom;

-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;

@@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable {
   public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition);
 }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java

Lines changed: 40 additions & 37 deletions
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -35,7 +36,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
@@ -45,9 +45,9 @@
 import org.apache.log4j.Logger;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static java.util.stream.Collectors.groupingBy;
@@ -127,7 +127,7 @@ private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(

     // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
     // that contains it.
-    HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+    HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
         explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);

     return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
@@ -210,34 +210,35 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());

-    final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-    return context.flatMap(partitions, partitionName -> {
-      // Partition and file name pairs
-      List<Pair<String, String>> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
-          hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName()))
-          .sorted()
-          .collect(toList());
-      if (partitionFileNameList.isEmpty()) {
-        return Stream.empty();
-      }
-      try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
-            hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
-        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
-        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
-          result.add(Pair.of(entry.getKey().getLeft(),
-              new BloomIndexFileInfo(
-                  FSUtils.getFileId(entry.getKey().getRight()),
-                  // NOTE: Here we assume that the type of the primary key field is string
-                  (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
-                  (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
-              )));
-        }
-        return result.stream();
-      } catch (MetadataNotFoundException me) {
-        throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
-      }
-    }, Math.max(partitions.size(), 1));
+    String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+
+    // Partition and file name pairs
+    List<Pair<String, String>> partitionFileNameList =
+        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
+            .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
+            .sorted()
+            .collect(toList());
+
+    if (partitionFileNameList.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
+        hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
+
+    List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>(fileToColumnStatsMap.size());
+
+    for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
+      result.add(Pair.of(entry.getKey().getLeft(),
+          new BloomIndexFileInfo(
+              FSUtils.getFileId(entry.getKey().getRight()),
+              // NOTE: Here we assume that the type of the primary key field is string
+              (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
+              (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
+          )));
+    }
+
+    return result;
   }

   @Override
@@ -278,7 +279,7 @@ public boolean isImplicitWithStorage() {
    * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {
     IndexFileFilter indexFileFilter =
@@ -289,11 +290,13 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
       String recordKey = partitionRecordKeyPair.getRight();
       String partitionPath = partitionRecordKeyPair.getLeft();

-      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionPath)))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
+          .stream()
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }

   /**
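As a rough illustration of what `explodeRecordsWithFileComparisons` now produces, the sketch below expands one (partition path, record key) pair into (partition path, file id, record key) triples, keeping only files whose key range could contain the record key. The `ExplodeSketch` and `FileRange` names are hypothetical, and the simple min/max string comparison is an assumption standing in for the `IndexFileFilter` implementations, whose internals are not part of this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the range check stands in for Hudi's IndexFileFilter.
final class ExplodeSketch {

  // Minimal stand-in for a file's key-range info within a partition.
  static final class FileRange {
    final String partitionPath;
    final String fileId;
    final String minKey;
    final String maxKey;

    FileRange(String partitionPath, String fileId, String minKey, String maxKey) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
      this.minKey = minKey;
      this.maxKey = maxKey;
    }
  }

  // Emits one {partitionPath, fileId, recordKey} triple per candidate file.
  static List<String[]> explode(List<FileRange> files, String partitionPath, String recordKey) {
    List<String[]> out = new ArrayList<>();
    for (FileRange f : files) {
      boolean samePartition = f.partitionPath.equals(partitionPath);
      boolean inRange = f.minKey.compareTo(recordKey) <= 0 && recordKey.compareTo(f.maxKey) <= 0;
      if (samePartition && inRange) {
        out.add(new String[] {f.partitionPath, f.fileId, recordKey});
      }
    }
    return out;
  }
}
```

Keying the output by (partition path, file id) rather than by file id alone is what lets a later stage group and sort comparisons per file-group without re-mapping the data first.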

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java renamed to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java

Lines changed: 36 additions & 23 deletions
@@ -19,7 +19,8 @@
 package org.apache.hudi.index.bloom;

 import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -28,53 +29,67 @@
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;

-import java.util.function.Function;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;

 /**
- * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files.
+ * Function accepting a tuple of {@link HoodieFileGroupId} and a record key and producing
+ * a list of {@link HoodieKeyLookupResult} for every file identified by the file-group ids
+ *
+ * @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}. Note that this is
+ *           parameterized as generic such that this code could be reused for Spark as well
  */
-public class HoodieBaseBloomIndexCheckFunction
-    implements Function<Iterator<Pair<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+public class HoodieBloomIndexCheckFunction<I>
+    implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>, Serializable {

   private final HoodieTable hoodieTable;

   private final HoodieWriteConfig config;

-  public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) {
+  private final SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor;
+  private final SerializableFunction<I, String> recordKeyExtractor;
+
+  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable,
+                                       HoodieWriteConfig config,
+                                       SerializableFunction<I, HoodieFileGroupId> fileGroupIdExtractor,
+                                       SerializableFunction<I, String> recordKeyExtractor) {
     this.hoodieTable = hoodieTable;
     this.config = config;
+    this.fileGroupIdExtractor = fileGroupIdExtractor;
+    this.recordKeyExtractor = recordKeyExtractor;
   }

   @Override
-  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
+  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
+    return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
   }

-  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+  protected class LazyKeyCheckIterator extends LazyIterableIterator<I, List<HoodieKeyLookupResult>> {

     private HoodieKeyLookupHandle keyLookupHandle;

-    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }

-    @Override
-    protected void start() {
-    }
-
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
+
       List<HoodieKeyLookupResult> ret = new ArrayList<>();
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Pair<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple.getLeft();
-          String partitionPath = currentTuple.getRight().getPartitionPath();
-          String recordKey = currentTuple.getRight().getRecordKey();
+          I tuple = inputItr.next();
+
+          HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
+          String recordKey = recordKeyExtractor.apply(tuple);
+
+          String fileId = fileGroupId.getFileId();
+          String partitionPath = fileGroupId.getPartitionPath();
+
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

           // lazily init state
@@ -100,15 +115,13 @@ protected List<HoodieKeyLookupResult> computeNext() {
         }
       } catch (Throwable e) {
         if (e instanceof HoodieException) {
-          throw e;
+          throw (HoodieException) e;
         }
+
         throw new HoodieIndexException("Error checking bloom filter index. ", e);
       }
-      return ret;
-    }

-    @Override
-    protected void end() {
+      return ret;
     }
   }
 }
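The reason `HoodieBloomIndexCheckFunction` takes two extractor functions is that the same per-file batching logic can then run over different tuple types (a Hudi `Pair` in the list-based helper, or an engine-specific tuple elsewhere). A simplified sketch of that pattern follows. `GroupByFileSketch` is a hypothetical name, it uses plain `java.util.function.Function` instead of Hudi's `SerializableFunction`, and it batches eagerly, whereas the real class is lazy and delegates the actual key checks to `HoodieKeyLookupHandle`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: groups consecutive tuples that share a file id, so that
// every file is handled in one go. Assumes the input is already sorted by file id.
final class GroupByFileSketch<I> {
  private final Function<I, String> fileIdExtractor;
  private final Function<I, String> recordKeyExtractor;

  GroupByFileSketch(Function<I, String> fileIdExtractor, Function<I, String> recordKeyExtractor) {
    this.fileIdExtractor = fileIdExtractor;
    this.recordKeyExtractor = recordKeyExtractor;
  }

  List<Map.Entry<String, List<String>>> apply(Iterator<I> sortedTuples) {
    List<Map.Entry<String, List<String>>> batches = new ArrayList<>();
    String currentFileId = null;
    List<String> currentKeys = null;
    while (sortedTuples.hasNext()) {
      I tuple = sortedTuples.next();
      String fileId = fileIdExtractor.apply(tuple);
      if (!fileId.equals(currentFileId)) {
        // A new file starts: open a fresh batch for it.
        currentFileId = fileId;
        currentKeys = new ArrayList<>();
        batches.add(new SimpleEntry<>(fileId, currentKeys));
      }
      currentKeys.add(recordKeyExtractor.apply(tuple));
    }
    return batches;
  }
}
```

If the input were not sorted by file id, the same file would show up in several batches (and, in the real code, be opened several times), which is why the commit sorts by file name within each partition before applying the function.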

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java

Lines changed: 8 additions & 6 deletions
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -41,7 +42,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * This filter will only work with hoodie table since it will only load partitions
@@ -74,7 +75,7 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(List<String> pa
    */

   @Override
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {

@@ -87,10 +88,11 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
       String partitionPath = partitionRecordKeyPair.getLeft();

       return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }

   /**

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java

Lines changed: 13 additions & 13 deletions
@@ -19,20 +19,20 @@

 package org.apache.hudi.index.bloom;

-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;

-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

@@ -56,21 +56,21 @@ public static ListBasedHoodieBloomIndexHelper getInstance() {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
-    List<Pair<String, HoodieKey>> fileComparisonPairList =
+    List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
         fileComparisonPairs.collectAsList().stream()
             .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());

-    List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
-    Iterator<List<HoodieKeyLookupResult>> iterator = new HoodieBaseBloomIndexCheckFunction(
-        hoodieTable, config).apply(fileComparisonPairList.iterator());
-    while (iterator.hasNext()) {
-      keyLookupResults.addAll(iterator.next());
-    }
+    List<HoodieKeyLookupResult> keyLookupResults =
+        CollectionUtils.toStream(
+            new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
+                .apply(fileComparisonPairList.iterator())
+        )
+            .flatMap(Collection::stream)
+            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+            .collect(toList());

-    keyLookupResults = keyLookupResults.stream().filter(
-        lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
     return context.parallelize(keyLookupResults).flatMap(lookupResult ->
         lookupResult.getMatchingRecordKeys().stream()
             .map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
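The rewritten block above replaces a manual `while (iterator.hasNext())` accumulation loop plus a second filtering pass with a single stream pipeline. A standalone sketch of the same shape, using only the JDK, is shown below. `IteratorFlattenSketch` and its methods are hypothetical names; its `toStream` helper is an assumption about what a utility like `CollectionUtils.toStream` does, not a copy of Hudi's implementation, and plain strings stand in for `HoodieKeyLookupResult`.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch of flattening an Iterator<List<T>> with a stream pipeline.
final class IteratorFlattenSketch {

  // Wraps an iterator into a sequential, ordered stream without materializing it.
  static <T> Stream<T> toStream(Iterator<T> it) {
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
  }

  // Flattens the batches and drops empty results in one pass.
  static List<String> flattenNonEmpty(Iterator<List<String>> batches) {
    return toStream(batches)
        .flatMap(Collection::stream)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }
}
```

Besides being shorter, the single pipeline avoids building the intermediate unfiltered list that the old loop produced before filtering.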

hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java

Lines changed: 6 additions & 2 deletions
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.data.HoodieListPairData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -186,11 +187,14 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea
       partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
     });

-    List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();

     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList())));
+        .collect(
+            java.util.stream.Collectors.groupingBy(t -> t.getRight(),
+                java.util.stream.Collectors.mapping(t -> t.getLeft().getFileId(), java.util.stream.Collectors.toList())));

     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002")));
