Skip to content

Commit 9a200c1

Browse files
committed
even fast unlock in contention
This is an alternative implementation of the idea in Amanieu#461. Compared to Amanieu#461, this PR maintains the parked bit on the waiter side, so that the waker doesn't have to perform an atomic operation twice. The waker now also resets all lock state bits back to 0 no matter what state the lock was in. This makes the fast lock path more likely to succeed during high contention. Signed-off-by: Jay <BusyJay@users.noreply.github.com>
1 parent 87ce756 commit 9a200c1

3 files changed

Lines changed: 64 additions & 47 deletions

File tree

src/condvar.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
// copied, modified, or distributed except according to those terms.
77

88
use crate::mutex::MutexGuard;
9-
use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
10-
use crate::{deadlock, util};
9+
use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
10+
use crate::util;
1111
use core::{
1212
fmt, ptr,
1313
sync::atomic::{AtomicPtr, Ordering},
@@ -229,9 +229,10 @@ impl Condvar {
229229
// If we requeued threads to the mutex, mark it as having
230230
// parked threads. The RequeueAll case is already handled above.
231231
if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
232-
unsafe { (*mutex).mark_parked() };
232+
TOKEN_RESTORE_PARKED_BIT
233+
} else {
234+
TOKEN_NORMAL
233235
}
234-
TOKEN_NORMAL
235236
};
236237
let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
237238

@@ -350,10 +351,10 @@ impl Condvar {
350351
}
351352

352353
// ... and re-lock it once we are done sleeping
353-
if result == ParkResult::Unparked(TOKEN_HANDOFF) {
354-
unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
355-
} else {
356-
mutex.lock();
354+
match result {
355+
ParkResult::Unparked(TOKEN_HANDOFF) => unreachable!("can't be handed off"),
356+
ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => mutex.lock_contention(),
357+
_ => mutex.lock(),
357358
}
358359

359360
WaitTimeoutResult(!(result.is_unparked() || requeued))

src/raw_mutex.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
2222
// thread directly without unlocking it.
2323
pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
2424

25+
// UnparkToken used to indicate that the waiter should restore PARKED_BIT.
26+
pub(crate) const TOKEN_RESTORE_PARKED_BIT: UnparkToken = UnparkToken(2);
27+
2528
/// This bit is set in the `state` of a `RawMutex` when that mutex is locked by some thread.
2629
const LOCKED_BIT: u8 = 0b01;
2730
/// This bit is set in the `state` of a `RawMutex` just before parking a thread. A thread is being
@@ -69,7 +72,7 @@ unsafe impl lock_api::RawMutex for RawMutex {
6972
.compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
7073
.is_err()
7174
{
72-
self.lock_slow(None);
75+
self.lock_slow(None, false);
7376
}
7477
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
7578
}
@@ -99,11 +102,8 @@ unsafe impl lock_api::RawMutex for RawMutex {
99102
#[inline]
100103
unsafe fn unlock(&self) {
101104
deadlock::release_resource(self as *const _ as usize);
102-
if self
103-
.state
104-
.compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
105-
.is_ok()
106-
{
105+
let prev = self.state.swap(0, Ordering::Release);
106+
if prev == LOCKED_BIT {
107107
return;
108108
}
109109
self.unlock_slow(false);
@@ -151,7 +151,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
151151
{
152152
true
153153
} else {
154-
self.lock_slow(Some(timeout))
154+
self.lock_slow(Some(timeout), false)
155155
};
156156
if result {
157157
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -168,7 +168,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
168168
{
169169
true
170170
} else {
171-
self.lock_slow(util::to_deadline(timeout))
171+
self.lock_slow(util::to_deadline(timeout), false)
172172
};
173173
if result {
174174
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -199,23 +199,27 @@ impl RawMutex {
199199
}
200200
}
201201

202-
// Used by Condvar when requeuing threads to us, must be called while
203-
// holding the queue lock.
204202
#[inline]
205-
pub(crate) fn mark_parked(&self) {
206-
self.state.fetch_or(PARKED_BIT, Ordering::Relaxed);
203+
pub(crate) fn lock_contention(&self) {
204+
self.lock_slow(None, true);
207205
}
208206

209207
#[cold]
210-
fn lock_slow(&self, timeout: Option<Instant>) -> bool {
208+
fn lock_slow(&self, timeout: Option<Instant>, in_contention: bool) -> bool {
211209
let mut spinwait = SpinWait::new();
212210
let mut state = self.state.load(Ordering::Relaxed);
211+
let mut extra_flags;
212+
if in_contention {
213+
extra_flags = PARKED_BIT;
214+
} else {
215+
extra_flags = 0;
216+
}
213217
loop {
214218
// Grab the lock if it isn't locked, even if there is a queue on it
215219
if state & LOCKED_BIT == 0 {
216220
match self.state.compare_exchange_weak(
217221
state,
218-
state | LOCKED_BIT,
222+
state | LOCKED_BIT | extra_flags,
219223
Ordering::Acquire,
220224
Ordering::Relaxed,
221225
) {
@@ -254,6 +258,7 @@ impl RawMutex {
254258
self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
255259
}
256260
};
261+
extra_flags = 0;
257262
// SAFETY:
258263
// * `addr` is an address we control.
259264
// * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
@@ -271,6 +276,7 @@ impl RawMutex {
271276
// The thread that unparked us passed the lock on to us
272277
// directly without unlocking it.
273278
ParkResult::Unparked(TOKEN_HANDOFF) => return true,
279+
ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,
274280

275281
// We were unparked normally, try acquiring the lock again
276282
ParkResult::Unparked(_) => (),
@@ -296,7 +302,7 @@ impl RawMutex {
296302
let callback = |result: UnparkResult| {
297303
// If we are using a fair unlock then we should keep the
298304
// mutex locked and hand it off to the unparked thread.
299-
if result.unparked_threads != 0 && (force_fair || result.be_fair) {
305+
if result.unparked_threads != 0 && force_fair {
300306
// Clear the parked bit if there are no more parked
301307
// threads.
302308
if !result.have_more_threads {
@@ -308,8 +314,12 @@ impl RawMutex {
308314
// Clear the locked bit, and the parked bit as well if there
309315
// are no more parked threads.
310316
if result.have_more_threads {
311-
self.state.store(PARKED_BIT, Ordering::Release);
312-
} else {
317+
if force_fair {
318+
self.state.store(PARKED_BIT, Ordering::Release);
319+
} else {
320+
return TOKEN_RESTORE_PARKED_BIT;
321+
}
322+
} else if force_fair {
313323
self.state.store(0, Ordering::Release);
314324
}
315325
TOKEN_NORMAL

src/raw_rwlock.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// copied, modified, or distributed except according to those terms.
77

88
use crate::elision::{have_elision, AtomicElisionExt};
9-
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
9+
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
1010
use crate::util;
1111
use core::{
1212
cell::Cell,
@@ -93,11 +93,8 @@ unsafe impl lock_api::RawRwLock for RawRwLock {
9393
#[inline]
9494
unsafe fn unlock_exclusive(&self) {
9595
self.deadlock_release();
96-
if self
97-
.state
98-
.compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
99-
.is_ok()
100-
{
96+
let prev = self.state.swap(0, Ordering::Release);
97+
if prev == WRITER_BIT {
10198
return;
10299
}
103100
self.unlock_exclusive_slow(false);
@@ -613,7 +610,7 @@ impl RawRwLock {
613610

614611
#[cold]
615612
fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
616-
let try_lock = |state: &mut usize| {
613+
let try_lock = |state: &mut usize, extra_flags: usize| {
617614
loop {
618615
if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
619616
return false;
@@ -622,7 +619,7 @@ impl RawRwLock {
622619
// Grab WRITER_BIT if it isn't set, even if there are parked threads.
623620
match self.state.compare_exchange_weak(
624621
*state,
625-
*state | WRITER_BIT,
622+
*state | WRITER_BIT | extra_flags,
626623
Ordering::Acquire,
627624
Ordering::Relaxed,
628625
) {
@@ -653,7 +650,7 @@ impl RawRwLock {
653650
let callback = |mut new_state, result: UnparkResult| {
654651
// If we are using a fair unlock then we should keep the
655652
// rwlock locked and hand it off to the unparked threads.
656-
if result.unparked_threads != 0 && (force_fair || result.be_fair) {
653+
if result.unparked_threads != 0 && force_fair {
657654
if result.have_more_threads {
658655
new_state |= PARKED_BIT;
659656
}
@@ -662,8 +659,12 @@ impl RawRwLock {
662659
} else {
663660
// Clear the parked bit if there are no more parked threads.
664661
if result.have_more_threads {
665-
self.state.store(PARKED_BIT, Ordering::Release);
666-
} else {
662+
if force_fair {
663+
self.state.store(PARKED_BIT, Ordering::Release);
664+
} else {
665+
return TOKEN_RESTORE_PARKED_BIT;
666+
}
667+
} else if force_fair {
667668
self.state.store(0, Ordering::Release);
668669
}
669670
TOKEN_NORMAL
@@ -677,13 +678,13 @@ impl RawRwLock {
677678

678679
#[cold]
679680
fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
680-
let try_lock = |state: &mut usize| {
681+
let try_lock = |state: &mut usize, extra_flags: usize| {
681682
let mut spinwait_shared = SpinWait::new();
682683
loop {
683684
// Use hardware lock elision to avoid cache conflicts when multiple
684685
// readers try to acquire the lock. We only do this if the lock is
685686
// completely empty since elision handles conflicts poorly.
686-
if have_elision() && *state == 0 {
687+
if have_elision() && *state == 0 && extra_flags == 0 {
687688
match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
688689
Ok(_) => return true,
689690
Err(x) => *state = x,
@@ -702,9 +703,10 @@ impl RawRwLock {
702703
.state
703704
.compare_exchange_weak(
704705
*state,
705-
state
706-
.checked_add(ONE_READER)
707-
.expect("RwLock reader count overflow"),
706+
extra_flags
707+
| state
708+
.checked_add(ONE_READER)
709+
.expect("RwLock reader count overflow"),
708710
Ordering::Acquire,
709711
Ordering::Relaxed,
710712
)
@@ -745,7 +747,7 @@ impl RawRwLock {
745747

746748
#[cold]
747749
fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
748-
let try_lock = |state: &mut usize| {
750+
let try_lock = |state: &mut usize, extra_flags: usize| {
749751
let mut spinwait_shared = SpinWait::new();
750752
loop {
751753
if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
@@ -756,9 +758,10 @@ impl RawRwLock {
756758
.state
757759
.compare_exchange_weak(
758760
*state,
759-
state
760-
.checked_add(ONE_READER | UPGRADABLE_BIT)
761-
.expect("RwLock reader count overflow"),
761+
extra_flags
762+
| state
763+
.checked_add(ONE_READER | UPGRADABLE_BIT)
764+
.expect("RwLock reader count overflow"),
762765
Ordering::Acquire,
763766
Ordering::Relaxed,
764767
)
@@ -1067,14 +1070,15 @@ impl RawRwLock {
10671070
&self,
10681071
timeout: Option<Instant>,
10691072
token: ParkToken,
1070-
mut try_lock: impl FnMut(&mut usize) -> bool,
1073+
mut try_lock: impl FnMut(&mut usize, usize) -> bool,
10711074
validate_flags: usize,
10721075
) -> bool {
10731076
let mut spinwait = SpinWait::new();
10741077
let mut state = self.state.load(Ordering::Relaxed);
1078+
let mut extra_flags = 0;
10751079
loop {
10761080
// Attempt to grab the lock
1077-
if try_lock(&mut state) {
1081+
if try_lock(&mut state, extra_flags) {
10781082
return true;
10791083
}
10801084

@@ -1118,10 +1122,12 @@ impl RawRwLock {
11181122
let park_result = unsafe {
11191123
parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
11201124
};
1125+
extra_flags = 0;
11211126
match park_result {
11221127
// The thread that unparked us passed the lock on to us
11231128
// directly without unlocking it.
11241129
ParkResult::Unparked(TOKEN_HANDOFF) => return true,
1130+
ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,
11251131

11261132
// We were unparked normally, try acquiring the lock again
11271133
ParkResult::Unparked(_) => (),

0 commit comments

Comments
 (0)