Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2603,9 +2603,13 @@ func TestAuthCapabilities(t *testing.T) {
}

func TestClient(t *testing.T) {
// sonic-host:device-test-event is a test event.
// Events client will drop it on floor.
//
events := [] sdc.Evt_rcvd {
{ "test0", 7, 777 },
{ "test1", 6, 677 },
{ "{\"sonic-host:device-test-event\"", 5, 577 },
{ "test2", 5, 577 },
{ "test3", 4, 477 },
}
Expand Down Expand Up @@ -2653,7 +2657,6 @@ func TestClient(t *testing.T) {

qstr := fmt.Sprintf("all[heartbeat=%d]", HEARTBEAT_SET)
q := createEventsQuery(t, qstr)
// q := createEventsQuery(t, "all")
q.Addrs = []string{"127.0.0.1:8081"}

tests := []struct {
Expand Down Expand Up @@ -2694,16 +2697,20 @@ func TestClient(t *testing.T) {
}()

// wait for half second for subscribeRequest to sync
// and to receive events via notification handler.
//
time.Sleep(time.Millisecond * 2000)

if len(events) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events))
// -1 to discount test event, which receiver would drop.
//
if (len(events) - 1) != len(gotNoti) {
t.Errorf("noti[%d] != events[%d]", len(gotNoti), len(events)-1)
}

if (heartbeat != HEARTBEAT_SET) {
t.Errorf("Heartbeat is not set %d != expected:%d", heartbeat, HEARTBEAT_SET)
}
fmt.Printf("DONE: events:%d gotNoti=%d\n", len(events), len(gotNoti))
fmt.Printf("DONE: Expect events:%d - 1 gotNoti=%d\n", len(events), len(gotNoti))
})
time.Sleep(time.Millisecond * 1000)

Expand Down
49 changes: 37 additions & 12 deletions sonic_data_client/events_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import "C"

import (
"strconv"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"time"
"unsafe"
Expand Down Expand Up @@ -45,15 +47,19 @@ const STATS_FIELD_NAME = "value"

const EVENTD_PUBLISHER_SOURCE = "{\"sonic-events-eventd"

const TEST_EVENT = "{\"sonic-host:device-test-event"

// Path parameter
const PARAM_HEARTBEAT = "heartbeat"
const PARAM_QSIZE = "qsize"

type EventClient struct {

prefix *gnmipb.Path
path *gnmipb.Path

q *queue.PriorityQueue
pq_max int
channel chan struct{}

wg *sync.WaitGroup // wait for all sub go routines to finish
Expand Down Expand Up @@ -87,6 +93,9 @@ func C_init_subs() unsafe.Pointer {
func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) {
var evtc EventClient
evtc.prefix = prefix
evtc.pq_max = PQ_MAX_SIZE
log.V(4).Infof("Events priority Q max set default = %v", evtc.pq_max)

for _, path := range paths {
// Only one path is expected. Take the last if many
evtc.path = path
Expand All @@ -100,7 +109,11 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl
log.V(7).Infof("evtc.heartbeat_interval is set to %d", val)
Set_heartbeat(val)
}
break
} else if (k == PARAM_QSIZE) {
if val, err := strconv.Atoi(v); err == nil {
evtc.pq_max = val
log.V(7).Infof("Events priority Q max set by qsize param = %v", evtc.pq_max)
}
}
}
}
Expand Down Expand Up @@ -279,18 +292,30 @@ func get_events(evtc *EventClient) {

if rc == 0 {
evtc.counters[MISSED] += (uint64)(evt.Missed_cnt)
qlen := evtc.q.Len()

if (qlen < PQ_MAX_SIZE) {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_StringVal {
StringVal: evt.Event_str,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return

if ! strings.HasPrefix(evt.Event_str, TEST_EVENT) {
qlen := evtc.q.Len()

if (qlen < evtc.pq_max) {
var fvp map[string]interface{}
json.Unmarshal([]byte(evt.Event_str ), &fvp)

jv, err := json.Marshal(fvp)

if err == nil {
evtTv := &gnmipb.TypedValue {
Value: &gnmipb.TypedValue_JsonIetfVal {
JsonIetfVal: jv,
}}
if err := send_event(evtc, evtTv, evt.Publish_epoch_ms); err != nil {
return
}
} else {
log.V(1).Infof("Invalid event string: %v", evt.Event_str)
}
} else {
evtc.counters[DROPPED] += 1
}
} else {
evtc.counters[DROPPED] += 1
}
}
if evtc.stopped == 1 {
Expand Down