Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ exclude = ["ci"]

[dependencies]
rayon-core = { version = "1.5.0", path = "rayon-core" }
crossbeam-deque = "0.6.3"
crossbeam-deque = "0.7"

# This is a public dependency!
[dependencies.either]
Expand Down
2 changes: 1 addition & 1 deletion rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ categories = ["concurrency"]
[dependencies]
num_cpus = "1.2"
lazy_static = "1"
crossbeam-deque = "0.6.3"
crossbeam-deque = "0.7"
crossbeam-queue = "0.1.2"
crossbeam-utils = "0.6.5"

Expand Down
23 changes: 10 additions & 13 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crossbeam_deque::{self as deque, Pop, Steal, Stealer, Worker};
use crossbeam_deque::{self as deque, Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
#[cfg(rayon_unstable)]
use internal::task::Task;
Expand Down Expand Up @@ -224,11 +224,14 @@ impl Registry {

let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
.map(|_| {
if breadth_first {
deque::fifo()
let w = if breadth_first {
deque::Worker::new_fifo()
} else {
deque::lifo()
}
deque::Worker::new_lifo()
};

let s = w.stealer();
(w, s)
})
.unzip();

Expand Down Expand Up @@ -674,13 +677,7 @@ impl WorkerThread {
/// bottom.
#[inline]
pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
loop {
match self.worker.pop() {
Pop::Empty => return None,
Pop::Data(d) => return Some(d),
Pop::Retry => {}
}
}
self.worker.pop()
}

/// Wait until the latch is set. Try to keep busy by popping and
Expand Down Expand Up @@ -763,7 +760,7 @@ impl WorkerThread {
loop {
match victim.stealer.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
Steal::Success(d) => {
log!(StoleWork {
worker: self.index,
victim: victim_index
Expand Down
5 changes: 3 additions & 2 deletions src/iter/par_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ where
C: UnindexedConsumer<Self::Item>,
{
let split_count = AtomicUsize::new(current_num_threads());
let (worker, stealer) = deque::fifo();
let worker = deque::Worker::new_fifo();
let stealer = worker.stealer();
let done = AtomicBool::new(false);
let iter = Mutex::new((self.iter, worker));

Expand Down Expand Up @@ -149,7 +150,7 @@ where
{
loop {
match self.items.steal() {
Steal::Data(it) => {
Steal::Success(it) => {
folder = folder.consume(it);
if folder.full() {
return folder;
Expand Down