@@ -68,31 +68,62 @@ class BlockingQueue {
6868 }
6969
7070 bool Push (const T &elem) {
71- {
72- std::unique_lock<std::mutex> lock (mutex_);
73- cv_.wait (lock, [&] { return queue_.size () < capacity_; });
74- queue_.push_back (elem);
71+ std::unique_lock<std::mutex> lock (mutex_);
72+ WaitForWrite (lock);
73+
74+ queue_.push_back (elem);
75+
76+ Notify ();
77+ return true ;
78+ }
79+ bool WaitForWrite (std::unique_lock<std::mutex> &lock) { // NOLINT
80+ while (FullUnlocked ()) {
81+ if (empty_waiters_ != 0 ) {
82+ empty_cond_.notify_one ();
83+ }
84+ full_waiters_++;
85+ full_cond_.wait (lock);
86+ full_waiters_--;
7587 }
76- cv_.notify_one ();
7788 return true ;
7889 }
79-
80- bool Push (T &&elem) {
81- {
82- std::unique_lock<std::mutex> lock (mutex_);
83- cv_.wait (lock, [&] { return queue_.size () < capacity_; });
84- queue_.emplace_back (std::move (elem));
90+ bool WaitForRead (std::unique_lock<std::mutex> &lock) { // NOLINT
91+ while (EmptyUnlocked ()) {
92+ if (full_waiters_ != 0 ) {
93+ full_cond_.notify_one ();
94+ }
95+ empty_waiters_++;
96+ empty_cond_.wait (lock);
97+ empty_waiters_--;
8598 }
86- cv_.notify_one ();
8799 return true ;
88100 }
101+ bool EmptyUnlocked () { return queue_.empty (); }
102+
103+ bool FullUnlocked () { return queue_.size () >= capacity_; }
104+ void Notify () {
105+ if (empty_waiters_ != 0 && (!EmptyUnlocked ())) {
106+ empty_cond_.notify_one ();
107+ }
108+ if (full_waiters_ != 0 && (!FullUnlocked ())) {
109+ full_cond_.notify_one ();
110+ }
111+ }
112+
113+ bool Push (T &&elem) {
114+ std::unique_lock<std::mutex> lock (mutex_);
115+ WaitForWrite (lock);
116+ queue_.emplace_back (std::move (elem));
89117
118+ Notify ();
119+ return true ;
120+ }
90121 T Pop () {
91122 std::unique_lock<std::mutex> lock (mutex_);
92- cv_. wait (lock, [=] { return !queue_. empty (); } );
123+ WaitForRead (lock);
93124 T rc (std::move (queue_.front ()));
94125 queue_.pop_front ();
95- cv_. notify_one ();
126+ Notify ();
96127 return rc;
97128 }
98129
@@ -107,11 +138,14 @@ class BlockingQueue {
107138 }
108139
109140 private:
141+ int empty_waiters_ = 0 ;
142+ int full_waiters_ = 0 ;
143+ std::condition_variable empty_cond_;
144+ std::condition_variable full_cond_;
110145 const size_t capacity_;
111146 std::deque<T> queue_;
112147
113148 mutable std::mutex mutex_;
114- std::condition_variable cv_;
115149};
116150
117151template <typename T, int MajorType = Eigen::RowMajor,
0 commit comments