diff --git a/Cargo.toml b/Cargo.toml index a4dec8c70..eb187c02f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ exclude = ["ci"] [dependencies] rayon-core = { version = "1.5.0", path = "rayon-core" } -crossbeam-deque = "0.6.3" +crossbeam-deque = "0.7" # This is a public dependency! [dependencies.either] diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index e17d7e422..358ed1a24 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -17,7 +17,7 @@ categories = ["concurrency"] [dependencies] num_cpus = "1.2" lazy_static = "1" -crossbeam-deque = "0.6.3" +crossbeam-deque = "0.7" crossbeam-queue = "0.1.2" crossbeam-utils = "0.6.5" diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 76567c370..f529ad047 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -1,4 +1,4 @@ -use crossbeam_deque::{self as deque, Pop, Steal, Stealer, Worker}; +use crossbeam_deque::{self as deque, Steal, Stealer, Worker}; use crossbeam_queue::SegQueue; #[cfg(rayon_unstable)] use internal::task::Task; @@ -224,11 +224,14 @@ impl Registry { let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads) .map(|_| { - if breadth_first { - deque::fifo() + let w = if breadth_first { + deque::Worker::new_fifo() } else { - deque::lifo() - } + deque::Worker::new_lifo() + }; + + let s = w.stealer(); + (w, s) }) .unzip(); @@ -674,13 +677,7 @@ impl WorkerThread { /// bottom. #[inline] pub(super) unsafe fn take_local_job(&self) -> Option { - loop { - match self.worker.pop() { - Pop::Empty => return None, - Pop::Data(d) => return Some(d), - Pop::Retry => {} - } - } + self.worker.pop() } /// Wait until the latch is set. Try to keep busy by popping and @@ -763,7 +760,7 @@ impl WorkerThread { loop { match victim.stealer.steal() { Steal::Empty => return None, - Steal::Data(d) => { + Steal::Success(d) => { log!(StoleWork { worker: self.index, victim: victim_index diff --git a/src/iter/par_bridge.rs b/src/iter/par_bridge.rs index b366de698..bc938e46e 100644 --- a/src/iter/par_bridge.rs +++ b/src/iter/par_bridge.rs @@ -79,7 +79,8 @@ where C: UnindexedConsumer, { let split_count = AtomicUsize::new(current_num_threads()); - let (worker, stealer) = deque::fifo(); + let worker = deque::Worker::new_fifo(); + let stealer = worker.stealer(); let done = AtomicBool::new(false); let iter = Mutex::new((self.iter, worker)); @@ -149,7 +150,7 @@ where { loop { match self.items.steal() { - Steal::Data(it) => { + Steal::Success(it) => { folder = folder.consume(it); if folder.full() { return folder;