
Conversation

@yihua (Contributor) commented Aug 18, 2022

Change Logs

When column stats and bloom filter reading from the metadata table is enabled for the Bloom index (hoodie.bloom.index.use.metadata=true), frequent S3 timeouts like the one below occur, causing the write job to retry excessively or fail (a sketch of the triggering write configuration follows the stack trace):

Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='s3a://<>/.hoodie/metadata/column_stats/.col-stats-0000_00000000000000.log.4_5-116-20141', fileLen=-1}
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:389)
	at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:123)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:229)
	... 38 more
Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: re-open s3a://<>/.hoodie/metadata/column_stats/.col-stats-0000_00000000000000.log.4_5-116-20141 at 475916 on s3a://<>/.hoodie/metadata/column_stats/.col-stats-0000_00000000000000.log.4_5-116-20141: com.amazonaws.SdkClientException: Unable to execute HTTP request: The target server failed to respond: Unable to execute HTTP request: The target server failed to respond
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:208)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
	at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:226)
	at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:392)
	at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:228)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:115)
	at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:354)
	at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:226)
	at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:270)
	at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:384)
	at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:418)
	at java.io.FilterInputStream.read(FilterInputStream.java:83)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.hudi.common.table.log.block.HoodieLogBlock.getLogMetadata(HoodieLogBlock.java:228)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:193)
	at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:387)
	... 40 more
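For reference, here is a rough sketch (not from the PR) of a Spark write that puts an upsert on this code path. Only hoodie.bloom.index.use.metadata is quoted from the description above; the table name, field names, paths, and the remaining option keys are illustrative assumptions.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BloomMetadataIndexWriteSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("bloom-metadata-index-sketch")
        .master("local[2]")
        .getOrCreate();

    // Hypothetical input batch to upsert into the Hudi table.
    Dataset<Row> df = spark.read().parquet("/tmp/source_batch");

    df.write().format("hudi")
        .option("hoodie.table.name", "test_table")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.partitionpath.field", "partition")
        .option("hoodie.index.type", "BLOOM")
        .option("hoodie.metadata.enable", "true")
        // The flag from the description above: look up column stats and
        // bloom filters in the metadata table during Bloom index tagging.
        .option("hoodie.bloom.index.use.metadata", "true")
        .mode(SaveMode.Append)
        .save("s3a://bucket/path/test_table");

    spark.stop();
  }
}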

The relevant Spark stages are (1) fetching column stats from the metadata table (Stage 86 below) and (2) fetching bloom filters from the metadata table and doing the record key lookup (Stage 141 below).
[Screenshot: Spark UI, Stage 86 (fetching column stats from the metadata table)]
[Screenshot: Spark UI, Stage 141 (fetching bloom filters and looking up record keys)]

The root cause is that the parallelism used to fetch column stats and bloom filters is too high, causing excessive concurrent reads of the metadata table from each executor, where HFiles and log files are scanned.

To address this problem, this PR adjusts the Bloom index DAG to limit the parallelism of fetching column stats and bloom filters from the metadata table. Specifically, it makes the following changes (a sketch of the resulting read pattern follows the list):

  • Adds a new write config hoodie.bloom.index.metadata.read.parallelism (default 10) to control the parallelism for reading the index from the metadata table. This affects the fetching of both column stats and bloom filters.
    • Limits the parallelism of column stats fetching inside HoodieBloomIndex::loadColumnRangesFromMetaIndex()
    • Limits the parallelism of bloom filter fetching inside SparkHoodieBloomIndexHelper::findMatchingFilesForRecordKeys()
  • Rewrites HoodieMetadataBloomIndexCheckFunction to support batch processing of bloom filter fetching
    • Adds a new write config hoodie.bloom.index.metadata.bloom.filter.read.batch.size (default 128) to determine the batch size for reading bloom filters from the metadata table. A smaller value puts less pressure on executor memory.
  • Before looking up the <file ID, record key> pairs, sorts the list of pairs by the bloom filter key entry in the metadata table instead of by file ID alone (which provides no benefit). This way, bloom filter keys are laid out sequentially for the lookup.
    • To support this, the file name (required for generating the hash key) is stored alongside the file ID by adding a new member to BloomIndexFileInfo. To support efficient lookup of the file name from the file ID, the data structures of a few intermediate results are changed.
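As a rough illustration of the read pattern these changes aim for, here is a minimal, self-contained sketch (not the PR's actual code): cap the number of Spark partitions that touch the metadata table with coalesce(), then look keys up in fixed-size batches within each partition. The Spark setup and key data are placeholders; in the PR, the coalesce target comes from hoodie.bloom.index.metadata.read.parallelism and the batch size from hoodie.bloom.index.metadata.bloom.filter.read.batch.size.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BoundedMetadataLookupSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("bounded-lookup-sketch").setMaster("local[4]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      int readParallelism = 10; // stands in for hoodie.bloom.index.metadata.read.parallelism
      int batchSize = 128;      // stands in for hoodie.bloom.index.metadata.bloom.filter.read.batch.size

      // Placeholder for the sorted <file ID, record key> pairs; many small input partitions.
      JavaRDD<String> keys = jsc.parallelize(
          Arrays.asList("key-1", "key-2", "key-3", "key-4", "key-5"), 100);

      List<String> results = keys
          // Limit how many tasks concurrently read from the metadata table.
          .coalesce(readParallelism)
          // Within each task, process keys in batches so that one metadata
          // table read covers many keys instead of one read per key.
          .mapPartitions((Iterator<String> it) -> {
            List<String> out = new ArrayList<>();
            List<String> batch = new ArrayList<>(batchSize);
            while (it.hasNext()) {
              batch.add(it.next());
              if (batch.size() == batchSize || !it.hasNext()) {
                // In the real code this would be a batched bloom filter
                // lookup against the metadata table; here we echo the batch.
                out.addAll(batch);
                batch.clear();
              }
            }
            return out.iterator();
          })
          .collect();

      System.out.println("Looked up " + results.size() + " keys");
    }
  }
}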

Impact

Risk level: high
The changes are tested and benchmarked with upserts to Hudi tables on S3, using both 1GB and 100GB batches.

The screenshots below show the Spark stages of fetching column stats and bloom filters after the fix. Timeouts are no longer present, and the index fetching is much faster (column stats: 1min+ -> 7s; bloom filters: 2min+ -> 1min).
[Screenshot: Spark UI, column stats fetching stage after the fix]
[Screenshot: Spark UI, bloom filter fetching stage after the fix]

Based on the benchmarking, with metadata table reading enabled for column stats and bloom filters in the Bloom index, the overall upsert latency is reduced by 1-3 minutes (10%+) after the fix.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@yihua force-pushed the HUDI-4586-improve-metadata-fetching-in-bloom-index branch from a9a329c to f0de134 on August 20, 2022
@yihua changed the title from [WIP][HUDI-4586] Improve metadata fetching in bloom index to [HUDI-4586] Improve metadata fetching in bloom index on August 20, 2022
@yihua force-pushed the HUDI-4586-improve-metadata-fetching-in-bloom-index branch 3 times, most recently from 5162838 to ab274c8 on August 21, 2022
@yihua force-pushed the HUDI-4586-improve-metadata-fetching-in-bloom-index branch from ab274c8 to ed15f57 on August 22, 2022
    throw new HoodieMetadataException("Unable to find column range metadata for partition:" + partitionName, me);
  }
- }, Math.max(partitions.size(), 1));
+ return context.parallelize(partitions, Math.max(Math.min(partitions.size(), parallelism), 1))
@yihua (Contributor, author):

This limits the parallelism of column stats fetching from the metadata table.

  // Step 2: Use bloom filter to filter and the actual log file to get the record location
- keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
-     new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
+ keyLookupResultRDD = sortedFileIdAndKeyPairs.coalesce(config.getBloomIndexMetadataReadParallelism())
@yihua (Contributor, author):

This limits the parallelism of bloom filter fetching.


  private final String fileId;

+ private final String filename;
@yihua (Contributor, author):

Here we store the filename to avoid recomputation of the same piece of information across stages.

@alexeykudinkin (Contributor):

Do we really need to store both file-id and file-name? I think we can just store the file-name, and then convert it to file-id wherever necessary
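For context on converting a file name back to a file-group ID, here is a tiny sketch of the idea; the parsing is a simplified assumption (Hudi base file names start with the file-group ID followed by an underscore), and the file name used is hypothetical.

public class FileIdFromFileNameSketch {
  // Base file names look like <fileGroupId>_<writeToken>_<instantTime>.parquet,
  // so the file-group ID can be recovered from the name alone.
  static String fileIdFromName(String fileName) {
    int idx = fileName.indexOf('_');
    return idx > 0 ? fileName.substring(0, idx) : fileName;
  }

  public static void main(String[] args) {
    String fileName = "d0875d00-483d-4e8b-bbbe-c520366c47a0-0_0-6-11_20220818000000.parquet";
    System.out.println(fileIdFromName(fileName)); // prints the file-group ID prefix
  }
}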

- final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-     fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
+ // partition -> {file ID -> BloomIndexFileInfo instance}
+ final Map<String, Map<String, BloomIndexFileInfo>> partitionToFileInfo = fileInfoList.stream()
@yihua (Contributor, author):

Switching from List to Map for each value so that the filename can be retrieved with file ID in O(1) time.

  // that contains it.
- HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+ // Each entry: ((File ID, Filename), HoodieKey instance)
+ HoodieData<Pair<Pair<String, String>, HoodieKey>> fileComparisonPairs =
@yihua (Contributor, author):

Along the DAG, passing down the file names so they don't need to be recomputed.

@alexeykudinkin (Contributor):

Same applies here: I don't think we need to propagate both file-id and file-name, since the latter contains the former (it increases both the complexity and the amount of data we need to shuffle).

@alexeykudinkin (Contributor) left a comment:

@yihua I don't think we're focusing on the right problem here: RDD parallelism should not directly translate into concurrency of reads of the MT, since it'd be cached (we'd be reading it at most N times, where N is the number of executors, not the number of concurrent tasks).

If that's not what we're observing in tests, then we need to chase down why caching doesn't actually work.

- public BloomIndexFileInfo(String fileId, String minRecordKey, String maxRecordKey) {
+ public BloomIndexFileInfo(String fileId, String filename, String minRecordKey, String maxRecordKey) {
    this.fileId = fileId;
+   this.filename = filename;
@alexeykudinkin (Contributor):

nit: fileName

  List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
- final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
-     fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
+ // partition -> {file ID -> BloomIndexFileInfo instance}
@alexeykudinkin (Contributor):

nit: "File Id" denomination might be confusing, more accurate would be "file-group id"



   */
  public class HoodieMetadataBloomIndexCheckFunction implements
-     Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
+     Function2<Integer, Iterator<Tuple2<Tuple2<String, String>, HoodieKey>>, Iterator<List<HoodieKeyLookupResult>>> {
@alexeykudinkin (Contributor):

In general, it's better to keep nested hierarchies as flat as possible (it's easier to comprehend and reduces the number of objects needed): in that case we could use a single Tuple3 in lieu of two Tuple2s.
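For illustration, a small sketch of the flattening suggested above; the values are hypothetical, and HoodieKey is the only Hudi class used.

import org.apache.hudi.common.model.HoodieKey;
import scala.Tuple2;
import scala.Tuple3;

public class TupleFlatteningSketch {
  public static void main(String[] args) {
    HoodieKey key = new HoodieKey("record-1", "2022/08/22");

    // Nested form used in the PR: ((file ID, file name), HoodieKey).
    Tuple2<Tuple2<String, String>, HoodieKey> nested =
        new Tuple2<>(new Tuple2<>("file-id-1", "file-id-1_0-1-0_001.parquet"), key);

    // Flattened form from the review comment: (file ID, file name, HoodieKey).
    Tuple3<String, String, HoodieKey> flat =
        new Tuple3<>(nested._1()._1(), nested._1()._2(), nested._2());

    System.out.println(flat._1() + " / " + flat._2() + " -> " + flat._3().getRecordKey());
  }
}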

  Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
-     hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+     hoodieTable.getMetadataTable().getBloomFilters(partitionPathFileNameList);
  LOG.error(String.format("Took %d ms to look up %s bloom filters",
@alexeykudinkin (Contributor):

This should rather be debug/info

    throw new HoodieIndexException("Failed to find the base file for partition: " + partitionPath
        + ", fileId: " + fileId);
+   if (batchFileToKeysMap.size() == batchSize) {
+     resultList.addAll(lookupKeysInBloomFilters(batchFileToKeysMap));
@alexeykudinkin (Contributor):

Let's just break in this conditional to avoid duplication

@alexeykudinkin (Contributor) commented:

Following up on my previous comment: taking a deeper look, I see the following issues in our code at the moment:

  1. When creating the HoodieBackedTableMetadata instance within HoodieTable, we don't specify that it should reuse MT readers.
  2. HoodieMetadataMergedLogRecordReader.getRecordsByKeys always clears previously computed records and scans from scratch; instead, we should NOT re-process records that have already been processed and should incrementally process only the missing ones (see the sketch below).
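A minimal sketch of the incremental lookup idea in point 2, under the assumption of a simple in-memory cache; the class and method names below are illustrative, not Hudi's actual API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep records already read from the metadata table and
// only scan for keys that have not been looked up yet.
public class IncrementalKeyLookupSketch {

  // Stands in for records merged out of the metadata table log files.
  private final Map<String, String> cachedRecords = new HashMap<>();

  public Map<String, String> getRecordsByKeys(List<String> keys) {
    List<String> missingKeys = new ArrayList<>();
    for (String key : keys) {
      if (!cachedRecords.containsKey(key)) {
        missingKeys.add(key);
      }
    }
    if (!missingKeys.isEmpty()) {
      // Only keys not seen before trigger a scan; previously computed
      // records are kept instead of being cleared on every call.
      cachedRecords.putAll(scanByKeys(missingKeys));
    }
    Map<String, String> result = new HashMap<>();
    for (String key : keys) {
      String record = cachedRecords.get(key);
      if (record != null) {
        result.put(key, record);
      }
    }
    return result;
  }

  // Placeholder for the expensive scan of metadata table log blocks.
  private Map<String, String> scanByKeys(List<String> keys) {
    Map<String, String> scanned = new HashMap<>();
    for (String key : keys) {
      scanned.put(key, "record-for-" + key);
    }
    return scanned;
  }
}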

@yihua added the priority:critical and index labels on Sep 6, 2022
@nsivabalan added the area:performance label on Sep 22, 2022
@nsivabalan added the status:in-progress label on Nov 2, 2022
@nsivabalan added the release-0.12.2 label on Dec 6, 2022
@alexeykudinkin removed the release-0.12.2 label on Dec 15, 2022
@yihua (Contributor, author) commented Feb 8, 2023:

Part of this PR went into #7642, so closing this one.

@yihua closed this on Feb 8, 2023