Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/sonic-eventd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ EVENTD_TOOL := tools/events_tool
EVENTD_PUBLISH_TOOL := tools/events_publish_tool.py
RSYSLOG-PLUGIN_TARGET := rsyslog_plugin/rsyslog_plugin
RSYSLOG-PLUGIN_TEST := rsyslog_plugin_tests/tests
EVENTD_MONIT := tools/events_monit_test.py
EVENTD_MONIT_CONF := tools/monit_events

CP := cp
MKDIR := mkdir
CC := g++
Expand Down Expand Up @@ -69,15 +72,20 @@ rsyslog-plugin-tests: $(RSYSLOG-PLUGIN-TEST_OBJS)

install:
$(MKDIR) -p $(DESTDIR)/usr/sbin
$(MKDIR) -p $(DESTDIR)/usr/local/bin
$(MKDIR) -p $(DESTDIR)/etc/monit/conf.d
$(CP) $(EVENTD_TARGET) $(DESTDIR)/usr/sbin
$(CP) $(EVENTD_TOOL) $(DESTDIR)/usr/sbin
$(CP) $(EVENTD_PUBLISH_TOOL) $(DESTDIR)/usr/sbin
$(CP) $(RSYSLOG-PLUGIN_TARGET) $(DESTDIR)/usr/sbin
$(CP) $(RSYSLOG-PLUGIN_TARGET) $(DESTDIR)/usr/local/bin
$(CP) $(EVENTD_MONIT) $(DESTDIR)/usr/local/bin
$(CP) $(EVENTD_MONIT_CONF) $(DESTDIR)/etc/monit/conf.d

deinstall:
$(RM) $(DESTDIR)/usr/sbin/$(EVENTD_TARGET)
$(RM) $(DESTDIR)/usr/sbin/$(RSYSLOG-PLUGIN_TARGET)
$(RM) -rf $(DESTDIR)/usr/sbin
$(RM) -rf $(DESTDIR)/usr
$(RM) -rf $(DESTDIR)/etc

clean:
-@echo ' '
Expand Down
4 changes: 3 additions & 1 deletion src/sonic-eventd/debian/sonic-rsyslog-plugin.install
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
usr/sbin/ryslog_plugin
usr/local/bin/rsyslog_plugin
usr/local/bin/events_monit_test.py
etc/monit/conf.d/monit_events
235 changes: 235 additions & 0 deletions src/sonic-eventd/tools/events_monit_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
#! /usr/bin/python -u

from inspect import getframeinfo, stack
from swsscommon.swsscommon import events_init_publisher, event_publish, FieldValueMap
from swsscommon.swsscommon import event_receive_op_t, event_receive, events_init_subscriber
from swsscommon.swsscommon import events_deinit_subscriber, events_deinit_publisher
import argparse
import os
import threading
import time
import syslog
import uuid

chk_log_level = syslog.LOG_ERR

test_source = "sonic-host"
test_event_tag = "device-test-event"
test_event_key = "{}:{}".format(test_source, test_event_tag)
test_event_params = {
"sender": os.path.basename(__file__),
"reason": "monit periodic test",
"batch-id": str(uuid.uuid1()),
"index": "0"
}

# Async connection wait time in milliseconds.
ASYNC_CONN_WAIT = 300


# Thread results
rc_test_receive = -1


def _log_msg(lvl, pfx, msg):
if lvl <= chk_log_level:
caller = getframeinfo(stack()[2][0])
fmsg = "{}:{}:{}".format(caller.function, caller.lineno, msg)
print("{}: {}".format(pfx, fmsg))
syslog.syslog(lvl, fmsg)

def log_err(m):
_log_msg(syslog.LOG_ERR, "Err", m)


def log_info(m):
_log_msg(syslog.LOG_INFO, "Info", m)


def log_debug(m):
_log_msg(syslog.LOG_DEBUG, "Debug", m)


def map_dict_fvm(s, d):
for k, v in s.items():
d[k] = v


# Invoked in a separate thread
def test_receiver(event_obj, cnt):
global rc_test_receive

sh = events_init_subscriber()

# Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete.
time.sleep(ASYNC_CONN_WAIT/1000)

exp_params = dict(test_event_params)

event_obj.set()
cnt_done = 0

for i in range(cnt):
p = event_receive_op_t()
rc = event_receive(sh, p)

if (rc != 0):
log_err("Failed to receive. {}/{} rc={}".format(i, cnt, rc))
break

if test_event_key != p.key:
log_err("key mismatch {} != {} {}/{}".format(test_event_key,
p.key, i, cnt))
break

exp_params["index"] = str(i)
rcv_params = {}
map_dict_fvm(p.params, rcv_params)

for k, v in exp_params.items():
if k in rcv_params:
if (rcv_params[k] != v):
log_err("key:{} exp:{} != exist:{}".format(
k, v, rcv_params[k]))
rc = -1
else:
log_err("key:{} is missing", k)
rc = -1

if (rc != 0):
log_err("params mismatch {}/{}".format(i,cnt))
break

if p.missed_cnt != 0:
log_err("Expect missed_cnt {} == 0 {}/{}".format(p.missed_cnt,i,cnt))
break

if p.publish_epoch_ms == 0:
log_err("Expect publish_epoch_ms != 0 {}/{}".format(i,cnt))
break

cnt_done += 1
log_debug("Received {}/{}".format(i+1, cnt))

if (cnt_done == cnt):
rc_test_receive = 0
else:
log_err("test receive abort {}/{}".format(cnt_done, cnt))

# wait for a max of 5 secs for main thread to clear the event.
tout = 5000
while(event_obj.is_set()):
# main thread yet to consume last set event
if tout > 0:
t_sleep = 100
time.sleep(t_sleep / 1000)
tout -= t_sleep
else:
log_err("test_receiver:Internal err: event not cleared by main")
break

event_obj.set()

events_deinit_subscriber(sh)


def publish_events(cnt):
rc = -1
ph = events_init_publisher(test_source)
if not ph:
log_err("Failed to get publisher handle")
return rc

# Sleep ASYNC_CONN_WAIT to ensure async connectivity is complete.
# Messages published before connection are silently dropped by ZMQ.
time.sleep(ASYNC_CONN_WAIT/1000)

pub_params = dict(test_event_params)

for i in range(cnt):
pd = FieldValueMap()
pub_params["index"] = str(i)
map_dict_fvm(pub_params, pd)

rc = event_publish(ph, test_event_tag, pd)
if (rc != 0):
log_err("Failed to publish. {}/{} rc={}".format(i, cnt, rc))
break
log_debug("published: {}/{}".format(i+1, cnt))

# Sleep ASYNC_CONN_WAIT to ensure publish complete, before closing channel.
time.sleep(ASYNC_CONN_WAIT/1000)

events_deinit_publisher(ph)

log_debug("publish_events Done. cnt={}".format(cnt))

return rc



def run_test(cnt):
global rc_test_receive

# Initialising event objects
event_sub = threading.Event()

# Start subscriber thread
thread_sub = threading.Thread(target=test_receiver, args=(event_sub, cnt))
thread_sub.start()

# Subscriber would wait for ASYNC_CONN_WAIT. Wait additional 200ms
# for signal from test_receiver as ready.
event_sub.wait((ASYNC_CONN_WAIT + 200)/1000)
event_sub.clear()

rc_pub = publish_events(cnt)
if (rc_pub != 0):
log_err("Failed in publish_events")
else:
# Wait for subscriber to complete with 1 sec timeout.
event_sub.wait(1)
if (rc_test_receive != 0):
log_err("Failed to receive events")

log_debug("run_test_DONE rc_pub={} rc_test_receive={}".format(
rc_pub, rc_test_receive))

if (rc_pub != 0):
return rc_pub

if (rc_test_receive == 0):
return rc_test_receive

return 0


def main():
global chk_log_level

parser=argparse.ArgumentParser(
description="Check events from publish to receive via gNMI")
parser.add_argument('-l', "--loglvl", default=syslog.LOG_ERR, type=int,
help="log level")
parser.add_argument('-n', "--cnt", default=5, type=int,
help="count of events to publish/receive")
args = parser.parse_args()

chk_log_level = args.loglvl
rc = run_test(args.cnt)

if(rc == 0):
log_info("eventd test succeeded")
else:
log_err("eventd monit test failed rc={}".format(rc))


if __name__ == "__main__":
main()







5 changes: 5 additions & 0 deletions src/sonic-eventd/tools/monit_events
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
###############################################################################
## Monit configuration for telemetry container
###############################################################################
check program container_eventd with path "/usr/local/bin/events_monit_test.py"
every 5 cycles