
Conversation

Contributor

@alexeykudinkin alexeykudinkin commented Jan 11, 2023

Change Logs

Recently, attempts to use the Metadata Table in the Bloom Index resulted in failures due to exhaustion of the S3 connection pool, no matter how (reasonably) large we set the pool size (we tested up to 3k connections).

This PR focuses on optimizing the Bloom Index lookup sequence in the case when it leverages the Bloom Filter partition of the Metadata Table (MT). The change is based on the following observations:

  • Increasing the size of the batch of requests to the MT amortizes the cost of processing them (the bigger the batch, the lower the per-request cost).

  • Having too few partitions in the Bloom Index path, however, starts to hurt parallelism when we actually probe individual files to check whether they contain the target keys. The solution is to split these two steps into separate stages with drastically different parallelism levels: constrain parallelism when reading from the MT (tens of tasks) and keep it at the current level for probing individual files (hundreds of tasks).

  • The current way of partitioning records (relying on Spark's default partitioner) meant that, with high likelihood, every Spark executor would open (and process) every file-group of the MT's Bloom Filter partition. To alleviate that, the same hashing algorithm used by the MT should be used to partition records into Spark's individual partitions, so that every task opens no more than one file-group of the MT's Bloom Filter partition.

To achieve that, the following changes to the Bloom Index sequence (when leveraging the MT) are implemented:

  1. Bloom Filter probing and actual file probing are split into 2 separate operations (so that the parallelism of each can be controlled individually)
  2. Requests to the MT are replaced with batch API calls
  3. A custom partitioner, AffineBloomIndexFileGroupPartitioner, is introduced that repartitions the dataset of file names with their corresponding record keys in a way that is affine with the MT Bloom Filters' partitioning (allowing us to open no more than a single file-group per Spark task); a sketch of the idea follows this list
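
For illustration only, here is a minimal sketch of the partitioner idea from item 3. This is not the actual AffineBloomIndexFileGroupPartitioner from the PR: the class name, the plain hashCode-based file-group mapping, and the slot-spreading scheme are assumptions made for the example; the real implementation reuses the MT's own hashing.

import org.apache.spark.Partitioner;

// Sketch only: routes every file name to a Spark partition that maps onto exactly one
// file-group of the MT's bloom_filters partition. The hashCode-based mapping below is a
// stand-in for the MT's real key-to-file-group hashing.
public class AffineFileGroupPartitionerSketch extends Partitioner {
  private final int fileGroupCount;     // file-groups in the MT bloom_filters partition
  private final int targetParallelism;  // assumed to be a multiple of fileGroupCount

  public AffineFileGroupPartitionerSketch(int fileGroupCount, int targetParallelism) {
    this.fileGroupCount = fileGroupCount;
    this.targetParallelism = targetParallelism;
  }

  @Override
  public int numPartitions() {
    return targetParallelism;
  }

  @Override
  public int getPartition(Object key) {
    String fileName = (String) key;
    int hash = fileName.hashCode() & Integer.MAX_VALUE;
    // Which MT file-group holds this file's bloom filter (stand-in hashing).
    int fileGroupIndex = hash % fileGroupCount;
    // Spread files belonging to the same file-group over a fixed number of slots, so
    // parallelism stays at targetParallelism while each task touches one file-group.
    int slotsPerFileGroup = targetParallelism / fileGroupCount;
    int slot = (hash / fileGroupCount) % slotsPerFileGroup;
    return fileGroupIndex * slotsPerFileGroup + slot;
  }
}

Because every Spark partition maps onto exactly one file-group, a task fetching bloom filters from the MT never has to open more than one file-group's files.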

Additionally, this PR addresses some low-hanging performance optimizations that considerably improve the Bloom Index lookup sequence, such as mapping file-comparison pairs into a PairRDD (where the key is the file name and the value is the record key) instead of an RDD, so that we can:

  1. Do in-partition sorting by file name (to make sure we check all records within a file at once) within each Spark partition instead of a global sort (reducing shuffling as well)
  2. Avoid re-shuffling (otherwise incurred by re-mapping from an RDD to a PairRDD later); see the sketch after this list
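
A minimal, self-contained sketch of how these two points combine, assuming the hypothetical AffineFileGroupPartitionerSketch shown earlier (the file names and keys are made up): a (file name -> record key) PairRDD is repartitioned once and sorted by file name within each partition, so no global sort or second shuffle is needed.

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class AffineRepartitionExample {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "affine-repartition-sketch");
    // One (fileName, recordKey) pair per candidate file/key comparison.
    JavaPairRDD<String, String> fileToRecordKey = jsc.parallelizePairs(Arrays.asList(
        new Tuple2<>("file-001.parquet", "key-a"),
        new Tuple2<>("file-002.parquet", "key-b"),
        new Tuple2<>("file-001.parquet", "key-c")));
    // Single shuffle: repartition affinely with the MT file-groups and sort by file
    // name within each partition (keys for the same file end up adjacent).
    JavaPairRDD<String, String> partitionedAndSorted =
        fileToRecordKey.repartitionAndSortWithinPartitions(
            new AffineFileGroupPartitionerSketch(4, 8));
    partitionedAndSorted.collect().forEach(pair -> System.out.println(pair));
    jsc.stop();
  }
}

repartitionAndSortWithinPartitions pushes the per-partition sort into the single shuffle, which is what keeps the grouping-by-file cost low compared to a global sort.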

Impact

The impact of this PR is a more than 10x improvement in the record-tagging sequence when the Bloom Index leverages Bloom Filters persisted in the MT.

Risk level (write none, low, medium or high below)

Low

Documentation Update

We need to update the documentation to elaborate that, when using the MT Bloom Filter partition, users still need to increase the number of S3 connections to roughly ~30 per executor core in order to avert hitting the connection-pool exhaustion problem.
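
As a hedged illustration for the docs (fs.s3a.connection.maximum is the standard Hadoop S3A pool-size property; the executor core count and the job wiring are assumptions, and deployments using a different S3 client would tune its equivalent):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class S3ConnectionPoolSizingExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("hudi-bloom-index-job")
        .setIfMissing("spark.master", "local[2]"); // master normally comes from spark-submit
    JavaSparkContext jsc = new JavaSparkContext(conf);
    int coresPerExecutor = 8;                      // assumption: your executor sizing
    int s3MaxConnections = coresPerExecutor * 30;  // ~30 connections per executor core
    // Applies to the S3A client used when reading the MT bloom_filters partition.
    jsc.hadoopConfiguration().set("fs.s3a.connection.maximum", String.valueOf(s3MaxConnections));
    // ... run the Hudi write / tagging job ...
    jsc.stop();
  }
}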

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405 danny0405 added metadata priority:high Significant impact; potential bugs engine:spark Spark integration labels Jan 11, 2023
@alexeykudinkin alexeykudinkin added priority:blocker Production down; release blocker and removed priority:high Significant impact; potential bugs labels Jan 11, 2023
@alexeykudinkin alexeykudinkin requested a review from yihua January 11, 2023 06:59
@alexeykudinkin alexeykudinkin assigned yihua and unassigned nsivabalan Jan 11, 2023
@alexeykudinkin alexeykudinkin changed the title [DNM] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table Jan 11, 2023
@alexeykudinkin alexeykudinkin changed the title [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table [HUDI-5534][Stacked on 6782] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table Jan 11, 2023
@alexeykudinkin alexeykudinkin force-pushed the ak/blm-idx-dag-opt branch 3 times, most recently from 2ac1a78 to 6988dfc Compare January 18, 2023 22:02
@alexeykudinkin alexeykudinkin changed the title [HUDI-5534][Stacked on 6782] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table [HUDI-5534][Stacked on 6815] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table Jan 19, 2023
@alexeykudinkin alexeykudinkin force-pushed the ak/blm-idx-dag-opt branch 3 times, most recently from 621d20b to 48c0f69 Compare January 20, 2023 06:16
@apache apache deleted a comment from hudi-bot Jan 20, 2023
@alexeykudinkin
Contributor Author

@hudi-bot run azure

@alexeykudinkin alexeykudinkin force-pushed the ak/blm-idx-dag-opt branch 6 times, most recently from cb0d7dc to 4437e63 Compare January 24, 2023 21:38
@alexeykudinkin alexeykudinkin changed the title [HUDI-5534][Stacked on 6815] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from Metadata Table Jan 24, 2023

@yihua yihua left a comment


Halfway through the review; I left a few comments.

int configuredBloomIndexParallelism = config.getBloomIndexParallelism();

// NOTE: Target parallelism could be overridden by the config
int targetParallelism =
Contributor


note to myself: if the configured bloom index parallelism is smaller than the input parallelism, before this PR, we take the input parallelism; now, we take the configured bloom index parallelism.

Contributor Author


Correct. Previously we were taking max(input, configured), and there was essentially no way for the user to override it.
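
To illustrate the exchange above, a hedged sketch of the behavior change (method and variable names are hypothetical, not the actual Hudi code):

public class BloomIndexParallelismSketch {
  // Pre-PR behavior: the configured value could only ever raise the parallelism.
  static int targetParallelismBefore(int inputParallelism, int configuredParallelism) {
    return Math.max(inputParallelism, configuredParallelism);
  }

  // Post-PR behavior: an explicitly configured value wins, even if it is smaller;
  // otherwise fall back to the parallelism of the input.
  static int targetParallelismAfter(int inputParallelism, int configuredParallelism) {
    return configuredParallelism > 0 ? configuredParallelism : inputParallelism;
  }

  public static void main(String[] args) {
    System.out.println(targetParallelismBefore(500, 100)); // 500: config had no effect
    System.out.println(targetParallelismAfter(500, 100));  // 100: config now caps the stage
  }
}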

Alexey Kudinkin added 19 commits January 25, 2023 20:20
…HTTP request for every file probed, killing performance);

Added config to control the parallelism factor when reading BFs from MT;
Tidying up: added comments;
…ation` explicitly;

Dialed down buffer size from 10Mb to 10Kb (this buffer is practically not used)
…sm to read MT

as the one used for Bloom Index lookup overall
@apache apache deleted a comment from hudi-bot Jan 26, 2023
@hudi-bot
Collaborator

CI report:



@yihua yihua left a comment


LGTM. Thanks for fixing and improving the Bloom Index when reading bloom filters from the metadata table!

Comment on lines +134 to +141
    int bloomFilterPartitionFileGroupCount =
        config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
    int adjustedTargetParallelism =
        targetParallelism % bloomFilterPartitionFileGroupCount == 0
            ? targetParallelism
            // NOTE: We add 1 to make sure parallelism a) value always stays positive and b)
            //       {@code targetParallelism <= adjustedTargetParallelism}
            : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount;
Contributor


Make sure we benchmark the write workload with this new logic before landing the PR.

Contributor


One caveat is that if the targetParallelism is large, it may still overload the S3 bucket if the number of Spark executors is large (each executor reading metadata table's bloom_filters partition).

Contributor Author


Correct, this still has that risk. It could be controlled via the Bloom Index parallelism, though.
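
A quick worked example of the rounding shown in the excerpt above (the input numbers are hypothetical):

public class AdjustedParallelismExample {
  public static void main(String[] args) {
    int targetParallelism = 100;                 // hypothetical requested parallelism
    int bloomFilterPartitionFileGroupCount = 8;  // hypothetical bloom_filters file-group count
    int adjustedTargetParallelism =
        targetParallelism % bloomFilterPartitionFileGroupCount == 0
            ? targetParallelism
            : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * bloomFilterPartitionFileGroupCount;
    // 100 is not a multiple of 8, so it is rounded up to 104 (13 * 8); every
    // file-group then gets the same whole number of Spark partitions mapped onto it.
    System.out.println(adjustedTargetParallelism); // prints 104
  }
}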


  // TODO(HUDI-5619) remove when addressed
  private final Map<String, Map<String, String>> cachedLatestBaseFileNames =
      new HashMap<>(16);
Contributor


Any reason for using 16 instead of a bigger number?

Contributor Author


We want to skip the first few expansions, but we don't want to allocate too much memory that might never be used.

  @Override
  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
-   GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+   GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, targetSchema);
Contributor


nit: I see the other irrelevant changes were reverted, but this one is still here. If you think this is a performance improvement, we can keep it in this PR.

@alexeykudinkin alexeykudinkin merged commit e270924 into apache:master Jan 27, 2023
yihua pushed a commit that referenced this pull request Jan 30, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Jan 31, 2023
fengjian428 pushed a commit to fengjian428/hudi that referenced this pull request Apr 5, 2023

Labels

engine:spark Spark integration priority:blocker Production down; release blocker
