diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 8a8a03e1b17ac..2ec46f19a43cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -62,9 +62,8 @@ public class HoodieIndexUtils { * @param hoodieTable Instance of {@link HoodieTable} of interest * @return the list of {@link HoodieBaseFile} */ - public static List getLatestBaseFilesForPartition( - final String partition, - final HoodieTable hoodieTable) { + public static List getLatestBaseFilesForPartition(String partition, + HoodieTable hoodieTable) { Option latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline() .filterCompletedInstants().lastInstant(); if (latestCommitTime.isPresent()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java index 9430d9bb5e50b..f144540ed2280 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java @@ -19,12 +19,11 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements Serializable { public abstract HoodiePairData findMatchingFilesForRecordKeys( HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, + HoodiePairData fileComparisonPairs, Map> partitionToFileInfo, Map recordsPerPartition); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 57d9def9b4201..cf067c3a99101 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -35,7 +36,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.index.HoodieIndex; import 
org.apache.hudi.index.HoodieIndexUtils; @@ -45,9 +45,9 @@ import org.apache.log4j.Logger; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.groupingBy; @@ -127,7 +127,7 @@ private HoodiePairData lookupIndex( // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id, // that contains it. - HoodieData> fileComparisonPairs = + HoodiePairData fileComparisonPairs = explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs); return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable, @@ -210,34 +210,35 @@ protected List> loadColumnRangesFromMetaIndex( // also obtain file ranges, if range pruning is enabled context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName()); - final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); - return context.flatMap(partitions, partitionName -> { - // Partition and file name pairs - List> partitionFileNameList = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName, - hoodieTable).stream().map(baseFile -> Pair.of(partitionName, baseFile.getFileName())) - .sorted() - .collect(toList()); - if (partitionFileNameList.isEmpty()) { - return Stream.empty(); - } - try { - Map, HoodieMetadataColumnStats> fileToColumnStatsMap = - hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField); - List> result = new ArrayList<>(); - for (Map.Entry, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) { - result.add(Pair.of(entry.getKey().getLeft(), - new BloomIndexFileInfo( - FSUtils.getFileId(entry.getKey().getRight()), - // NOTE: Here we assume that the type of the primary key field is string - (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()), - (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue()) - ))); - } - return result.stream(); - } catch (MetadataNotFoundException me) { - throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me); - } - }, Math.max(partitions.size(), 1)); + String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); + + // Partition and file name pairs + List> partitionFileNameList = + HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() + .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName())) + .sorted() + .collect(toList()); + + if (partitionFileNameList.isEmpty()) { + return Collections.emptyList(); + } + + Map, HoodieMetadataColumnStats> fileToColumnStatsMap = + hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField); + + List> result = new ArrayList<>(fileToColumnStatsMap.size()); + + for (Map.Entry, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) { + result.add(Pair.of(entry.getKey().getLeft(), + new BloomIndexFileInfo( + FSUtils.getFileId(entry.getKey().getRight()), + // NOTE: Here we assume that the type of the primary key field is string + (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()), + (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue()) + ))); + } + + return result; } @Override @@ -278,7 +279,7 @@ public boolean isImplicitWithStorage() { * Sub-partition to ensure 
the records can be looked up against files & also prune file<=>record comparisons based on * recordKey ranges in the index info. */ - HoodieData> explodeRecordsWithFileComparisons( + HoodiePairData explodeRecordsWithFileComparisons( final Map> partitionToFileIndexInfo, HoodiePairData partitionRecordKeyPairs) { IndexFileFilter indexFileFilter = @@ -289,11 +290,13 @@ HoodieData> explodeRecordsWithFileComparisons( String recordKey = partitionRecordKeyPair.getRight(); String partitionPath = partitionRecordKeyPair.getLeft(); - return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() - .map(partitionFileIdPair -> (Pair) new ImmutablePair<>(partitionFileIdPair.getRight(), - new HoodieKey(recordKey, partitionPath))) - .collect(Collectors.toList()); - }).flatMap(List::iterator); + return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey) + .stream() + .map(partitionFileIdPair -> + new ImmutablePair<>( + new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey)); + }) + .flatMapToPair(Stream::iterator); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java similarity index 61% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java index 80031f4e8f025..52b504e9ab168 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java @@ -19,7 +19,8 @@ package org.apache.hudi.index.bloom; import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -28,53 +29,67 @@ import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.table.HoodieTable; -import java.util.function.Function; +import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.function.Function; /** - * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files. + * Function accepting a tuple of {@link HoodieFileGroupId} and a record key and producing + * a list of {@link HoodieKeyLookupResult} for every file identified by the file-group ids + * + * @param type of the tuple of {@code (HoodieFileGroupId, )}. 
Note that this is + * parameterized as generic such that this code could be reused for Spark as well */ -public class HoodieBaseBloomIndexCheckFunction - implements Function>, Iterator>> { +public class HoodieBloomIndexCheckFunction + implements Function, Iterator>>, Serializable { private final HoodieTable hoodieTable; private final HoodieWriteConfig config; - public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { + private final SerializableFunction fileGroupIdExtractor; + private final SerializableFunction recordKeyExtractor; + + public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, + HoodieWriteConfig config, + SerializableFunction fileGroupIdExtractor, + SerializableFunction recordKeyExtractor) { this.hoodieTable = hoodieTable; this.config = config; + this.fileGroupIdExtractor = fileGroupIdExtractor; + this.recordKeyExtractor = recordKeyExtractor; } @Override - public Iterator> apply(Iterator> filePartitionRecordKeyTripletItr) { - return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr); + public Iterator> apply(Iterator fileGroupIdRecordKeyPairIterator) { + return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator); } - class LazyKeyCheckIterator extends LazyIterableIterator, List> { + protected class LazyKeyCheckIterator extends LazyIterableIterator> { private HoodieKeyLookupHandle keyLookupHandle; - LazyKeyCheckIterator(Iterator> filePartitionRecordKeyTripletItr) { + LazyKeyCheckIterator(Iterator filePartitionRecordKeyTripletItr) { super(filePartitionRecordKeyTripletItr); } - @Override - protected void start() { - } - @Override protected List computeNext() { + List ret = new ArrayList<>(); try { // process one file in each go. while (inputItr.hasNext()) { - Pair currentTuple = inputItr.next(); - String fileId = currentTuple.getLeft(); - String partitionPath = currentTuple.getRight().getPartitionPath(); - String recordKey = currentTuple.getRight().getRecordKey(); + I tuple = inputItr.next(); + + HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple); + String recordKey = recordKeyExtractor.apply(tuple); + + String fileId = fileGroupId.getFileId(); + String partitionPath = fileGroupId.getPartitionPath(); + Pair partitionPathFilePair = Pair.of(partitionPath, fileId); // lazily init state @@ -100,15 +115,13 @@ protected List computeNext() { } } catch (Throwable e) { if (e instanceof HoodieException) { - throw e; + throw (HoodieException) e; } + throw new HoodieIndexException("Error checking bloom filter index. 
", e); } - return ret; - } - @Override - protected void end() { + return ret; } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java index 5f2007ea53668..b5604312d3f7b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -41,7 +42,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.stream.Stream; /** * This filter will only work with hoodie table since it will only load partitions @@ -74,7 +75,7 @@ List> loadColumnRangesFromFiles(List pa */ @Override - HoodieData> explodeRecordsWithFileComparisons( + HoodiePairData explodeRecordsWithFileComparisons( final Map> partitionToFileIndexInfo, HoodiePairData partitionRecordKeyPairs) { @@ -87,10 +88,11 @@ HoodieData> explodeRecordsWithFileComparisons( String partitionPath = partitionRecordKeyPair.getLeft(); return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() - .map(partitionFileIdPair -> (Pair) new ImmutablePair<>(partitionFileIdPair.getRight(), - new HoodieKey(recordKey, partitionFileIdPair.getLeft()))) - .collect(Collectors.toList()); - }).flatMap(List::iterator); + .map(partitionFileIdPair -> + new ImmutablePair<>( + new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey)); + }) + .flatMapToPair(Stream::iterator); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java index cffee5ee74081..b47f5cf066c54 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java @@ -19,20 +19,20 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.table.HoodieTable; -import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,21 +56,21 @@ public static ListBasedHoodieBloomIndexHelper getInstance() { public HoodiePairData findMatchingFilesForRecordKeys( 
HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, + HoodiePairData fileComparisonPairs, Map> partitionToFileInfo, Map recordsPerPartition) { - List> fileComparisonPairList = + List> fileComparisonPairList = fileComparisonPairs.collectAsList().stream() .sorted(Comparator.comparing(Pair::getLeft)).collect(toList()); - List keyLookupResults = new ArrayList<>(); - Iterator> iterator = new HoodieBaseBloomIndexCheckFunction( - hoodieTable, config).apply(fileComparisonPairList.iterator()); - while (iterator.hasNext()) { - keyLookupResults.addAll(iterator.next()); - } + List keyLookupResults = + CollectionUtils.toStream( + new HoodieBloomIndexCheckFunction>(hoodieTable, config, Pair::getLeft, Pair::getRight) + .apply(fileComparisonPairList.iterator()) + ) + .flatMap(Collection::stream) + .filter(lr -> lr.getMatchingRecordKeys().size() > 0) + .collect(toList()); - keyLookupResults = keyLookupResults.stream().filter( - lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList()); return context.parallelize(keyLookupResults).flatMap(lookupResult -> lookupResult.getMatchingRecordKeys().stream() .map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator() diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index 6dd5a1c27e212..d4b4007bedb19 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.data.HoodieListPairData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -186,11 +187,14 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea partitionRecordKeyMap.put(t.getLeft(), recordKeyList); }); - List> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList(); + List> comparisonKeyList = + index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList(); assertEquals(10, comparisonKeyList.size()); java.util.Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(java.util.stream.Collectors.groupingBy(t -> t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> t.getLeft(), java.util.stream.Collectors.toList()))); + .collect( + java.util.stream.Collectors.groupingBy(t -> t.getRight(), + java.util.stream.Collectors.mapping(t -> t.getLeft().getFileId(), java.util.stream.Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java index 9ec3c4cf71592..9019fb43ff058 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java @@ -121,6 +121,11 @@ public HoodieData map(SerializableFunction, O> func) { tuple -> func.apply(new ImmutablePair<>(tuple._1, tuple._2)))); } + @Override + public HoodiePairData mapValues(SerializableFunction func) { + return HoodieJavaPairRDD.of(pairRDDData.mapValues(func::apply)); + } + @Override public HoodiePairData mapToPair(SerializablePairFunction, L, W> mapToPairFunc) { return HoodieJavaPairRDD.of(pairRDDData.mapToPair(pair -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index ed9613bc15fe6..6ed3a854962b8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -24,17 +24,16 @@ import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.collection.MappingIterator; import org.apache.hudi.common.util.collection.Pair; - import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.storage.StorageLevel; +import scala.Tuple2; import java.util.Iterator; import java.util.List; -import scala.Tuple2; - /** * Holds a {@link JavaRDD} of objects. * @@ -119,9 +118,18 @@ public HoodieData mapPartitions(SerializableFunction, Iterato @Override public HoodieData flatMap(SerializableFunction> func) { + // NOTE: Unrolling this lambda into a method reference results in [[ClassCastException]] + // due to weird interop b/w Scala and Java return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e))); } + @Override + public HoodiePairData flatMapToPair(SerializableFunction>> func) { + return HoodieJavaPairRDD.of( + rddData.flatMapToPair(e -> + new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), p.getValue())))); + } + @Override public HoodiePairData mapToPair(SerializablePairFunction func) { return HoodieJavaPairRDD.of(rddData.mapToPair(input -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java index 36710dc02bb9b..48099220dbf92 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java @@ -18,6 +18,7 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.util.NumericUtils; import org.apache.hudi.common.util.collection.Pair; @@ -63,7 +64,7 @@ public class BucketizedBloomCheckPartitioner extends Partitioner { /** * Stores the final mapping of a file group to a list of partitions for its keys. */ - private Map> fileGroupToPartitions; + private Map> fileGroupToPartitions; /** * Create a partitioner that computes a plan based on provided workload characteristics. 
@@ -72,11 +73,11 @@ public class BucketizedBloomCheckPartitioner extends Partitioner { * @param fileGroupToComparisons number of expected comparisons per file group * @param keysPerBucket maximum number of keys to pack in a single bucket */ - public BucketizedBloomCheckPartitioner(int targetPartitions, Map fileGroupToComparisons, + public BucketizedBloomCheckPartitioner(int targetPartitions, Map fileGroupToComparisons, int keysPerBucket) { this.fileGroupToPartitions = new HashMap<>(); - Map bucketsPerFileGroup = new HashMap<>(); + Map bucketsPerFileGroup = new HashMap<>(); // Compute the buckets needed per file group, using simple uniform distribution fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) Math.ceil((c * 1.0) / keysPerBucket))); int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> i).sum(); @@ -90,9 +91,9 @@ public BucketizedBloomCheckPartitioner(int targetPartitions, Map f int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * totalBuckets) / partitions), 1); LOG.info(String.format("TotalBuckets %d, min_buckets/partition %d", totalBuckets, minBucketsPerPartition)); int[] bucketsFilled = new int[partitions]; - Map bucketsFilledPerFileGroup = new HashMap<>(); + Map bucketsFilledPerFileGroup = new HashMap<>(); int partitionIndex = 0; - for (Map.Entry e : bucketsPerFileGroup.entrySet()) { + for (Map.Entry e : bucketsPerFileGroup.entrySet()) { for (int b = 0; b < Math.max(1, e.getValue() - 1); b++) { // keep filled counts upto date bucketsFilled[partitionIndex]++; @@ -115,7 +116,7 @@ public BucketizedBloomCheckPartitioner(int targetPartitions, Map f // PHASE 2 : for remaining unassigned buckets, round robin over partitions once. Since we withheld 1 bucket from // each file group uniformly, this remaining is also an uniform mix across file groups. We just round robin to // optimize for goal 2. - for (Map.Entry e : bucketsPerFileGroup.entrySet()) { + for (Map.Entry e : bucketsPerFileGroup.entrySet()) { int remaining = e.getValue() - bucketsFilledPerFileGroup.get(e.getKey()).intValue(); for (int r = 0; r < remaining; r++) { // mark this partition against the file group @@ -142,7 +143,8 @@ public int numPartitions() { @Override public int getPartition(Object key) { - final Pair parts = (Pair) key; + final Pair parts = (Pair) key; + // TODO replace w/ more performant hash final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", parts.getRight()); final List candidatePartitions = fileGroupToPartitions.get(parts.getLeft()); final int idx = (int) Math.floorMod((int) hashOfKey, candidatePartitions.size()); @@ -150,7 +152,7 @@ public int getPartition(Object key) { return candidatePartitions.get(idx); } - Map> getFileGroupToPartitions() { + Map> getFileGroupToPartitions() { return fileGroupToPartitions; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java new file mode 100644 index 0000000000000..c124f8b27b80f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index.bloom; + +import java.util.List; + +class HoodieBloomFilterProbingResult { + + private final List candidateKeys; + + HoodieBloomFilterProbingResult(List candidateKeys) { + this.candidateKeys = candidateKeys; + } + + public List getCandidateKeys() { + return candidateKeys; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java deleted file mode 100644 index e19a429ea7234..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.index.bloom; - -import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIndexException; -import org.apache.hudi.io.HoodieKeyLookupHandle; -import org.apache.hudi.io.HoodieKeyLookupResult; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.api.java.function.Function2; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import scala.Tuple2; - -/** - * Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the actual files. 
- */ -public class HoodieBloomIndexCheckFunction - implements Function2>, Iterator>> { - - private final HoodieTable hoodieTable; - - private final HoodieWriteConfig config; - - public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { - this.hoodieTable = hoodieTable; - this.config = config; - } - - @Override - public Iterator> call(Integer partition, - Iterator> filePartitionRecordKeyTripletItr) { - return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr); - } - - class LazyKeyCheckIterator extends LazyIterableIterator, List> { - - private HoodieKeyLookupHandle keyLookupHandle; - - LazyKeyCheckIterator(Iterator> filePartitionRecordKeyTripletItr) { - super(filePartitionRecordKeyTripletItr); - } - - @Override - protected void start() { - } - - @Override - protected List computeNext() { - - List ret = new ArrayList<>(); - try { - // process one file in each go. - while (inputItr.hasNext()) { - Tuple2 currentTuple = inputItr.next(); - String fileId = currentTuple._1; - String partitionPath = currentTuple._2.getPartitionPath(); - String recordKey = currentTuple._2.getRecordKey(); - Pair partitionPathFilePair = Pair.of(partitionPath, fileId); - - // lazily init state - if (keyLookupHandle == null) { - keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); - } - - // if continue on current file - if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) { - keyLookupHandle.addKey(recordKey); - } else { - // do the actual checking of file & break out - ret.add(keyLookupHandle.getLookupResult()); - keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); - keyLookupHandle.addKey(recordKey); - break; - } - } - - // handle case, where we ran out of input, close pending work, update return val - if (!inputItr.hasNext()) { - ret.add(keyLookupHandle.getLookupResult()); - } - } catch (Throwable e) { - if (e instanceof HoodieException) { - throw e; - } - throw new HoodieIndexException("Error checking bloom filter index. ", e); - } - - return ret; - } - - @Override - protected void end() { - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java new file mode 100644 index 0000000000000..0809042c9fbbf --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bloom; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLookupResult; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import scala.Tuple2; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Implementation of the function probing filtered in candidate keys provided in + * {@link HoodieBloomFilterProbingResult} w/in corresponding files identified by {@link HoodieFileGroupId} + * to validate whether the record w/ the provided key is indeed persisted in it + */ +public class HoodieFileProbingFunction implements + FlatMapFunction>, List> { + + private static final Logger LOG = LogManager.getLogger(HoodieFileProbingFunction.class); + + // Assuming each file bloom filter takes up 512K, sizing the max file count + // per batch so that the total fetched bloom filters would not cross 128 MB. + private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256; + + private final Broadcast baseFileOnlyViewBroadcast; + private final SerializableConfiguration hadoopConf; + + public HoodieFileProbingFunction(Broadcast baseFileOnlyViewBroadcast, + SerializableConfiguration hadoopConf) { + this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast; + this.hadoopConf = hadoopConf; + } + + @Override + public Iterator> call(Iterator> tuple2Iterator) throws Exception { + return new BloomIndexLazyKeyCheckIterator(tuple2Iterator); + } + + private class BloomIndexLazyKeyCheckIterator + extends LazyIterableIterator, List> { + + public BloomIndexLazyKeyCheckIterator(Iterator> tuple2Iterator) { + super(tuple2Iterator); + } + + @Override + protected List computeNext() { + // Partition path and file name pair to list of keys + final Map, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>(); + final Map fileIDBaseFileMap = new HashMap<>(); + + while (inputItr.hasNext()) { + Tuple2 entry = inputItr.next(); + final String partitionPath = entry._1.getPartitionPath(); + final String fileId = entry._1.getFileId(); + + if (!fileIDBaseFileMap.containsKey(fileId)) { + Option baseFile = + baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId); + if (!baseFile.isPresent()) { + throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath + + ", fileId: " + fileId); + } + + fileIDBaseFileMap.put(fileId, baseFile.get()); + } + + fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2); + + if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { + break; + } + } + + if (fileToLookupResults.isEmpty()) { + return Collections.emptyList(); + } + + return 
fileToLookupResults.entrySet().stream() + .map(entry -> { + Pair partitionPathFileNamePair = entry.getKey(); + HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue(); + + final String partitionPath = partitionPathFileNamePair.getLeft(); + final String fileName = partitionPathFileNamePair.getRight(); + final String fileId = FSUtils.getFileId(fileName); + ValidationUtils.checkState(!fileId.isEmpty()); + + List candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys(); + + // TODO add assertion that file is checked only once + + final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId); + List matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), + candidateRecordKeys, hadoopConf.get()); + + LOG.debug( + String.format("Bloom filter candidates (%d) / false positives (%d), actual matches (%d)", + candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); + + return new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys); + }) + .collect(Collectors.toList()); + } + + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java new file mode 100644 index 0000000000000..406be81650057 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.FlatteningIterator; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Implementation of the function that probing Bloom Filters of individual files verifying + * whether particular record key could be stored in the latest file-slice of the file-group + * identified by the {@link HoodieFileGroupId} + */ +public class HoodieMetadataBloomFilterProbingFunction implements + PairFlatMapFunction>, HoodieFileGroupId, HoodieBloomFilterProbingResult> { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomFilterProbingFunction.class); + + // Assuming each file bloom filter takes up 512K, sizing the max file count + // per batch so that the total fetched bloom filters would not cross 128 MB. 
+ private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256; + private final HoodieTable hoodieTable; + + private final Broadcast baseFileOnlyViewBroadcast; + + /** + * NOTE: It's critical for this ctor to accept {@link HoodieTable} to make sure that it uses + * broadcast-ed instance of {@link HoodieBackedTableMetadata} internally, instead of + * one being serialized and deserialized for _every_ task individually + * + * NOTE: We pass in broadcasted {@link HoodieTableFileSystemView} to make sure it's materialized + * on executor once + */ + public HoodieMetadataBloomFilterProbingFunction(Broadcast baseFileOnlyViewBroadcast, + HoodieTable hoodieTable) { + this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast; + this.hoodieTable = hoodieTable; + } + + @Override + public Iterator> call(Iterator> tuple2Iterator) throws Exception { + return new FlatteningIterator<>(new BloomIndexLazyKeyCheckIterator(tuple2Iterator)); + } + + private class BloomIndexLazyKeyCheckIterator + extends LazyIterableIterator, Iterator>> { + + public BloomIndexLazyKeyCheckIterator(Iterator> tuple2Iterator) { + super(tuple2Iterator); + } + + @Override + protected Iterator> computeNext() { + // Partition path and file name pair to list of keys + final Map, List> fileToKeysMap = new HashMap<>(); + final Map fileIDBaseFileMap = new HashMap<>(); + + while (inputItr.hasNext()) { + Tuple2 entry = inputItr.next(); + String partitionPath = entry._1.getPartitionPath(); + String fileId = entry._1.getFileId(); + + if (!fileIDBaseFileMap.containsKey(fileId)) { + Option baseFile = baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId); + if (!baseFile.isPresent()) { + throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath + + ", fileId: " + fileId); + } + fileIDBaseFileMap.put(fileId, baseFile.get()); + } + + fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), + k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath)); + + if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { + break; + } + } + + if (fileToKeysMap.isEmpty()) { + return Collections.emptyIterator(); + } + + List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); + Map, BloomFilter> fileToBloomFilterMap = + hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); + + return fileToKeysMap.entrySet().stream() + .map(entry -> { + Pair partitionPathFileNamePair = entry.getKey(); + List hoodieKeyList = entry.getValue(); + + final String partitionPath = partitionPathFileNamePair.getLeft(); + final String fileName = partitionPathFileNamePair.getRight(); + final String fileId = FSUtils.getFileId(fileName); + ValidationUtils.checkState(!fileId.isEmpty()); + + if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { + throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair); + } + final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair); + + List candidateRecordKeys = new ArrayList<>(); + hoodieKeyList.forEach(hoodieKey -> { + if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) { + candidateRecordKeys.add(hoodieKey.getRecordKey()); + } + }); + + LOG.debug(String.format("Total records (%d), bloom filter candidates (%d)", + hoodieKeyList.size(), candidateRecordKeys.size())); + + return Tuple2.apply(new HoodieFileGroupId(partitionPath, fileId), new 
HoodieBloomFilterProbingResult(candidateRecordKeys)); + }) + .iterator(); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java deleted file mode 100644 index 8a2958eab9da8..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.index.bloom; - -import org.apache.hadoop.fs.Path; -import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieIndexException; -import org.apache.hudi.index.HoodieIndexUtils; -import org.apache.hudi.io.HoodieKeyLookupResult; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.function.Function2; -import scala.Tuple2; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Spark Function2 implementation for checking bloom filters for the - * requested keys from the metadata table index. The bloom filter - * checking for keys and the actual file verification for the - * candidate keys is done in an iterative fashion. In each iteration, - * bloom filters are requested for a batch of partition files and the - * keys are checked against them. - */ -public class HoodieMetadataBloomIndexCheckFunction implements - Function2>, Iterator>> { - - private static final Logger LOG = LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class); - - // Assuming each file bloom filter takes up 512K, sizing the max file count - // per batch so that the total fetched bloom filters would not cross 128 MB. 
- private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256; - private final HoodieTable hoodieTable; - - public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) { - this.hoodieTable = hoodieTable; - } - - @Override - public Iterator> call(Integer integer, Iterator> tuple2Iterator) throws Exception { - return new BloomIndexLazyKeyCheckIterator(tuple2Iterator); - } - - private class BloomIndexLazyKeyCheckIterator extends LazyIterableIterator, List> { - public BloomIndexLazyKeyCheckIterator(Iterator> tuple2Iterator) { - super(tuple2Iterator); - } - - @Override - protected void start() { - } - - @Override - protected List computeNext() { - // Partition path and file name pair to list of keys - final Map, List> fileToKeysMap = new HashMap<>(); - final Map fileIDBaseFileMap = new HashMap<>(); - final List resultList = new ArrayList<>(); - - while (inputItr.hasNext()) { - Tuple2 entry = inputItr.next(); - final String partitionPath = entry._2.getPartitionPath(); - final String fileId = entry._1; - if (!fileIDBaseFileMap.containsKey(fileId)) { - Option baseFile = hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId); - if (!baseFile.isPresent()) { - throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath - + ", fileId: " + fileId); - } - fileIDBaseFileMap.put(fileId, baseFile.get()); - } - fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), - k -> new ArrayList<>()).add(entry._2); - if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { - break; - } - } - if (fileToKeysMap.isEmpty()) { - return Collections.emptyList(); - } - - List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); - Map, BloomFilter> fileToBloomFilterMap = - hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); - - final AtomicInteger totalKeys = new AtomicInteger(0); - fileToKeysMap.forEach((partitionPathFileNamePair, hoodieKeyList) -> { - final String partitionPath = partitionPathFileNamePair.getLeft(); - final String fileName = partitionPathFileNamePair.getRight(); - final String fileId = FSUtils.getFileId(fileName); - ValidationUtils.checkState(!fileId.isEmpty()); - - if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { - throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair); - } - final BloomFilter fileBloomFilter = fileToBloomFilterMap.get(partitionPathFileNamePair); - - List candidateRecordKeys = new ArrayList<>(); - hoodieKeyList.forEach(hoodieKey -> { - totalKeys.incrementAndGet(); - if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) { - candidateRecordKeys.add(hoodieKey.getRecordKey()); - } - }); - - final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId); - List matchingKeys = - HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), candidateRecordKeys, - hoodieTable.getHadoopConf()); - LOG.debug( - String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", - hoodieKeyList.size(), candidateRecordKeys.size(), - candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); - - resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, dataFile.getCommitTime(), matchingKeys)); - }); - return resultList; - } - - @Override - protected void end() { - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java index 5736024dc2455..265b0507768ff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java @@ -19,30 +19,47 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.common.data.HoodieData; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.hash.FileIndexID; +import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.HoodieKeyLookupResult; import org.apache.hudi.table.HoodieTable; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import scala.Tuple2; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import scala.Tuple2; - +import static org.apache.hudi.metadata.HoodieMetadataPayload.getBloomFilterIndexKey; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex; import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS; /** @@ -55,8 +72,7 @@ public class SparkHoodieBloomIndexHelper extends BaseHoodieBloomIndexHelper { private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE = new SparkHoodieBloomIndexHelper(); - private SparkHoodieBloomIndexHelper() { - } + private SparkHoodieBloomIndexHelper() {} public static SparkHoodieBloomIndexHelper getInstance() { return SINGLETON_INSTANCE; @@ -66,42 +82,92 @@ public static SparkHoodieBloomIndexHelper getInstance() { public HoodiePairData findMatchingFilesForRecordKeys( HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable, HoodiePairData partitionRecordKeyPairs, - HoodieData> fileComparisonPairs, + HoodiePairData fileComparisonPairs, Map> partitionToFileInfo, Map recordsPerPartition) { - JavaRDD> fileComparisonsRDD = - HoodieJavaRDD.getJavaRDD(fileComparisonPairs) - .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight())); - int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size(); - int joinParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); - LOG.info("InputParallelism: ${" + inputParallelism + 
"}, IndexParallelism: ${" - + config.getBloomIndexParallelism() + "}"); + int inputParallelism = HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).getNumPartitions(); + int configuredBloomIndexParallelism = config.getBloomIndexParallelism(); + // NOTE: Target parallelism could be overridden by the config + int targetParallelism = + configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism : inputParallelism; + + LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", inputParallelism, targetParallelism)); + + JavaPairRDD fileComparisonsRDD = HoodieJavaRDD.getJavaRDD(fileComparisonPairs); JavaRDD> keyLookupResultRDD; + if (config.getBloomIndexUseMetadata() && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions() .contains(BLOOM_FILTERS.getPartitionPath())) { - // Step 1: Sort by file id - JavaRDD> sortedFileIdAndKeyPairs = - fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism); + SerializableConfiguration hadoopConf = new SerializableConfiguration(hoodieTable.getHadoopConf()); + + HoodieTableFileSystemView baseFileOnlyView = + getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet()); + + Broadcast baseFileOnlyViewBroadcast = + ((HoodieSparkEngineContext) context).getJavaSparkContext().broadcast(baseFileOnlyView); + + // When leveraging MT we're aiming for following goals: + // - (G1) All requests to MT are made in batch (ie we're trying to fetch all the values + // for corresponding keys at once) + // - (G2) Each task reads no more than just _one_ file-group from the MT Bloom Filters + // partition + // + // Ta achieve G2, following invariant have to be maintained: Spark partitions have to be + // affine w/ Metadata Table's file-groups, meaning that each Spark partition holds records + // belonging to one and only file-group in MT Bloom Filters partition. To provide for that + // we need to make sure + // - Spark's used [[Partitioner]] employs same hashing function as Metadata Table (as well + // as being applied to the same keys as the MT one) + // - Make sure that # of partitions is congruent to the # of file-groups (ie number of Spark + // partitions is a multiple of the # of the file-groups). + // + // Last provision is necessary, so that for every key it's the case that + // + // (hash(key) % N) % M = hash(key) % M, iff N % M = 0 + // + // Let's take an example of N = 8 and M = 4 (default # of file-groups in Bloom Filter + // partition). In that case Spark partitions for which `hash(key) % N` will be either 0 + // or 4, will map to the same (first) file-group in MT + int bloomFilterPartitionFileGroupCount = + config.getMetadataConfig().getBloomFilterIndexFileGroupCount(); + int adjustedTargetParallelism = + targetParallelism % bloomFilterPartitionFileGroupCount == 0 + ? targetParallelism + // NOTE: We add 1 to make sure parallelism a) value always stays positive and b) + // {@code targetParallelism <= adjustedTargetParallelism} + : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount; + + AffineBloomIndexFileGroupPartitioner partitioner = + new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, adjustedTargetParallelism); + + // First, we need to repartition and sort records using [[AffineBloomIndexFileGroupPartitioner]] + // to make sure every Spark task accesses no more than just a single file-group in MT (allows + // us to achieve G2). 
+ // + // NOTE: Sorting records w/in individual partitions is required to make sure that we cluster + // together keys co-located w/in the MT files (sorted by keys) + keyLookupResultRDD = fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner) + .mapPartitionsToPair(new HoodieMetadataBloomFilterProbingFunction(baseFileOnlyViewBroadcast, hoodieTable)) + // Second, we use [[HoodieFileProbingFunction]] to open actual file and check whether it + // contains the records with candidate keys that were filtered in by the Bloom Filter + .mapPartitions(new HoodieFileProbingFunction(baseFileOnlyViewBroadcast, hadoopConf), true); - // Step 2: Use bloom filter to filter and the actual log file to get the record location - keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex( - new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true); } else if (config.useBloomIndexBucketizedChecking()) { - Map comparisonsPerFileGroup = computeComparisonsPerFileGroup( + Map comparisonsPerFileGroup = computeComparisonsPerFileGroup( config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, context); - Partitioner partitioner = new BucketizedBloomCheckPartitioner(joinParallelism, comparisonsPerFileGroup, + Partitioner partitioner = new BucketizedBloomCheckPartitioner(targetParallelism, comparisonsPerFileGroup, config.getBloomIndexKeysPerBucket()); - keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t)) + keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2), t)) .repartitionAndSortWithinPartitions(partitioner) .map(Tuple2::_2) - .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true); + .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true); } else { - keyLookupResultRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism) - .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true); + keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, targetParallelism) + .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, config), true); } return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator) @@ -115,27 +181,124 @@ public HoodiePairData findMatchingFilesForRecor /** * Compute the estimated number of bloom filter comparisons to be performed on each file group. */ - private Map computeComparisonsPerFileGroup( + private Map computeComparisonsPerFileGroup( final HoodieWriteConfig config, final Map recordsPerPartition, final Map> partitionToFileInfo, - final JavaRDD> fileComparisonsRDD, + final JavaPairRDD fileComparisonsRDD, final HoodieEngineContext context) { - Map fileToComparisons; + Map fileToComparisons; if (config.getBloomIndexPruneByRanges()) { // we will just try exploding the input and then count to determine comparisons // FIX(vc): Only do sampling here and extrapolate? 
context.setJobStatus(this.getClass().getSimpleName(), "Compute all comparisons needed between records and files: " + config.getTableName()); - fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey(); + fileToComparisons = fileComparisonsRDD.countByKey(); } else { fileToComparisons = new HashMap<>(); - partitionToFileInfo.forEach((key, value) -> { - for (BloomIndexFileInfo fileInfo : value) { + partitionToFileInfo.forEach((partitionPath, fileInfos) -> { + for (BloomIndexFileInfo fileInfo : fileInfos) { // each file needs to be compared against all the records coming into the partition - fileToComparisons.put(fileInfo.getFileId(), recordsPerPartition.get(key)); + fileToComparisons.put( + new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), recordsPerPartition.get(partitionPath)); } }); } return fileToComparisons; } + + private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable hoodieTable, Collection partitionPaths) { + try { + List fullPartitionPaths = partitionPaths.stream() + .map(partitionPath -> + String.format("%s/%s", hoodieTable.getMetaClient().getBasePathV2(), partitionPath)) + .collect(Collectors.toList()); + + FileStatus[] allFiles = + hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths).values().stream() + .flatMap(Arrays::stream) + .toArray(FileStatus[]::new); + + return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), hoodieTable.getActiveTimeline(), allFiles); + } catch (IOException e) { + LOG.error(String.format("Failed to fetch all files for partitions (%s)", partitionPaths)); + throw new HoodieIOException("Failed to fetch all files for partitions", e); + } + } + + static class AffineBloomIndexFileGroupPartitioner extends Partitioner { + + private final Broadcast latestBaseFilesBroadcast; + + // TODO(HUDI-5619) remove when addressed + private final Map> cachedLatestBaseFileNames = + new HashMap<>(16); + + private final int targetPartitions; + + AffineBloomIndexFileGroupPartitioner(Broadcast baseFileOnlyViewBroadcast, + int targetPartitions) { + this.targetPartitions = targetPartitions; + this.latestBaseFilesBroadcast = baseFileOnlyViewBroadcast; + } + + @Override + public int numPartitions() { + return targetPartitions; + } + + @Override + public int getPartition(Object key) { + HoodieFileGroupId partitionFileGroupId = (HoodieFileGroupId) key; + String partitionPath = partitionFileGroupId.getPartitionPath(); + String fileGroupId = partitionFileGroupId.getFileId(); + + /* + // TODO(HUDI-5619) uncomment when addressed + String baseFileName = + latestBaseFilesBroadcast.getValue() + .getLatestBaseFile(partitionPath, fileGroupId) + .orElseThrow(() -> new HoodieException( + String.format("File from file-group (%s) not found in partition path (%s)", fileGroupId, partitionPath))) + .getFileName(); + */ + + // NOTE: This is a workaround to alleviate performance impact of needing to process whole + // partition for every file-group being looked up. 
+ // See HUDI-5619 for more details + String baseFileName = cachedLatestBaseFileNames.computeIfAbsent(partitionPath, ignored -> + latestBaseFilesBroadcast.getValue() + .getLatestBaseFiles(partitionPath) + .collect( + Collectors.toMap(HoodieBaseFile::getFileId, BaseFile::getFileName) + ) + ) + .get(fileGroupId); + + if (baseFileName == null) { + throw new HoodieException( + String.format("File from file-group (%s) not found in partition path (%s)", fileGroupId, partitionPath)); + } + + String bloomIndexEncodedKey = + getBloomFilterIndexKey(new PartitionIndexID(partitionPath), new FileIndexID(baseFileName)); + + // NOTE: It's crucial that [[targetPartitions]] be congruent w/ the number of + // actual file-groups in the Bloom Index in MT + return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, targetPartitions); + } + } + + public static class HoodieSparkBloomIndexCheckFunction extends HoodieBloomIndexCheckFunction> + implements FlatMapFunction>, List> { + + public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable, + HoodieWriteConfig config) { + super(hoodieTable, config, t -> t._1, t -> t._2); + } + + @Override + public Iterator> call(Iterator> fileGroupIdRecordKeyPairIterator) { + return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator); + } + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index f4edf7fb94260..5853b4eb8a8cb 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -23,17 +23,17 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.TablePathUtils +import org.apache.spark.sql._ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer} -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel import java.util.Locale diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java index e946450c90426..fe178839c0dc5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java @@ -18,6 +18,7 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.util.collection.Pair; import 
org.junit.jupiter.api.Test; @@ -36,33 +37,37 @@ public class TestBucketizedBloomCheckPartitioner { @Test public void testAssignmentCorrectness() { - Map fileToComparisons = new HashMap() { + HoodieFileGroupId fg1 = new HoodieFileGroupId("p1", "f1"); + HoodieFileGroupId fg2 = new HoodieFileGroupId("p1", "f2"); + HoodieFileGroupId fg3 = new HoodieFileGroupId("p1", "f3"); + + Map fileToComparisons = new HashMap() { { - put("f1", 40L); - put("f2", 35L); - put("f3", 20L); + put(fg1, 40L); + put(fg2, 35L); + put(fg3, 20L); } }; BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, fileToComparisons, 10); - Map> assignments = p.getFileGroupToPartitions(); - assertEquals(4, assignments.get("f1").size(), "f1 should have 4 buckets"); - assertEquals(4, assignments.get("f2").size(), "f2 should have 4 buckets"); - assertEquals(2, assignments.get("f3").size(), "f3 should have 2 buckets"); - assertArrayEquals(new Integer[] {0, 0, 1, 3}, assignments.get("f1").toArray(), "f1 spread across 3 partitions"); - assertArrayEquals(new Integer[] {1, 2, 2, 0}, assignments.get("f2").toArray(), "f2 spread across 3 partitions"); - assertArrayEquals(new Integer[] {3, 1}, assignments.get("f3").toArray(), "f3 spread across 2 partitions"); + Map> assignments = p.getFileGroupToPartitions(); + assertEquals(4, assignments.get(fg1).size(), "f1 should have 4 buckets"); + assertEquals(4, assignments.get(fg2).size(), "f2 should have 4 buckets"); + assertEquals(2, assignments.get(fg3).size(), "f3 should have 2 buckets"); + assertArrayEquals(new Integer[] {0, 0, 1, 3}, assignments.get(fg1).toArray(), "f1 spread across 3 partitions"); + assertArrayEquals(new Integer[] {2, 2, 3, 1}, assignments.get(fg2).toArray(), "f2 spread across 3 partitions"); + assertArrayEquals(new Integer[] {1, 0}, assignments.get(fg3).toArray(), "f3 spread across 2 partitions"); } @Test public void testUniformPacking() { // evenly distribute 10 buckets/file across 100 partitions - Map comparisons1 = new HashMap() { + Map comparisons1 = new HashMap() { { - IntStream.range(0, 10).forEach(f -> put("f" + f, 100L)); + IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L)); } }; BucketizedBloomCheckPartitioner partitioner = new BucketizedBloomCheckPartitioner(100, comparisons1, 10); - Map> assignments = partitioner.getFileGroupToPartitions(); + Map> assignments = partitioner.getFileGroupToPartitions(); assignments.forEach((key, value) -> assertEquals(10, value.size())); Map partitionToNumBuckets = assignments.entrySet().stream().flatMap(e -> e.getValue().stream().map(p -> Pair.of(p, e.getKey()))) @@ -72,9 +77,9 @@ public void testUniformPacking() { @Test public void testNumPartitions() { - Map comparisons1 = new HashMap() { + Map comparisons1 = new HashMap() { { - IntStream.range(0, 10).forEach(f -> put("f" + f, 100L)); + IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L)); } }; BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(10000, comparisons1, 10); @@ -83,15 +88,15 @@ public void testNumPartitions() { @Test public void testGetPartitions() { - Map comparisons1 = new HashMap() { + Map comparisons1 = new HashMap() { { - IntStream.range(0, 100000).forEach(f -> put("f" + f, 100L)); + IntStream.range(0, 100000).forEach(f -> put(new HoodieFileGroupId("p1", "f" + f), 100L)); } }; BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(1000, comparisons1, 10); IntStream.range(0, 100000).forEach(f -> { - int partition = p.getPartition(Pair.of("f" + 
f, "value")); + int partition = p.getPartition(Pair.of(new HoodieFileGroupId("p1", "f" + f), "value")); assertTrue(0 <= partition && partition <= 1000, "partition is out of range: " + partition); }); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index 5be4e4ce624a3..3c906062c16ed 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; @@ -35,7 +36,6 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaPairRDD; -import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; @@ -259,12 +259,14 @@ public void testRangePruning( jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004"))).mapToPair(t -> t); - List> comparisonKeyList = HoodieJavaRDD.getJavaRDD( - index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect(); + List> comparisonKeyList = + index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD)).collectAsList(); assertEquals(10, comparisonKeyList.size()); Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getLeft, Collectors.toList()))); + .collect( + Collectors.groupingBy(t -> t.getRight(), + Collectors.mapping(t -> t.getLeft().getFileId(), Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 3ad8952feea84..2577f9ba28433 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.functional.TestHoodieMetadataBase; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; @@ -29,7 +30,6 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.data.HoodieJavaPairRDD; -import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -201,9 +201,9 @@ public void testExplodeRecordRDDWithFileComparisons() { jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t); - List> comparisonKeyList = HoodieJavaRDD.getJavaRDD( - index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, - HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect(); + List> comparisonKeyList = + index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieJavaPairRDD.of(partitionRecordKeyPairRDD)) + .collectAsList(); /* * expecting: f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} f1, HoodieKey { recordKey=003 @@ -216,7 +216,7 @@ public void testExplodeRecordRDDWithFileComparisons() { assertEquals(10, comparisonKeyList.size()); Map> recordKeyToFileComps = comparisonKeyList.stream() - .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), Collectors.mapping(Pair::getKey, Collectors.toList()))); + .collect(Collectors.groupingBy(t -> t.getRight(), Collectors.mapping(t -> t.getLeft().getFileId(), Collectors.toList()))); assertEquals(4, recordKeyToFileComps.size()); assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new HashSet<>(recordKeyToFileComps.get("002"))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index 1d56e63fad928..9ce0947883f6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -105,9 +105,9 @@ HoodieData mapPartitions(SerializableFunction, Iterator> func, boolean preservesPartitioning); /** - * Maps every element in the collection into a collection of the new elements (provided by - * {@link Iterator}) using provided mapping {@code func}, subsequently flattening the result - * (by concatenating) into a single collection + * Maps every element in the collection into a collection of the new elements using provided + * mapping {@code func}, subsequently flattening the result (by concatenating) into a single + * collection * * This is an intermediate operation * @@ -117,6 +117,17 @@ HoodieData mapPartitions(SerializableFunction, */ HoodieData flatMap(SerializableFunction> func); + /** + * Maps every element in the collection into a collection of the {@link Pair}s of new elements + * using provided mapping {@code func}, subsequently flattening the result (by concatenating) into + * a single collection + * + * NOTE: That this operation will convert container from {@link HoodieData} to {@link HoodiePairData} + * + * This is an intermediate operation + */ + HoodiePairData flatMapToPair(SerializableFunction>> func); + /** * Maps every element in the collection using provided mapping {@code func} into a {@link Pair} * of elements {@code K} and {@code V} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java index b2a503a85b323..c6287a744e0e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java @@ -125,6 +125,16 @@ public 
HoodieData flatMap(SerializableFunction> func) { return new HoodieListData<>(mappedStream, lazy); } + @Override + public HoodiePairData flatMapToPair(SerializableFunction>> func) { + Function>> mapper = throwingMapWrapper(func); + Stream> mappedStream = asStream().flatMap(e -> + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(mapper.apply(e), Spliterator.ORDERED), true)); + + return new HoodieListPairData<>(mappedStream, lazy); + } + @Override public HoodiePairData mapToPair(SerializablePairFunction func) { Function> throwableMapToPairFunc = throwingMapToPairWrapper(func); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java index a389649548e98..39ce141157593 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java @@ -23,15 +23,20 @@ import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.MappingIterator; import org.apache.hudi.common.util.collection.Pair; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; @@ -136,6 +141,24 @@ public HoodieData map(SerializableFunction, O> func) { return new HoodieListData<>(asStream().map(uncheckedMapper), lazy); } + @Override + public HoodiePairData mapValues(SerializableFunction func) { + Function uncheckedMapper = throwingMapWrapper(func); + return new HoodieListPairData<>(asStream().map(p -> Pair.of(p.getKey(), uncheckedMapper.apply(p.getValue()))), lazy); + } + + public HoodiePairData flatMapValues(SerializableFunction> func) { + Function> uncheckedMapper = throwingMapWrapper(func); + return new HoodieListPairData<>(asStream().flatMap(p -> { + Iterator mappedValuesIterator = uncheckedMapper.apply(p.getValue()); + Iterator> mappedPairsIterator = + new MappingIterator<>(mappedValuesIterator, w -> Pair.of(p.getKey(), w)); + + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(mappedPairsIterator, Spliterator.ORDERED), true); + }), lazy); + } + @Override public HoodiePairData mapToPair(SerializablePairFunction, L, W> mapToPairFunc) { return new HoodieListPairData<>(asStream().map(p -> throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java index 49fa7174da9a6..1d3622786fd07 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java @@ -90,12 +90,17 @@ public interface HoodiePairData extends Serializable { HoodiePairData reduceByKey(SerializableBiFunction combiner, int parallelism); /** - * @param func serializable map function. - * @param output object type. - * @return {@link HoodieData} containing the result. 
Actual execution may be deferred. + * Maps key-value pairs of this {@link HoodiePairData} container leveraging provided mapper + * + * NOTE: That this returns {@link HoodieData} and not {@link HoodiePairData} */ HoodieData map(SerializableFunction, O> func); + /** + * Maps values of this {@link HoodiePairData} container leveraging provided mapper + */ + HoodiePairData mapValues(SerializableFunction func); + /** * @param mapToPairFunc serializable map function to generate another pair. * @param new key type. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java new file mode 100644 index 0000000000000..1cbae113fcbce --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.function; + +/** + * Throwing counterpart of {@link java.util.function.Consumer} + */ +@FunctionalInterface +public interface ThrowingConsumer { + + void accept(T t) throws Exception; + +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index 7d497408e6467..a8c695240cddc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -113,7 +113,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { @Override public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException { - GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema); + GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema); return new HoodieAvroIndexedRecord(key, record, operation, metaData); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index e7c749a2911b2..00fa1b97db06d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -338,6 +338,10 @@ public Configuration getHadoopConf() { return hadoopConf.get(); } + public SerializableConfiguration getSerializableHadoopConf() { + return hadoopConf; + } + /** * Get the active instants as a timeline. 
* diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java new file mode 100644 index 0000000000000..4adf6b5c91f7c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.collection; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Iterator flattening source {@link Iterator} holding other {@link Iterator}s + */ +public final class FlatteningIterator> implements Iterator { + + private final Iterator sourceIterator; + private Iterator innerSourceIterator; + + public FlatteningIterator(Iterator source) { + this.sourceIterator = source; + } + + public boolean hasNext() { + while (innerSourceIterator == null || !innerSourceIterator.hasNext()) { + if (sourceIterator.hasNext()) { + innerSourceIterator = sourceIterator.next(); + } else { + return false; + } + } + + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + return innerSourceIterator.next(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java index a4655764a97f3..24b0961470bde 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java @@ -21,7 +21,9 @@ import java.util.Iterator; import java.util.function.Function; -// TODO java-docs +/** + * Iterator mapping elements of the provided source {@link Iterator} from {@code I} to {@code O} + */ public class MappingIterator implements Iterator { protected final Iterator source; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 1034aa3125532..f343d6437cfa0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -19,12 +19,12 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.SerializableConfiguration; import 
org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -73,11 +73,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { private static final Logger LOG = LogManager.getLogger(BaseTableMetadata.class); - public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; - public static final int BUFFER_SIZE = 10 * 1024 * 1024; + protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; + // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying + // on HFile (serving as persisted binary key-value mapping) to do caching + protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb protected final transient HoodieEngineContext engineContext; - protected final SerializableConfiguration hadoopConf; protected final SerializablePath dataBasePath; protected final HoodieTableMetaClient dataMetaClient; protected final Option metrics; @@ -92,9 +93,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String dataBasePath, String spillableMapDirectory) { this.engineContext = engineContext; - this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath)); - this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build(); + this.dataMetaClient = HoodieTableMetaClient.builder() + .setConf(engineContext.getHadoopConf().get()) + .setBasePath(dataBasePath) + .build(); this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; @@ -123,8 +126,10 @@ public List getAllPartitionPaths() throws IOException { throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e); } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), - metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths(); + + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + createFileSystemBackedTableMetadata(); + return fileSystemBackedTableMetadata.getAllPartitionPaths(); } /** @@ -148,8 +153,9 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) - .getAllFilesInPartition(partitionPath); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + createFileSystemBackedTableMetadata(); + return fileSystemBackedTableMetadata.getAllFilesInPartition(partitionPath); } @Override @@ -168,8 +174,9 @@ public Map getAllFilesInPartitions(Collection part } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) - .getAllFilesInPartitions(partitions); + FileSystemBackedTableMetadata fileSystemBackedTableMetadata = + createFileSystemBackedTableMetadata(); + return fileSystemBackedTableMetadata.getAllFilesInPartitions(partitions); } @Override @@ -205,6 +212,7 @@ public Map, BloomFilter> getBloomFilters(final List partitionIDFileIDSortedStrings = new TreeSet<>(); Map> fileToKeyMap = new HashMap<>(); + // TODO simplify (no sorting is required) partitionNameFileNameList.forEach(partitionNameFileNamePair -> { final String bloomFilterIndexKey = 
HoodieMetadataPayload.getBloomFilterIndexKey( new PartitionIndexID(partitionNameFileNamePair.getLeft()), new FileIndexID(partitionNameFileNamePair.getRight())); @@ -227,7 +235,11 @@ public Map, BloomFilter> getBloomFilters(final List fetchAllFilesInPartitionPaths(List partitionPath getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get()); + FileSystem fs = partitionPaths.get(0).getFileSystem(getHadoopConf()); Map partitionPathToFilesMap = partitionIdRecordPairs.parallelStream() .map(pair -> { @@ -399,18 +411,27 @@ private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, Stri } } + private FileSystemBackedTableMetadata createFileSystemBackedTableMetadata() { + return new FileSystemBackedTableMetadata(getEngineContext(), dataMetaClient.getSerializableHadoopConf(), dataBasePath.toString(), + metadataConfig.shouldAssumeDatePartitioning()); + } + protected abstract Option> getRecordByKey(String key, String partitionName); public abstract List>>> getRecordsByKeys(List key, String partitionName); protected HoodieEngineContext getEngineContext() { - return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get()); + return engineContext != null ? engineContext : new HoodieLocalEngineContext(getHadoopConf()); } public HoodieMetadataConfig getMetadataConfig() { return metadataConfig; } + protected Configuration getHadoopConf() { + return dataMetaClient.getHadoopConf(); + } + protected String getLatestDataInstantTime() { return dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant() .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 09c6b35309a3a..ecb0da8792df0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -53,6 +52,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.util.Transient; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieSeekingFileReader; @@ -90,19 +90,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class); - private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema(); + private final String metadataBasePath; - private String metadataBasePath; - // Metadata table's timeline and metaclient private HoodieTableMetaClient metadataMetaClient; private HoodieTableConfig metadataTableConfig; + private HoodieTableFileSystemView 
metadataFileSystemView; // should we reuse the open file handles, across calls private final boolean reuse; // Readers for the latest file slice corresponding to file groups in the metadata partition - private final Map, Pair, HoodieMetadataLogRecordReader>> partitionReaders = - new ConcurrentHashMap<>(); + private final Transient, Pair, HoodieMetadataLogRecordReader>>> partitionReaders = + Transient.lazy(ConcurrentHashMap::new); public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) { @@ -113,18 +112,19 @@ public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetada String datasetBasePath, String spillableMapDirectory, boolean reuse) { super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory); this.reuse = reuse; + this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString()); + initIfNeeded(); } private void initIfNeeded() { - this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString()); if (!isMetadataTableEnabled) { if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) { LOG.info("Metadata table is disabled."); } } else if (this.metadataMetaClient == null) { try { - this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build(); + this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(getHadoopConf()).setBasePath(metadataBasePath).build(); this.metadataFileSystemView = getFileSystemView(metadataMetaClient); this.metadataTableConfig = metadataMetaClient.getTableConfig(); this.isBloomFilterIndexEnabled = metadataConfig.isBloomFilterIndexEnabled(); @@ -213,14 +213,14 @@ public HoodieData> getRecordsByKeyPrefixes(L return mergedRecords.stream() .map(keyRecordPair -> keyRecordPair.getValue().orElse(null)) + .filter(Objects::nonNull) .iterator(); } catch (IOException ioe) { throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe); } finally { closeReader(readers); } - }) - .filter(Objects::nonNull); + }); } @Override @@ -425,7 +425,7 @@ private Map, List> getPartitionFileSliceToKeysMa private Pair, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) { if (reuse) { Pair key = Pair.of(partitionName, slice.getFileId()); - return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice)); + return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice)); } else { return openReaders(partitionName, slice); } @@ -462,7 +462,7 @@ private Pair, Long> getBaseFileReader(FileSlice slice if (basefile.isPresent()) { String baseFilePath = basefile.get().getPath(); baseFileReader = (HoodieSeekingFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) - .getFileReader(hadoopConf.get(), new Path(baseFilePath)); + .getFileReader(getHadoopConf(), new Path(baseFilePath)); baseFileOpenMs = timer.endTimer(); LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath, basefile.get().getCommitTime(), baseFileOpenMs)); @@ -596,7 +596,7 @@ public void close() { */ private synchronized void close(Pair partitionFileSlicePair) { Pair, HoodieMetadataLogRecordReader> readers = - partitionReaders.remove(partitionFileSlicePair); + partitionReaders.get().remove(partitionFileSlicePair); closeReader(readers); } @@ -604,10 +604,10 @@ 
private synchronized void close(Pair partitionFileSlicePair) { * Close and clear all the partitions readers. */ private void closePartitionReaders() { - for (Pair partitionFileSlicePair : partitionReaders.keySet()) { + for (Pair partitionFileSlicePair : partitionReaders.get().keySet()) { close(partitionFileSlicePair); } - partitionReaders.clear(); + partitionReaders.get().clear(); } private void closeReader(Pair, HoodieMetadataLogRecordReader> readers) { @@ -629,10 +629,6 @@ public boolean enabled() { return isMetadataTableEnabled; } - public SerializableConfiguration getHadoopConf() { - return hadoopConf; - } - public HoodieTableMetaClient getMetadataMetaClient() { return metadataMetaClient; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index ab5b5f6b4db82..7cb35a9b6624c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -49,9 +49,8 @@ public HoodieMetadataFileSystemView(HoodieEngineContext engineContext, HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, HoodieMetadataConfig metadataConfig) { - super(metaClient, visibleActiveTimeline); - this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(), - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); + this(metaClient, visibleActiveTimeline, HoodieTableMetadata.create(engineContext, metadataConfig, + metaClient.getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Transient.java b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java new file mode 100644 index 0000000000000..0d8f6ad6565e5 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.function.SerializableSupplier; +import org.apache.hudi.common.function.ThrowingConsumer; + +import javax.annotation.concurrent.ThreadSafe; +import java.io.Serializable; + +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + +/** + * {@link Serializable} counterpart of {@link Lazy} + * + * @param type of the object being held by {@link Transient} + */ +@ThreadSafe +public class Transient implements Serializable { + + private SerializableSupplier initializer; + + private transient boolean initialized; + private transient T ref; + + private Transient(SerializableSupplier initializer) { + checkArgument(initializer != null); + + this.initializer = initializer; + this.ref = null; + this.initialized = false; + } + + private Transient(T value, SerializableSupplier initializer) { + checkArgument(value != null); + checkArgument(initializer != null); + + this.initializer = initializer; + this.ref = value; + this.initialized = true; + } + + public T get() { + if (!initialized) { + synchronized (this) { + if (!initialized) { + this.ref = initializer.get(); + initialized = true; + } + } + } + + return ref; + } + + public void reset() { + synchronized (this) { + this.ref = null; + this.initialized = false; + } + } + + public void destroy(ThrowingConsumer cleaner) throws Exception { + synchronized (this) { + if (initialized) { + cleaner.accept(ref); + } + + this.ref = null; + this.initialized = false; + this.initializer = null; + } + } + + /** + * Creates instance of {@link Transient} by lazily executing provided {@code initializer}, + * to instantiate value of type {@link T}. Same initializer will be used to re-instantiate + * the value after original one being dropped during serialization/deserialization cycle + */ + public static Transient lazy(SerializableSupplier initializer) { + return new Transient<>(initializer); + } + + /** + * Creates instance of {@link Transient} by eagerly setting it to provided {@code value}, + * while given {@code initializer} will be used to re-instantiate the value after original + * one being dropped during serialization/deserialization cycle + */ + public static Transient eager(T value, SerializableSupplier initializer) { + return new Transient<>(value, initializer); + } +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java new file mode 100644 index 0000000000000..6e9d43388b12f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util.collection; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.CollectionUtils.toStream; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestIterators { + + @Test + public void testFlatteningIterator() { + List> listOfList = + Arrays.asList( + Arrays.asList(0), + Arrays.asList(1, 2), + Collections.emptyList(), + Arrays.asList(3, 4, 5) + ); + + List flattenedList = + toStream(new FlatteningIterator<>(new MappingIterator<>(listOfList.iterator(), List::iterator))) + .collect(Collectors.toList()); + + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), flattenedList); + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index da53f00e69781..c4d7c1ff5b05e 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -33,13 +33,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat} +import org.apache.spark.sql.execution.vectorized.MutableColumnarRow import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.vectorized.MutableColumnarRow -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._