<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

# S3A Prefetching


This document explains the `S3PrefetchingInputStream` and the various components it uses.

This input stream implements prefetching and caching to improve the read performance of the stream. A high-level overview of this feature can also be found in [this blog post](https://medium.com/pinterest-engineering/improving-efficiency-and-reducing-runtime-using-s3-read-optimization-b31da4b60fa0).

With prefetching, we divide the file into blocks of a fixed, configurable size, associate buffers with these blocks, and then read data into these buffers asynchronously. We may also cache these blocks.

### Basic Concepts

* **File** : A binary blob of data stored on some storage device.
* **Block** : A file is divided into a number of blocks. The block size is configurable. The first n-1 blocks are of the same size, and the last block may be the same size or smaller.
* **Block based reading** : The granularity of a read is one block. That is, we either read an entire block or none at all. Multiple blocks may be read in parallel.
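The block arithmetic implied by these definitions can be sketched as follows. `BlockLayout` is a hypothetical helper for illustration only; it is not the actual S3A `BlockData` class:

```java
// Hypothetical sketch of block-based layout arithmetic
// (not the actual Hadoop BlockData class).
public class BlockLayout {
    private final long fileSize;
    private final int blockSize;

    public BlockLayout(long fileSize, int blockSize) {
        this.fileSize = fileSize;
        this.blockSize = blockSize;
    }

    // Total number of blocks; the last one may be partial.
    public int numBlocks() {
        return (int) ((fileSize + blockSize - 1) / blockSize);
    }

    // Size of a given block: full-sized except possibly the last.
    public int sizeOfBlock(int blockNumber) {
        long start = (long) blockNumber * blockSize;
        return (int) Math.min(blockSize, fileSize - start);
    }

    // Which block a file offset falls into.
    public int blockNumberOf(long offset) {
        return (int) (offset / blockSize);
    }

    public static void main(String[] args) {
        BlockLayout layout = new BlockLayout(40_000_000L, 8_000_000);
        System.out.println(layout.numBlocks());    // 5
        System.out.println(layout.sizeOfBlock(4)); // 8000000
    }
}
```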

### Configuring the stream

|Property |Meaning |Default |
|--- |--- |--- |
|`fs.s3a.prefetch.enabled` |Enable the prefetch input stream |`true` |
|`fs.s3a.prefetch.block.size` |Size of a block |`8M` |
|`fs.s3a.prefetch.block.count` |Number of blocks to prefetch |`8` |
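As an illustration, these properties could be tuned in `core-site.xml`; the values below are arbitrary examples, not recommendations:

```xml
<configuration>
  <property>
    <name>fs.s3a.prefetch.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.prefetch.block.size</name>
    <value>16M</value>
  </property>
  <property>
    <name>fs.s3a.prefetch.block.count</name>
    <value>4</value>
  </property>
</configuration>
```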

### Key Components

`S3PrefetchingInputStream` - When prefetching is enabled, `S3AFileSystem` will return an instance of this class as the input stream. Depending on the file size, it will use either the `S3InMemoryInputStream` or the `S3CachingInputStream` as the underlying input stream.

`S3InMemoryInputStream` - Underlying input stream used when the file size < configured block size. Will read the entire file into memory.

`S3CachingInputStream` - Underlying input stream used when file size > configured block size. Uses asynchronous prefetching of blocks and caching to improve performance.

`BlockData` - Holds information about the blocks in a file, such as:

* Number of blocks in the file
* Block size
* State of each block (initially all blocks have state *NOT_READY*). Other states are: *QUEUED*, *READY*, and *CACHED*.

`BufferData` - Holds the buffer and additional information about it such as:

* The block number this buffer is for
* State of the buffer (*UNKNOWN*, *BLANK*, *PREFETCHING*, *CACHING*, *READY*, *DONE*). The initial state of a buffer is *BLANK*.

`CachingBlockManager` - Implements reading data into the buffer, prefetching and caching.

`BufferPool` - Manages a fixed-size pool of buffers. It’s used by `CachingBlockManager` to acquire buffers.

`S3File` - Implements operations to interact with S3 such as opening and closing the input stream to the S3 file.

`S3Reader` - Implements reading from the stream opened by `S3File`. Reads from this input stream in blocks of 64KB.

`FilePosition` - Provides functionality related to tracking the position in the file. Also gives access to the current buffer in use.

`SingleFilePerBlockCache` - Responsible for caching blocks to the local file system. Each cache block is stored on the local disk as a separate file.

### Operation

### S3InMemoryInputStream

Suppose we have a file of size 5MB and a block size of 8MB. Since the file size is less than the block size, the `S3InMemoryInputStream` will be used.

If the caller makes the following read calls:


```
in.read(buffer, 0, 3MB);
in.read(buffer, 0, 2MB);
```

When the first read is issued, there is no buffer in use yet. We get the data in this file by calling the `ensureCurrentBuffer()` method, which ensures that a buffer with data is available to be read from.

The `ensureCurrentBuffer()` then:

* Reads data into a buffer by calling `S3Reader.read(ByteBuffer buffer, long offset, int size)`
* `S3Reader` uses `S3File` to open an input stream to the S3 file by making a `getObject()` request with range as `(0, filesize)`.
* `S3Reader` reads the entire file into the provided buffer and, once reading is complete, closes the S3 stream and frees all underlying resources.
* Now that the entire file is in a buffer, this data is set in `FilePosition` so it can be accessed by the input stream.

The read operation now just gets the required bytes from the buffer in `FilePosition`.

When the second read is issued, there is already a valid buffer which can be used, so nothing else needs to happen; the required bytes are simply read from this buffer.
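This lazy, read-once behaviour can be illustrated with a simplified in-memory analogue. The class and names here are invented for illustration; this is not the real `S3InMemoryInputStream` code:

```java
import java.util.function.Supplier;

// Simplified analogue of S3InMemoryInputStream: the whole object is
// fetched into one buffer on the first read; later reads hit the buffer.
public class InMemoryStream {
    private final Supplier<byte[]> fetchWholeObject; // stands in for the S3 GET
    private byte[] buffer;    // null until the first read
    private int pos;
    public int remoteFetches; // counts simulated S3 GETs

    public InMemoryStream(Supplier<byte[]> fetchWholeObject) {
        this.fetchWholeObject = fetchWholeObject;
    }

    // Mirrors ensureCurrentBuffer(): fetch the object once, lazily.
    private void ensureCurrentBuffer() {
        if (buffer == null) {
            buffer = fetchWholeObject.get();
            remoteFetches++;
        }
    }

    public int read(byte[] dest, int off, int len) {
        ensureCurrentBuffer();
        int n = Math.min(len, buffer.length - pos);
        if (n <= 0) {
            return -1; // EOF
        }
        System.arraycopy(buffer, pos, dest, off, n);
        pos += n;
        return n;
    }

    public static void main(String[] args) {
        byte[] object = new byte[5];
        InMemoryStream in = new InMemoryStream(() -> object);
        byte[] dest = new byte[5];
        in.read(dest, 0, 3);
        in.read(dest, 0, 2);
        System.out.println(in.remoteFetches); // 1: one GET serves both reads
    }
}
```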

### S3CachingInputStream



Now suppose we have a file of size 40MB and a block size of 8MB. Since the file size is greater than the block size, the `S3CachingInputStream` will be used.

### Sequential Reads

If the caller makes the following calls:

```
in.read(buffer, 0, 5MB)
in.read(buffer, 0, 8MB)
```

For the first read call, there is no valid buffer yet. `ensureCurrentBuffer()` is called, and for the first `read()` the prefetch count is set to 1.

The current block (block 0) is read synchronously, while the block to be prefetched (block 1) is read asynchronously.
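This synchronous-read-plus-asynchronous-prefetch pattern can be sketched with a plain `ExecutorService`. This is a toy model; the real code routes all of this through `CachingBlockManager` and a bounded buffer pool:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PrefetchSketch {

    // Stands in for a GET of one block from S3.
    static byte[] fetchBlock(int blockNumber) {
        return new byte[] {(byte) blockNumber};
    }

    // Read block 0 synchronously while block 1 is prefetched in the background.
    static byte[][] readTwoBlocks() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<byte[]> prefetched = pool.submit(() -> fetchBlock(1));
            byte[] block0 = fetchBlock(0);    // current read() is served first
            byte[] block1 = prefetched.get(); // often already complete by now
            return new byte[][] {block0, block1};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[][] blocks = readTwoBlocks();
        System.out.println(blocks[0][0] + " " + blocks[1][0]); // 0 1
    }
}
```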

The `CachingBlockManager` is responsible for getting buffers from the buffer pool and reading data into them. This process of acquiring the buffer pool works as follows:


* The buffer pool keeps a map of allocated buffers and a pool of available buffers. The size of this pool is the prefetch block count + 1. For the default count of 8, we will have a buffer pool of size 9.
* If the pool is not yet at capacity, create a new buffer and add it to the pool.
* If it’s at capacity, check if any buffers in the *DONE* state can be released. Releasing a buffer means removing it from the allocated map and returning it to the pool of available buffers.
* If no buffers are currently in the *DONE* state, nothing will be released, so the above step is retried at a fixed interval a few times until a buffer becomes available.
* If after multiple retries there is still no available buffer, release a buffer in the *READY* state. The buffer for the block furthest from the current block is released.

Once a buffer has been acquired by `CachingBlockManager`, if the buffer is in the *READY* state, it can simply be returned. This means data was already read into this buffer asynchronously by a prefetch. If its state is *BLANK*, then data is read into it using `S3Reader.read(ByteBuffer buffer, long offset, int size)`.

For the second read call, `in.read(buffer, 0, 8MB)`: since the block size is 8MB and only 5MB of block 0 has been read so far, 3MB of the required data will be read from the current block 0. Once we’re done with this block, we’ll request the next block (block 1), which will already have been prefetched, so we can start reading from it immediately. While reading from block 1, we also issue prefetch requests for the next blocks. The number of blocks to prefetch is determined by `fs.s3a.prefetch.block.count`.


### Random Reads

If the caller makes the following calls:


```
in.read(buffer, 0, 5MB)
in.seek(10MB)
in.read(buffer, 0, 4MB)
in.seek(2MB)
in.read(buffer, 0, 4MB)
```


The `S3CachingInputStream` also caches prefetched blocks. This happens when a `seek()` is issued to a position outside the current block while the current block has not been fully read.

For the above read sequence, when the `seek(10MB)` call is issued, block 0 has not been read completely, so we cache it, as we will probably want to read from it again.

When `seek(2MB)` is called, the position is back inside block 0. The next read can now be satisfied from the locally cached block, which is typically orders of magnitude faster than a network-based read.
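The caching decision described here amounts to a small predicate. The helper below is hypothetical and elides the real implementation's per-block state tracking:

```java
// Toy decision rule for caching on seek(), as described above
// (hypothetical helper, not the actual S3CachingInputStream logic).
public class SeekCachePolicy {
    // Cache the current block when the seek target lies outside it
    // and the block has not been fully read.
    public static boolean shouldCache(long seekTarget, int currentBlock,
                                      int blockSize, long bytesReadInBlock) {
        boolean outsideCurrentBlock = seekTarget / blockSize != currentBlock;
        boolean partiallyRead = bytesReadInBlock < blockSize;
        return outsideCurrentBlock && partiallyRead;
    }

    public static void main(String[] args) {
        // seek(10MB) away from a partially read block 0 (8MB blocks): cache it.
        System.out.println(shouldCache(10_000_000L, 0, 8_000_000, 5_000_000L));
    }
}
```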