@@ -204,11 +204,18 @@ void run_sub(void *zctx, bool &term, string &read_source, internal_events_lst_t
204204 EXPECT_EQ (0 , zmq_setsockopt (mock_sub, ZMQ_SUBSCRIBE, " " , 0 ));
205205 EXPECT_EQ (0 , zmq_setsockopt (mock_sub, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
206206
207- while (!term) {
208- if (0 == zmq_message_read (mock_sub, 0 , source, ev_int)) {
209- lst.push_back (ev_int);
210- read_source.swap (source);
211- cnt = (int )lst.size ();
207+ if (cnt == 0 ) {
208+ while (!term) {
209+ if (0 == zmq_message_read (mock_sub, 0 , source, ev_int)) {
210+ lst.push_back (ev_int);
211+ read_source.swap (source);
212+ cnt = (int )lst.size ();
213+ }
214+ }
215+ }
216+ else {
217+ while (!term) {
218+ this_thread::sleep_for (chrono::milliseconds (100 ));
212219 }
213220 }
214221
@@ -349,6 +356,11 @@ TEST(eventd, capture)
349356 EXPECT_EQ (0 , pxy->init ());
350357
351358 /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
359+ /*
360+ * Block sub from calling zmq_message_read as capture service is calling
361+ * and zmq_message_read crashes on access from more than one thread.
362+ */
363+ sub_evts_sz = -1 ;
352364 thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
353365
354366 /* Create capture service */
@@ -469,6 +481,11 @@ TEST(eventd, captureCacheMax)
469481 EXPECT_EQ (0 , pxy->init ());
470482
471483 /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
484+ /*
485+ * Block sub from calling zmq_message_read as capture service is calling
486+ * and zmq_message_read crashes on access from more than one thread.
487+ */
488+ sub_evts_sz = -1 ;
472489 thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
473490
474491 /* Create capture service */
0 commit comments