Skip to content

Commit 44424d6

Browse files
committed
add derivation pipeline worker concurrency limits
1 parent b82f7e0 commit 44424d6

File tree

1 file changed

+4
-2
lines changed
  • crates/derivation-pipeline/src

1 file changed

+4
-2
lines changed

crates/derivation-pipeline/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ impl Stream for DerivationPipeline {
9494
}
9595
}
9696

97+
/// The maximum number of concurrent batch derivation futures.
98+
const DERIVATION_PIPELINE_WORKER_CONCURRENCY: usize = 5;
99+
97100
/// A structure holding the current unresolved futures for the derivation pipeline.
98101
#[derive(Debug)]
99102
pub struct DerivationPipelineWorker<P> {
@@ -170,8 +173,7 @@ where
170173
tokio::select! {
171174
biased;
172175

173-
// TODO: consider adding a filter on the receiver channel to limit the number of active derivation futures / concurrency.
174-
Some(batch_info) = self.batch_receiver.recv() => {
176+
Some(batch_info) = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => {
175177
let fut = self.derivation_future(batch_info);
176178
self.futures.push_back(fut);
177179
}

0 commit comments

Comments
 (0)