
[RFC] Add persistable replay buffer for large-scale rollout data storage #2539

@Xunmei-liu

Description


Motivation

  1. In reinforcement learning (RL) tasks, there is heavy demand for data transfer and sampling, such as storing rollout results and sampling batches for actor or critic training. However, when rollout data grows too large to fit in memory, out-of-memory (OOM) issues arise. To address this, we propose a hybrid replay buffer architecture that combines an in-memory cache with RocksDB, flexibly supporting fully in-memory, fully on-disk, or hybrid modes to resolve the OOM issues.
  2. Many RL use cases require experimenting with different sampling strategies, such as uniform sampling, prioritized-time (p-time) sampling, or other custom heuristics. To enable this, we designed a plugin-based sampling interface in the replay buffer to support custom sampling strategies.
  3. When a trial fails, all historical rollout data is lost on restart. We therefore propose a mechanism that periodically uploads the buffer's data to HDFS for persistence, so that the restarted trial, or other trials, can reuse the data in the replay buffer.

Design choices (Pros & Cons)

In-memory cache + RocksDB hybrid structure

  • We use a combined in-memory cache and RocksDB structure, with the cache storing hot data and RocksDB storing cold data. Data in the cache and RocksDB are complementary. The user can set the cache_memory_limit_in_mb parameter, and when the data in the cache exceeds this limit, the least-recently used data will be evicted to RocksDB.
  • We chose this structure because (1) disk-backed storage prevents OOM issues, and (2) clients whose data fits in memory can still use the cache alone, achieving QPS close to that of a plain Python dictionary.
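The hybrid layout above can be sketched as a small LRU cache backed by a cold store. This is a minimal, illustrative sketch, not the RFC's implementation: a plain dict stands in for RocksDB, `cache_memory_limit_in_mb` is the only name taken from the RFC, and sizes are counted as raw value lengths.

```python
from collections import OrderedDict

class HybridStore:
    """Sketch of the hybrid cache: an LRU in-memory dict backed by a cold
    store (a dict standing in for RocksDB). Names are illustrative."""

    def __init__(self, cache_memory_limit_in_mb: float):
        self._limit_bytes = int(cache_memory_limit_in_mb * 1024 * 1024)
        self._cache = OrderedDict()   # hot data, kept in LRU order
        self._cold = {}               # stand-in for RocksDB
        self._cache_bytes = 0

    def push(self, key: str, value: bytes) -> None:
        # Write-back: only the cache is touched on push.
        if key in self._cache:
            self._cache_bytes -= len(self._cache.pop(key))
        elif key in self._cold:
            del self._cold[key]       # cache and cold store stay disjoint
        self._cache[key] = value
        self._cache_bytes += len(value)
        self._evict_if_needed()

    def get(self, key: str) -> bytes:
        if key in self._cache:
            self._cache.move_to_end(key)   # mark as recently used
            return self._cache[key]
        value = self._cold.pop(key)        # "populate": promote to the cache
        self.push(key, value)
        return value

    def _evict_if_needed(self) -> None:
        # Evict least-recently-used entries to the cold store.
        while self._cache_bytes > self._limit_bytes and len(self._cache) > 1:
            k, v = self._cache.popitem(last=False)
            self._cache_bytes -= len(v)
            self._cold[k] = v
```

The disjointness of cache and cold store (data is *moved*, not copied, on eviction and population) is what makes the total capacity the sum of the two tiers.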

Pros

  • Allows flexibility in using fully in-memory, fully on-disk, or hybrid modes depending on the user's need

Cons

  • Increased implementation complexity

Disk vs remote data storage

  • We chose RocksDB instead of choosing remote data storage (such as Redis) because
    1. In terms of performance, local disk read time is lower than the round-trip latency to a remote data center.
    2. If using remote data storage, we also need to consider multi-tenant concerns, such as how to isolate data from different tasks. These issues don’t arise when using local disks.

Pros

  • Higher read/write QPS
  • Natural data isolation

Cons

  • Limited storage capacity compared to scalable remote storage. In the future, if rollout data grows beyond the combined capacity of memory and local disk, we can implement an auto-scaling mechanism and build a distributed database by leveraging the local disks of multiple worker nodes.

Write-through vs write-back Cache

Generally, there are two types of cache write policies:

  1. Write-through: data is written to the cache and the DB simultaneously
  2. Write-back: data is written only to the cache and flushed to the DB at a later time

With write-through, the cache is a subset of the DB. With write-back, data in the cache and the DB complement each other, allowing more data to be stored in total. Moreover, with write-back only the cache needs to be updated during a push, so the push cost is lower. Thus, we chose write-back for our cache.
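The difference between the two policies can be shown in a few lines. This is an illustrative contrast, not the RFC's code; plain dicts stand in for the cache and DB, and `flush` is a hypothetical name for the deferred write (in the RFC, eviction plays this role).

```python
class WriteThroughCache:
    """Write-through: every push updates both cache and DB, so the cache
    is always a subset of the DB."""
    def __init__(self, db: dict):
        self.cache, self.db = {}, db
    def push(self, key, value):
        self.cache[key] = value
        self.db[key] = value          # synchronous DB write on every push

class WriteBackCache:
    """Write-back: push touches only the cache; data reaches the DB later
    (e.g., on eviction), so cache and DB complement each other."""
    def __init__(self, db: dict):
        self.cache, self.db = {}, db
    def push(self, key, value):
        self.cache[key] = value       # fast path: no DB write
    def flush(self, key):
        self.db[key] = self.cache.pop(key)   # deferred write
```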

Pros

  • Greater overall data storage capacity
  • Faster push operations

Cons

  • Data loss risk: with write back, unflushed data in the cache may be lost if the system crashes before it's written to the database.

Request processing

To improve QPS, we propose an asynchronous processing engine structure. All write operations are non-blocking: instead of executing immediately, a task is appended to a queue and handled by a dedicated background thread (task-processor). This allows the main thread to continue without waiting.
There are 5 types of tasks - push, delete, snapshot, eviction, and populate. These are prioritized using two separate task queues:

  • p0 task queue: High-priority tasks (push, delete, snapshot)
  • p1 task queue: Lower-priority tasks (eviction, populate)
When both queues have pending tasks, the processor picks from the p0 queue with 80% probability, prioritizing time-sensitive operations, and from the p1 queue with 20% probability so that p1 tasks do not starve.
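The selection rule above can be sketched as a small helper. This is a minimal sketch of the 80/20 policy, not the actual task processor; `next_task` and its parameters are illustrative names.

```python
import queue
import random

def next_task(p0: queue.Queue, p1: queue.Queue, p0_prob: float = 0.8):
    """Pick the next task from the two priority queues: p0 with
    probability p0_prob when both are non-empty, p1 otherwise."""
    if p0.empty() and p1.empty():
        return None
    if p1.empty() or (not p0.empty() and random.random() < p0_prob):
        return p0.get()
    return p1.get()
```

The fall-through cases (only one queue non-empty) drain whichever queue has work, so neither priority level can block the other when it is idle.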

p0 Task Queue (High Priority):

  • Push task: A push task will be appended when the user calls push. When executed, it will be pushed to the in-memory cache.
  • Delete task: A delete task will be appended when the user calls delete. When executed, it will delete from the cache and RocksDB if the key exists.
  • Snapshot task: If the HDFS backup mode is enabled, a backup-manager background thread will append a snapshot task every 3 minutes. When executed, it will archive the current cache and RocksDB state, and the backup manager will upload the archive to HDFS.

p1 Task Queue (Low Priority)

  • Eviction task: Whenever the data in the cache changes, a separate background thread responsible for eviction will calculate and update the cache size. When the cache size exceeds the set cache limit, an eviction task will be appended. When the eviction task is executed, the least recently used data will be evicted from the cache to RocksDB.
  • Populate task: When the data in RocksDB is accessed (e.g., the retrieved data is in RocksDB instead of the cache), a populate task will be appended. When the populate task is executed, the key - value pair will be moved from RocksDB to the cache.
    Note: since all read operations must still return immediately, a problem arises: if the user pushes a batch but the background thread has not yet executed the corresponding push task, a subsequent get for that key would miss. We therefore need another dictionary storing pending push/delete tasks, and before returning the result of any read operation, we merge it with the tasks in that dictionary.

p0_index:

  • A dict mapping each key to its pending list of push/delete tasks in the p0 task queue:
    • When the user calls get, first read the existing value from the cache/RocksDB, then merge it with the key's pending tasks before returning.
    • Sampling works the same way: first collect all existing keys, then iterate through the pending tasks to apply any deletions or additions, and finally sample a key.
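The merge step for a single get can be sketched as follows. This is illustrative, not the RFC's code: a dict stands in for the cache/RocksDB, and the pending-op encoding (`("push", value)` / `("delete",)` tuples) is a hypothetical representation of the `p0_index` entries.

```python
def read_with_pending(key, store: dict, p0_index: dict):
    """Merge pending p0 tasks into a read: pending pushes supersede the
    stored value, pending deletes hide it."""
    value = store.get(key)           # what the cache/RocksDB currently holds
    for op in p0_index.get(key, []): # pending ops, oldest first
        if op[0] == "push":
            value = op[1]
        elif op[0] == "delete":
            value = None
    return value
```

Applying the pending ops in queue order guarantees the read reflects the state the store will reach once the task processor catches up.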

Pros

  • Higher QPS
  • Simplified concurrency control to maintain thread safety, since all write operations are handled by a single task processor thread

Cons

  • Increase in complexity compared to a basic synchronous structure. This may make debugging more difficult

Sampler design

Customizable sampler

We propose a plugin-based sampler design so that users can customize their own sampling strategies by implementing their own samplers. We provide a uniform sampler implementation as a template that users can reference. The sample method of a sampler returns a generator, which can be used to iterate over the keys in the replay buffer in a certain order.
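The plugin interface can be sketched as an abstract base class plus the uniform sampler. Class and method names here are illustrative (the RFC's actual API may differ); the only constraint taken from the text is that `sample` returns a generator over replay-buffer keys.

```python
import abc
import random
from typing import Iterator, List

class Sampler(abc.ABC):
    """Sketch of a plugin-style sampler interface."""

    @abc.abstractmethod
    def sample(self, keys: List[str]) -> Iterator[str]:
        """Yield replay-buffer keys in the order they should be sampled."""

class UniformSampler(Sampler):
    """Uniform sampling: yields keys in random order without replacement."""
    def sample(self, keys: List[str]) -> Iterator[str]:
        shuffled = list(keys)
        random.shuffle(shuffled)
        yield from shuffled
```

A custom strategy (e.g., prioritized-time sampling) would subclass `Sampler` and yield keys in its own order, while the replay buffer only ever consumes the generator.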

Pros

  • Allows users to customize their own sampling strategies

Cons

  • Users who want sampling strategies other than uniform/p-time must implement their own samplers, following the existing implementations (p-time or uniform sampling) as a template, which adds to their workload.

Large scale sampling

Because the data to be sampled can be very large and cannot fit entirely in memory, we designed the sampler interface to support large-scale sampling by exposing an API that allows users to define custom index keys. Users can specify which fields should be indexed, and the mapping from those field values to keys will be maintained in memory.

For example, suppose a user wants to implement a sampling strategy that sorts by a score field. In their sampler's constructor, they can declare score as an index key:

super().__init__({"score"})

Then, the user can also implement the build_index method, so that when a new record is pushed into the replay buffer, e.g.,

push("rollout_id_1", {"score": 12, "foo": "blah"})

the sampler will build an in-memory mapping from score to keys in the replay buffer:

"12" -> ["rollout_id_1"]

This mapping is kept in memory, so when the user wants to sample by score order, they can implement the sample method to sort the index in memory and retrieve the corresponding replay buffer keys. The associated data (e.g., batch lists) can then be fetched using the replay buffer’s get API.
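Putting the score example together, the index-maintenance and sampling steps might look like the sketch below. This is illustrative only: the RFC declares index keys via `super().__init__({"score"})`, while this standalone sketch hard-codes the `score` field, and the descending-score order is an arbitrary choice for the example.

```python
from typing import Dict, Iterator, List

class ScoreIndexedSampler:
    """Sketch of the index-key idea: keep an in-memory score -> keys map so
    sampling can sort by score without loading record bodies."""

    def __init__(self):
        self._index: Dict[int, List[str]] = {}   # score -> replay-buffer keys

    def build_index(self, key: str, record: dict) -> None:
        # Called when a record is pushed; only the indexed field stays in memory.
        self._index.setdefault(record["score"], []).append(key)

    def sample(self) -> Iterator[str]:
        # Yield keys in descending score order.
        for score in sorted(self._index, reverse=True):
            yield from self._index[score]
```

Only the (score, key) pairs live in memory; the full records, which may be large batch lists, stay in the replay buffer and are fetched via `get` as the generator is consumed.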

Pros

  • Avoids OOM issues during large-scale sampling

Cons

  • More complex implementation; users may need to read the documentation to understand it
