Skip to content

Conversation

@alextwoods
Copy link
Contributor

@alextwoods alextwoods commented Nov 5, 2025

Considers the outstanding demand (in flight request to the subscription) in ByteBufferStoringSubscriber before requesting more.

Motivation and Context

Fixes #3726

The ByteBufferStoringSubscriber requests more data from its subscription in two cases:

  1. when data is read when transferTo is called.
  2. when receiving data from onNext.

In both cases, it will only request more when the amount of buffered data is less than its limit. However, it was not considering unfulfilled requests (ie, requests that have been made to the subscription, but which have not yet been fulfilled) - it is possible then for the ByteBufferStoringSubscriber to queue up a large number of requests and once those are finally fulfilled, end up with a large number of events that are buffered. This can lead to OutOfMemory issues, in particularly when using a bursty Publisher/Subscription (ie, one that will fulfill a large number of requests quickly and then wait for awhile to build up more outstanding requests - an example of this is the FileAsyncRequestBody)

Modifications

  • Sets a byteBufferSizeHint when onNext is called.
  • Track outstanding demand (ie, the difference between requests and onNext calls).
  • Don't make more requests if the estimated in flight requests + current buffer is larger than the minimumBytesBuffered
  • Increases the MINIMUM_BYTES_BUFFERED back to 16 MB as it was before Fixed the issue where CRT-based S3 client was using excessive memory #3800 - the lower limit was required when outstanding demand wasn't considered.

A note on the use of AtomicInteger for tracking outstandingDemand vs volatile long for tracking byteBufferSizeHint - outstandingDemand is a counter and requires atomic updates (since we are incrementing/decrementing it). byteBufferSizeHint on the other hand is simply set to whatever the latest size is. It should be volatile so that whatever thread is accessing it gets the latest values, but its updates do not need to be atomic.

Testing

Added new unit tests. Manual testing with a large file upload and small max heap size - before the change, hit OOM, with the change, memory usage is much lower (200 mb -> 40 mb max usage for a 2 GB file).

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • [] My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have added tests to cover my changes
  • All new and existing tests passed
  • I have added a changelog entry. Adding a new entry must be accomplished by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • I confirm that this pull request can be released under the Apache 2 license

@alextwoods alextwoods requested a review from a team as a code owner November 5, 2025 22:32
if (maximumOutstandingDemand.isPresent() && outstandingDemand.get() >= maximumOutstandingDemand.get()) {
return;
}

if (currentDataBuffered < minimumBytesBuffered) {
Copy link
Contributor

@zoewangg zoewangg Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of enforcing maximumOutstandingDemand, can we use byteBuffer.remaining() as a hint and check outstandingDemand.get() * buffer hints + currentDataBuffered < minimumBytesBuffered?

Because this class is used else where like https://github.com/aws/aws-sdk-java-v2/blob/master/utils/src/main/java/software/amazon/awssdk/utils/async/InputStreamSubscriber.java#L44 and it'd address the issue there as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - thats a great idea - I've updated the approach to use a sizeHint

@@ -28,14 +28,16 @@
@SdkInternalApi
public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream {
private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L;
// for 16 kb chunks, this limits to about 16 MB (2x the standard crt provided buffer size)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the 16KB come from? Is it related to #6542?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was removed based on the sizeHint approach, but I've updated the MINIMUM_BYTES_BUFFERED to what it was prior to #3800

@alextwoods alextwoods changed the title Add limit on outstanding demand to ByteBufferStoringSubscriber Consider outstanding demand in ByteBufferStoringSubscriber before requesting more Nov 6, 2025
@alextwoods alextwoods added the no-api-surface-area-change Indicate there is no API surface area change and thus API surface area review is not required label Nov 6, 2025
@sonarqubecloud
Copy link

sonarqubecloud bot commented Nov 6, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

no-api-surface-area-change Indicate there is no API surface area change and thus API surface area review is not required

Projects

None yet

Development

Successfully merging this pull request may close these issues.

S3 Transfer Manager upload causes memory issues

4 participants