This repository was archived by the owner on Aug 31, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathclient_subscribe.go
More file actions
271 lines (238 loc) · 7.08 KB
/
client_subscribe.go
File metadata and controls
271 lines (238 loc) · 7.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package gnmi
import (
"fmt"
"io"
"net"
"sync"
"runtime"
"runtime/debug"
"github.com/Workiva/go-datastructures/queue"
log "github.com/golang/glog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
sdc "github.com/Azure/sonic-telemetry/sonic_data_client"
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
)
// Client holds the per-connection state of one subscribe client that has
// connected to the server.
type Client struct {
	// addr is the address passed to NewClient; String reports it.
	addr net.Addr

	// Running totals of messages sent, messages received and errors seen.
	// Close includes all three in its log line.
	sendMsg, recvMsg, errors int64

	// Signalling channels. Run allocates only the one that matches the
	// subscription mode (POLL, STREAM and ONCE respectively), and Close
	// closes whichever of them are non-nil.
	polled, stop, once chan struct{}

	// mu is held for the duration of Close.
	mu sync.RWMutex
	// q is handed to the data client's run loop and drained by send.
	q *queue.PriorityQueue
	// subscribe is the SubscriptionList taken from the first request
	// received in Run.
	subscribe *gnmipb.SubscriptionList

	// w lets Run wait for all sub goroutines to finish.
	w sync.WaitGroup

	fatal bool
}
// NewClient builds a Client for the given address, backed by a fresh priority
// queue created with queue.NewPriorityQueue(1, false). All other fields keep
// their zero values until Run is called.
func NewClient(addr net.Addr) *Client {
	c := new(Client)
	c.addr = addr
	c.q = queue.NewPriorityQueue(1, false)
	return c
}
// String returns the string form of the client's address. It is what the
// "%s" verbs in this file's log messages print for a Client.
func (c *Client) String() string {
	addr := c.addr
	return addr.String()
}
// populateDbPathSubscrition collects the gNMI path of every subscription in
// sublist, preserving their order.
//
// The prefix of sublist is only logged here; it is not merged into the
// returned paths. An error is returned when sublist carries no subscriptions.
func (c *Client) populateDbPathSubscrition(sublist *gnmipb.SubscriptionList) ([]*gnmipb.Path, error) {
	prefix := sublist.GetPrefix()
	log.V(6).Infof("prefix : %#v SubscribRequest : %#v", prefix, sublist)

	subscriptions := sublist.GetSubscription()
	// Test the length rather than comparing against nil, so that an
	// allocated-but-empty list is rejected as well instead of silently
	// producing no paths and no error.
	if len(subscriptions) == 0 {
		return nil, fmt.Errorf("No Subscription")
	}

	paths := make([]*gnmipb.Path, 0, len(subscriptions))
	for _, subscription := range subscriptions {
		paths = append(paths, subscription.GetPath())
	}

	log.V(6).Infof("gnmi Paths : %v", paths)
	return paths, nil
}
// Run starts the subscribe client. The first message received must be a
// SubscriptionList. Once the client is started, it will run until the stream
// is closed or the schedule completes. For Poll queries the Run will block
// internally after sync until a Poll request is made to the server.
//
// The data client is chosen from the prefix target: "OTHERS" selects the
// non-DB client, a target accepted by isTargetDb selects the DB client, and
// anything else selects the translib client.
//
// Run always returns a gRPC status error: early validation failures carry
// their own code, and whatever send eventually returns is reported with
// codes.InvalidArgument. Any non-nil result also increments c.errors.
func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
	defer log.V(1).Infof("Client %s shutdown", c)

	// Validate the stream before touching it: calling Context() on a nil
	// interface value panics, which would make this guard unreachable.
	if stream == nil {
		return grpc.Errorf(codes.FailedPrecondition, "cannot start client: stream is nil")
	}
	ctx := stream.Context()

	// Relies on the named result so that every return path below is counted.
	defer func() {
		if err != nil {
			c.errors++
		}
		log.V(2).Infof("Client %s exiting", c)
	}()

	query, err := stream.Recv()
	c.recvMsg++
	if err != nil {
		if err == io.EOF {
			return grpc.Errorf(codes.Aborted, "stream EOF received before init")
		}
		return grpc.Errorf(grpc.Code(err), "received error from client")
	}

	log.V(2).Infof("Client %s recieved initial query %v", c, query)

	c.subscribe = query.GetSubscribe()
	extensions := query.GetExtension()
	if c.subscribe == nil {
		return grpc.Errorf(codes.InvalidArgument, "first message must be SubscriptionList: %q", query)
	}

	prefix := c.subscribe.GetPrefix()
	if prefix == nil {
		return grpc.Errorf(codes.Unimplemented, "No target specified in prefix")
	}
	// TODO: add data client support for fetching non-db data
	target := prefix.GetTarget()
	if target == "" {
		return grpc.Errorf(codes.Unimplemented, "Empty target data not supported yet")
	}

	paths, err := c.populateDbPathSubscrition(c.subscribe)
	if err != nil {
		return grpc.Errorf(codes.NotFound, "Invalid subscription path: %v %q", err, query)
	}

	var dc sdc.Client
	switch {
	case target == "OTHERS":
		dc, err = sdc.NewNonDbClient(paths, prefix)
	case isTargetDb(target):
		dc, err = sdc.NewDbClient(paths, prefix)
	default:
		// For any other target create a new Transl client.
		dc, err = sdc.NewTranslClient(prefix, paths, ctx, extensions)
	}
	if err != nil {
		return grpc.Errorf(codes.NotFound, "%v", err)
	}

	// Start the data client goroutine for the requested mode. Each one is
	// registered with c.w so Run can wait for it before returning.
	switch mode := c.subscribe.GetMode(); mode {
	case gnmipb.SubscriptionList_STREAM:
		c.stop = make(chan struct{}, 1)
		c.w.Add(1)
		go dc.StreamRun(c.q, c.stop, &c.w, c.subscribe)
	case gnmipb.SubscriptionList_POLL:
		c.polled = make(chan struct{}, 1)
		// Pre-load one token; presumably this lets PollRun produce an
		// initial set of updates without waiting for a Poll request —
		// confirm against sonic_data_client.
		c.polled <- struct{}{}
		c.w.Add(1)
		go dc.PollRun(c.q, c.polled, &c.w, c.subscribe)
	case gnmipb.SubscriptionList_ONCE:
		c.once = make(chan struct{}, 1)
		c.once <- struct{}{}
		c.w.Add(1)
		go dc.OnceRun(c.q, c.once, &c.w, c.subscribe)
	default:
		return grpc.Errorf(codes.InvalidArgument, "Unkown subscription mode: %q", query)
	}

	log.V(1).Infof("Client %s running", c)
	// recv is not tracked by c.w; it calls Close itself when it exits.
	go c.recv(stream)
	err = c.send(stream)
	c.Close()
	// Wait until all child go routines exited
	c.w.Wait()
	return grpc.Errorf(codes.InvalidArgument, "%s", err)
}
// Close shuts the client down: it disposes the client queue and closes the
// stop, polled and once channels that have been allocated.
//
// It is called when the stream receive loop ends and after send returns in
// Run. The whole operation runs under c.mu. If the queue has already been
// disposed, Close returns right after logging, so the channels are not
// closed a second time.
func (c *Client) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()

	log.V(1).Infof("Client %s Close, sendMsg %v recvMsg %v errors %v", c, c.sendMsg, c.recvMsg, c.errors)

	if q := c.q; q != nil {
		if q.Disposed() {
			return
		}
		q.Dispose()
	}

	// Only the channel belonging to the active subscription mode is
	// non-nil; the others are skipped.
	for _, ch := range []chan struct{}{c.stop, c.polled, c.once} {
		if ch != nil {
			close(ch)
		}
	}
}
// recv reads requests from the stream until Recv fails, the stream reaches
// EOF, or (in POLL mode) a message that is not a Poll request arrives. On
// every exit path it logs the exit and then calls c.Close.
//
// In POLL mode each Poll request is forwarded as a token on c.polled. In the
// other modes any successfully received message is logged as invalid and the
// loop keeps reading.
func (c *Client) recv(stream gnmipb.GNMI_SubscribeServer) {
	// The exit log lives in the deferred function because the loop below
	// never falls through; a statement placed after it could not execute.
	defer func() {
		log.V(1).Infof("Client %s exit from recv()", c)
		c.Close()
	}()

	for {
		log.V(5).Infof("Client %s blocking on stream.Recv()", c)
		event, err := stream.Recv()
		c.recvMsg++

		if err == io.EOF {
			log.V(1).Infof("Client %s received io.EOF", c)
			if c.subscribe.Mode == gnmipb.SubscriptionList_STREAM {
				// The client->server direction may be closed right after the
				// subscription list is sent, so EOF does not mean the client
				// has stopped listening. Wait on the stream context instead,
				// which is done once the client connection is terminated.
				log.V(1).Infof("Waiting for client '%s'", c)
				<-stream.Context().Done()
				log.V(1).Infof("Client is done '%s'", c)
			}
			return
		}
		if err != nil {
			log.V(1).Infof("Client %s received error: %v", c, err)
			return
		}

		if c.subscribe.Mode == gnmipb.SubscriptionList_POLL {
			log.V(3).Infof("Client %s received Poll event: %v", c, event)
			if _, ok := event.Request.(*gnmipb.SubscribeRequest_Poll); !ok {
				return
			}
			// c.polled has capacity 1, so this blocks while a previous
			// token is still pending.
			c.polled <- struct{}{}
			continue
		}
		log.V(1).Infof("Client %s received invalid event: %s", c, event)
	}
}
// send drains the client queue one item at a time, converts each sdc.Value
// into a SubscribeResponse and writes it to the stream.
//
// It returns when the queue yields no items (the error from Get is returned
// as is), when Get reports an error alongside items, when sdc.ValToResp
// fails, or when stream.Send fails. Items of any other type are logged,
// counted in c.errors and skipped.
func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error {
	for {
		items, err := c.q.Get(1)
		if items == nil {
			log.V(1).Infof("%v", err)
			return err
		}
		if err != nil {
			c.errors++
			log.V(1).Infof("%v", err)
			return fmt.Errorf("unexpected queue Gext(1): %v", err)
		}

		var resp *gnmipb.SubscribeResponse
		switch v := items[0].(type) {
		case sdc.Value:
			if resp, err = sdc.ValToResp(v); err != nil {
				c.errors++
				return err
			}
		default:
			log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c)
			c.errors++
			// There is no response to send for this item. Skip it instead
			// of passing a nil message to stream.Send and counting it as
			// sent.
			continue
		}

		c.sendMsg++
		if err = stream.Send(resp); err != nil {
			log.V(1).Infof("Client %s sending error:%v", c, err)
			c.errors++
			return err
		}

		log.V(5).Infof("Client %s done sending, msg count %d, msg %v", c, c.sendMsg, resp)

		// NOTE(review): this forces a garbage collection after every
		// message sent, which is costly on busy streams — confirm it is
		// still required.
		debug.FreeOSMemory()
		n := runtime.NumGoroutine()
		log.V(1).Infof("Force mem release; numRoutine=%v", n)
	}
}