|
1 | 1 | use std::collections::VecDeque; |
2 | 2 | use std::sync::{Condvar, Mutex}; |
3 | | -use std::time::{Duration, Instant}; |
| 3 | +use std::time::Duration; |
4 | 4 |
|
5 | 5 | /// A simple, threadsafe, queue of items of type `T` |
6 | 6 | /// |
@@ -40,41 +40,30 @@ impl<T> Queue<T> { |
40 | 40 |
|
41 | 41 | /// Pushes an item onto the queue, blocking if the queue is full. |
42 | 42 | pub fn push_bounded(&self, item: T) { |
43 | | - let mut state = self.state.lock().unwrap(); |
44 | | - loop { |
45 | | - if state.items.len() >= self.bound { |
46 | | - state = self.bounded_cv.wait(state).unwrap(); |
47 | | - } else { |
48 | | - state.items.push_back(item); |
49 | | - self.popper_cv.notify_one(); |
50 | | - break; |
51 | | - } |
52 | | - } |
| 43 | + let locked_state = self.state.lock().unwrap(); |
| 44 | + let mut state = self |
| 45 | + .bounded_cv |
| 46 | + .wait_while(locked_state, |s| s.items.len() >= self.bound) |
| 47 | + .unwrap(); |
| 48 | + state.items.push_back(item); |
| 49 | + self.popper_cv.notify_one(); |
53 | 50 | } |
54 | 51 |
|
55 | 52 | pub fn pop(&self, timeout: Duration) -> Option<T> { |
56 | | - let mut state = self.state.lock().unwrap(); |
57 | | - let now = Instant::now(); |
58 | | - while state.items.is_empty() { |
59 | | - let elapsed = now.elapsed(); |
60 | | - if elapsed >= timeout { |
61 | | - break; |
62 | | - } |
63 | | - let (lock, result) = self |
64 | | - .popper_cv |
65 | | - .wait_timeout(state, timeout - elapsed) |
66 | | - .unwrap(); |
67 | | - state = lock; |
68 | | - if result.timed_out() { |
69 | | - break; |
| 53 | + let (mut state, result) = self |
| 54 | + .popper_cv |
| 55 | + .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty()) |
| 56 | + .unwrap(); |
| 57 | + if result.timed_out() { |
| 58 | + None |
| 59 | + } else { |
| 60 | + let value = state.items.pop_front()?; |
| 61 | + if state.items.len() < self.bound { |
| 62 | + // Assumes threads cannot be canceled. |
| 63 | + self.bounded_cv.notify_one(); |
70 | 64 | } |
| 65 | + Some(value) |
71 | 66 | } |
72 | | - let value = state.items.pop_front()?; |
73 | | - if state.items.len() < self.bound { |
74 | | - // Assumes threads cannot be canceled. |
75 | | - self.bounded_cv.notify_one(); |
76 | | - } |
77 | | - Some(value) |
78 | 67 | } |
79 | 68 |
|
80 | 69 | pub fn try_pop_all(&self) -> Vec<T> { |
|
0 commit comments