2323
/*
 * VEC_SIZE: expands to the container's size() cast to int.
 * NOTE(review): the argument is not parenthesized in the expansion, so only
 * a plain identifier should be passed (all uses visible here do that).
 * NOTE(review): the blank between the macro name and its parameter list looks
 * like an extraction artifact; taken literally the preprocessor would treat
 * this as an object-like macro -- confirm against the real file.
 */
2424#define VEC_SIZE (p ) ((int )p.size())
2525
26+ extern int zerrno;
27+
2628int
2729eventd_proxy::init ()
2830{
@@ -60,6 +62,8 @@ eventd_proxy::run()
6062
6163 /* runs forever until zmq context is terminated */
6264 zmq_proxy (m_frontend, m_backend, m_capture);
65+
66+ SWSS_LOG_INFO (" Stopped xpub/xsub proxy" );
6367}
6468
6569
@@ -71,10 +75,8 @@ capture_service::~capture_service()
7175void
7276capture_service::stop_capture ()
7377{
74- if (m_socket != NULL ) {
75- zmq_close (m_socket);
76- m_socket = NULL ;
77- }
78+ m_ctrl = STOP_CAPTURE;
79+
7880 if (m_thr.joinable ()) {
7981 m_thr.join ();
8082 }
@@ -104,26 +106,18 @@ validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq
104106
105107
106108void
107- capture_service::init_capture_cache (const events_data_lst_t &lst)
109+ capture_service::init_capture_cache (const event_serialized_lst_t &lst)
108110{
109111 /* clean any pre-existing cache */
110- int i;
111112 runtime_id_t rid;
112113 sequence_t seq;
113114
114- /*
115- * Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only
116- * upon m_events/vector overflow, which might block adding new entries in map
117- * if overall mem consumption is too high. Clearing the map just before use
118- * is likely to help.
115+ /* Cache given events as initial stock.
116+ * Save runtime ID with last seen seq to avoid duplicates, while reading
117+ * from capture socket.
118+ * No check for max cache size here, as most likely not needed.
119119 */
120- for (i=0 ; i<MAX_PUBLISHERS_COUNT; ++i) {
121- m_last_events[to_string (i)] = " " ;
122- }
123-
124- /* Cache last events -- as only the last instance */
125- /* This is required to compute missed count */
126- for (events_data_lst_t ::const_iterator itc = lst.begin (); itc != lst.end (); ++itc) {
120+ for (event_serialized_lst_t ::const_iterator itc = lst.begin (); itc != lst.end (); ++itc) {
127121 internal_event_t event;
128122
129123 if (deserialize (*itc, event) == 0 ) {
@@ -142,20 +136,43 @@ capture_service::init_capture_cache(const events_data_lst_t &lst)
142136void
143137capture_service::do_capture ()
144138{
145- /* clean any pre-existing cache */
139+ int rc;
146140 runtime_id_t rid;
147141 sequence_t seq;
142+ int block_ms=100 ;
143+ internal_event_t event;
144+ string source, evt_str;
145+ chrono::steady_clock::time_point start;
148146
149- /* Check read events against provided cache for 2 seconds to skip */
150- chrono::steady_clock::time_point start = chrono::steady_clock::now ();
151- while (!m_pre_exist_id.empty ()) {
152- internal_event_t event;
153- string source, evt_str;
147+ void *sock = NULL ;
148+ sock = zmq_socket (m_ctx, ZMQ_SUB);
149+ RET_ON_ERR (sock != NULL , " failing to get ZMQ_SUB socket" );
150+
151+ rc = zmq_connect (sock, get_config (string (CAPTURE_END_KEY)).c_str ());
152+ RET_ON_ERR (rc == 0 , " Failing to bind capture SUB to %s" , get_config (string (CAPTURE_END_KEY)).c_str ());
153+
154+ rc = zmq_setsockopt (sock, ZMQ_SUBSCRIBE, " " , 0 );
155+ RET_ON_ERR (rc == 0 , " Failing to ZMQ_SUBSCRIBE" );
156+
157+ rc = zmq_setsockopt (sock, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms));
158+ RET_ON_ERR (rc == 0 , " Failed to ZMQ_RCVTIMEO to %d" , block_ms);
159+
160+ m_cap_run = true ;
161+
162+ while (m_ctrl != START_CAPTURE) {
163+ /* Wait for capture start */
164+ this_thread::sleep_for (chrono::milliseconds (10 ));
165+ }
154166
155- RET_ON_ERR (zmq_message_read (m_socket, 0 , source, event) == 0 ,
156- " Failed to read from capture socket" );
167+ /* Check read events against provided cache for 2 seconds to skip */
168+ start = chrono::steady_clock::now ();
169+ while ((m_ctrl == START_CAPTURE) && !m_pre_exist_id.empty ()) {
157170
158- if (validate_event (event, rid, seq)) {
171+ if (zmq_message_read (sock, 0 , source, event) == -1 ) {
172+ RET_ON_ERR (zerrno == EAGAIN,
173+ " 0:Failed to read from capture socket" );
174+ }
175+ else if (validate_event (event, rid, seq)) {
159176
160177 serialize (event, evt_str);
161178 pre_exist_id_t ::iterator it = m_pre_exist_id.find (rid);
@@ -175,13 +192,13 @@ capture_service::do_capture()
175192 pre_exist_id_t ().swap (m_pre_exist_id);
176193
177194 /* Save until max allowed */
178- while (VEC_SIZE (m_events) < m_cache_max) {
179- internal_event_t event;
180- string source, evt_str;
195+ while ((m_ctrl == START_CAPTURE) && (VEC_SIZE (m_events) < m_cache_max)) {
181196
182- RET_ON_ERR (zmq_message_read (m_socket, 0 , source, event) == 0 ,
183- " Failed to read from capture socket" );
184- if (validate_event (event, rid, seq)) {
197+ if (zmq_message_read (sock, 0 , source, event) == -1 ) {
198+ RET_ON_ERR (zerrno == EAGAIN,
199+ " 1: Failed to read from capture socket" );
200+ }
201+ else if (validate_event (event, rid, seq)) {
185202 serialize (event, evt_str);
186203 try
187204 {
@@ -200,14 +217,13 @@ capture_service::do_capture()
200217
201218
202219 /* Save only last event per sender */
203- while (true ) {
204- internal_event_t event;
205- string source, evt_str;
220+ while ((m_ctrl == START_CAPTURE)) {
206221
207- RET_ON_ERR (zmq_message_read (m_socket, 0 , source, event) == 0 ,
208- " Failed to read from capture socket" );
222+ if (zmq_message_read (sock, 0 , source, event) == -1 ) {
223+ RET_ON_ERR (zerrno == EAGAIN,
224+ " 2:Failed to read from capture socket" );
209225
210- if (validate_event (event, rid, seq)) {
226+ } else if (validate_event (event, rid, seq)) {
211227 serialize (event, evt_str);
212228 m_last_events[rid] = evt_str;
213229 }
@@ -217,35 +233,29 @@ capture_service::do_capture()
217233 * Capture stop will close the socket which fail the read
218234 * and hence bail out.
219235 */
236+ zmq_close (sock);
237+ m_cap_run = false ;
220238 return ;
221239}
222240
223241
224242int
225- capture_service::set_control (capture_control_t ctrl, events_data_lst_t *lst)
243+ capture_service::set_control (capture_control_t ctrl, event_serialized_lst_t *lst)
226244{
227245 int ret = -1 ;
228- int rc;
229246
230247 /* Can go in single step only. */
231- RET_ON_ERR ((ctrl - m_ctrl) == 1 , " m_ctrl(%d) > ctrl(%d)" , m_ctrl, ctrl);
232- m_ctrl = ctrl;
248+ RET_ON_ERR ((ctrl - m_ctrl) == 1 , " m_ctrl(%d)+1 < ctrl(%d)" , m_ctrl, ctrl);
233249
234- switch (m_ctrl ) {
250+ switch (ctrl ) {
235251 case INIT_CAPTURE:
236- {
237- void *sock = NULL ;
238- sock = zmq_socket (m_ctx, ZMQ_SUB);
239- RET_ON_ERR (sock != NULL , " failing to get ZMQ_SUB socket" );
240-
241- rc = zmq_connect (sock, get_config (string (CAPTURE_END_KEY)).c_str ());
242- RET_ON_ERR (rc == 0 , " Failing to bind capture SUB to %s" , get_config (string (CAPTURE_END_KEY)).c_str ());
243-
244- rc = zmq_setsockopt (sock, ZMQ_SUBSCRIBE, " " , 0 );
245- RET_ON_ERR (rc == 0 , " Failing to ZMQ_SUBSCRIBE" );
246-
247- m_socket = sock;
252+ m_thr = thread (&capture_service::do_capture, this );
253+ for (int i=0 ; !m_cap_run && (i < 100 ); ++i) {
254+ /* Wait max a second for thread to init */
255+ this_thread::sleep_for (chrono::milliseconds (10 ));
248256 }
257+ RET_ON_ERR (m_cap_run, " Failed to init capture" );
258+ m_ctrl = ctrl;
249259 ret = 0 ;
250260 break ;
251261
@@ -264,9 +274,7 @@ capture_service::set_control(capture_control_t ctrl, events_data_lst_t *lst)
264274 if ((lst != NULL ) && (!lst->empty ())) {
265275 init_capture_cache (*lst);
266276 }
267-
268- m_thr = thread (&capture_service::do_capture, this );
269- RET_ON_ERR (m_thr.joinable (), " Capture thread not running" );
277+ m_ctrl = ctrl;
270278 ret = 0 ;
271279 break ;
272280
@@ -279,24 +287,25 @@ capture_service::set_control(capture_control_t ctrl, events_data_lst_t *lst)
279287 */
280288 this_thread::sleep_for (chrono::milliseconds (CACHE_DRAIN_IN_MILLISECS));
281289 stop_capture ();
290+ ret = 0 ;
282291 break ;
283292
284293 default :
285- SWSS_LOG_ERROR (" Unexpected code=%d" , m_ctrl );
294+ SWSS_LOG_ERROR (" Unexpected code=%d" , ctrl );
286295 break ;
287296 }
288297out:
289298 return ret;
290299}
291300
/*
 * Hand the cached events over to the caller and reset the service's cache.
 *
 * lst_fifo -- receives the contents of m_events (via swap).
 * lst_last -- receives the contents of m_last_events (via swap).
 *
 * Whatever the caller's containers held on entry ends up in the members after
 * the first two swaps and is then discarded by swapping each member with a
 * default-constructed temporary (presumably an empty container).
 *
 * Always returns 0.
 *
 * The removed/added line pairs below differ only in the list type name
 * (events_data_lst_t -> event_serialized_lst_t).
 */
292301int
293- capture_service::read_cache (events_data_lst_t &lst_fifo,
302+ capture_service::read_cache (event_serialized_lst_t &lst_fifo,
294303 last_events_t &lst_last)
295304{
296305 lst_fifo.swap (m_events);
297306 lst_last.swap (m_last_events);
/* Swap with temporaries so the members drop what they took from the caller. */
298307 last_events_t ().swap (m_last_events);
299- events_data_lst_t ().swap (m_events);
308+ event_serialized_lst_t ().swap (m_events);
300309 return 0 ;
301310}
302311
@@ -309,7 +318,7 @@ run_eventd_service()
309318 eventd_proxy *proxy = NULL ;
310319 capture_service *capture = NULL ;
311320
312- events_data_lst_t capture_fifo_events;
321+ event_serialized_lst_t capture_fifo_events;
313322 last_events_t capture_last_events;
314323
315324 SWSS_LOG_ERROR (" Eventd service starting\n " );
@@ -329,7 +338,7 @@ run_eventd_service()
329338
330339 while (true ) {
331340 int code, resp = -1 ;
332- events_data_lst_t req_data, resp_data;
341+ event_serialized_lst_t req_data, resp_data;
333342
334343 RET_ON_ERR (service.channel_read (code, req_data) == 0 ,
335344 " Failed to read request" );
@@ -340,7 +349,7 @@ run_eventd_service()
340349 if (capture != NULL ) {
341350 delete capture;
342351 }
343- events_data_lst_t ().swap (capture_fifo_events);
352+ event_serialized_lst_t ().swap (capture_fifo_events);
344353 last_events_t ().swap (capture_last_events);
345354
346355 capture = new capture_service (zctx, cache_max);
@@ -401,7 +410,7 @@ run_eventd_service()
401410 back_inserter (resp_data));
402411
403412 if (sz == VEC_SIZE (capture_fifo_events)) {
404- events_data_lst_t ().swap (capture_fifo_events);
413+ event_serialized_lst_t ().swap (capture_fifo_events);
405414 } else {
406415 capture_fifo_events.erase (capture_fifo_events.begin (), it);
407416 }
0 commit comments