Skip to content
81 changes: 75 additions & 6 deletions ygnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,45 @@ import (

// subscribe create a gNMI SubscribeClient for the given query.
func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.SubscriptionList_Mode, o *opt) (_ gpb.GNMI_SubscribeClient, rerr error) {
var queryPaths []*gpb.Path
var subs []*gpb.Subscription
for _, path := range q.subPaths() {
path, err := resolvePath(path)
if err != nil {
return nil, err
}
queryPaths = append(queryPaths, path)
}
if len(queryPaths) > 0 && o.ft != nil {
if !q.isLeaf() {
return nil, fmt.Errorf("functional translators only support leaf queries, given %+v", queryPaths)
}
if len(queryPaths) != 1 {
// This should never happen because leaf queries should only have one path. (Batch queries with multiple leaf paths still don't have isLeaf == true)
return nil, fmt.Errorf("functional translators only support one query path, given %d paths", len(queryPaths))
}
// Convert the query path to a schema path by stripping keys because output paths provided to FT
// OutputToInput() for subscription translation must be schema paths. This is expected to cause
// the subscription to return more data than just the requested key, but we'll filter the output
// in receive() to only include paths matching the keys we actually queried.
schemaPath := proto.Clone(queryPaths[0]).(*gpb.Path)
for _, elem := range schemaPath.Elem {
elem.Key = nil
}
match, inputs, err := o.ft.OutputToInput(schemaPath)
if err != nil {
log.ErrorContextf(ctx, "Received error from FunctionalTranslator.OutputToInput(): %v", err)
return nil, err
}
if !match {
return nil, fmt.Errorf("FunctionalTranslator.OutputToInput() did not match on path: %s", prototext.Format(schemaPath))
}

log.V(2).InfoContextf(ctx, "FunctionalTranslator.OutputToInput() mapped original query path %s to actual subscription paths: %+v", prototext.Format(queryPaths[0]), inputs)
queryPaths = inputs
}

for _, path := range queryPaths {
subs = append(subs, &gpb.Subscription{
Path: &gpb.Path{
Elem: path.GetElem(),
Expand Down Expand Up @@ -93,7 +126,7 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su
}
defer closer.Close(&rerr, sub.CloseSend, "error closing gNMI send stream")
if !o.useGet {
log.V(c.requestLogLevel).Info(prototext.Format(sr))
log.V(c.requestLogLevel).InfoContext(ctx, prototext.Format(sr))
}
if err := sub.Send(sr); err != nil {
// If the server closes the RPC with an error, the real error may only be visible on Recv.
Expand Down Expand Up @@ -165,13 +198,27 @@ func (gs *getSubscriber) CloseSend() error {
// the data is returned as-is and the second return value is true. If Delete paths are present in
// the update, they are appended to the given data before the Update values. If deletesExpected
// is false, however, any deletes received will cause an error.
func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bool) ([]*DataPoint, bool, error) {
func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bool, queryPath *gpb.Path, o *opt) ([]*DataPoint, bool, error) {
res, err := sub.Recv()
if err != nil {
return data, false, err
}
recvTS := time.Now()

if o.ft != nil {
out, err := o.ft.Translate(res)
if err != nil {
log.Errorf("FunctionalTranslator.Translate() failed to translate notification: %v", err)
return data, false, nil
}
if out == nil {
log.V(2).Infof("Received nil response from functional translatator with input: %s", prototext.Format(res))
return data, false, nil
}
log.V(2).Infof("FT successfully translated a notification. input: %s, output: %s", prototext.Format(res), prototext.Format(out))
res = out
}

switch v := res.Response.(type) {
case *gpb.SubscribeResponse_Update:
n := v.Update
Expand Down Expand Up @@ -205,6 +252,14 @@ func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bo
return data, false, err
}
log.V(2).Infof("Constructed datapoint for delete: %v", dp)
// Filter out paths that don't match the query here as a workaround for the edge case where we
// query a path including a specific key and the FT subscribes to more data than just that.
// This extra data would otherwise be filtered out downstream but with a compliance error.
// This uses the same logic that unmarshal(...) in unmarshal.go uses to check compliance.
if o.ft != nil && !util.PathMatchesQuery(dp.Path, queryPath) {
log.V(2).Infof("Skipping delete datapoint that doesn't match the query. path: %s, query: %s", prototext.Format(dp.Path), prototext.Format(queryPath))
continue
}
data = append(data, dp)
}
for _, u := range n.GetUpdate() {
Expand All @@ -220,6 +275,10 @@ func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bo
return data, false, err
}
log.V(2).Infof("Constructed datapoint for update: %v", dp)
if o.ft != nil && !util.PathMatchesQuery(dp.Path, queryPath) {
log.V(2).Infof("Skipping update datapoint that doesn't match the query. path: %s, query: %s", prototext.Format(dp.Path), prototext.Format(queryPath))
continue
}
data = append(data, dp)
}
return data, false, nil
Expand All @@ -237,10 +296,14 @@ func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bo

// receiveAll receives data until the context deadline is reached, or when a sync response is received.
// This func is only used when receiving data from a ONCE subscription.
func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool) (data []*DataPoint, err error) {
func receiveAll[T any](sub gpb.GNMI_SubscribeClient, deletesExpected bool, query AnyQuery[T], o *opt) (data []*DataPoint, err error) {
queryPath, err := resolvePath(query.PathStruct())
if err != nil {
return nil, fmt.Errorf("failed to resolve path: %w", err)
}
for {
var sync bool
data, sync, err = receive(sub, data, deletesExpected)
data, sync, err = receive(sub, data, deletesExpected, queryPath, o)
if err != nil {
if err == io.EOF {
// TODO(wenbli): It is unclear whether "subscribe ONCE stream closed without sync_response"
Expand All @@ -266,7 +329,7 @@ func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool) (data []*Dat
// Note: this does not imply that mode is gpb.SubscriptionList_STREAM (though it usually is).
// If the query is a leaf, each datapoint will be sent the chan individually.
// If the query is a non-leaf, all the datapoints from a SubscriptionResponse are bundled.
func receiveStream[T any](ctx context.Context, sub gpb.GNMI_SubscribeClient, query AnyQuery[T]) (<-chan []*DataPoint, <-chan error) {
func receiveStream[T any](ctx context.Context, sub gpb.GNMI_SubscribeClient, query AnyQuery[T], o *opt) (<-chan []*DataPoint, <-chan error) {
dataCh := make(chan []*DataPoint)
errCh := make(chan error)

Expand All @@ -278,8 +341,14 @@ func receiveStream[T any](ctx context.Context, sub gpb.GNMI_SubscribeClient, que
var hasSynced bool
var sync bool
var err error

queryPath, err := resolvePath(query.PathStruct())
if err != nil {
errCh <- fmt.Errorf("failed to resolve path: %w", err)
return
}
for {
recvData, sync, err = receive(sub, recvData, true)
recvData, sync, err = receive(sub, recvData, true, queryPath, o)
if err != nil {
// In the case that the context is cancelled, the reader of errCh
// may have gone away. In order to avoid this goroutine blocking
Expand Down
107 changes: 102 additions & 5 deletions ygnmi/ygnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,60 @@ type Option func(*opt)
// ValidateFn is a function that validates the datapoint.
type ValidateFn func(*DataPoint) error

// FunctionalTranslator objects translate gNMI subscriptions and notifications to address
// deviations in OpenConfig compliance.
//
// A functional translator is responsible for:
// - Determining the necessary subscription(s) to support a specific set of *output* leaf paths
// (using OutputToInput). This is done via mapping desired OpenConfig paths to the
// corresponding vendor-native paths or alternative OpenConfig paths.
// - Translating notifications from those subscriptions into the requested target paths.
//
// Example: Consider a functional translator, laserTranslator, designed to handle temperature
// alerting threshold paths. The vendor-native paths have separate paths for severity ("critical" or
// "warning"), while the OpenConfig schema models severity as a key within a single path.
//
// Output: /components/component/transceiver/thresholds/threshold/state/module-temperature-upper
// - Input: /vendor/native/transceivers/critical/upper
// - Input: /vendor/native/transceivers/warning/upper
// Output: /components/component/transceiver/thresholds/threshold/state/module-temperature-lower
// - Input: /vendor/native/transceivers/critical/lower
// - Input: /vendor/native/transceivers/warning/lower
//
// Example OutputToInput invocation:
//
// laserTranslator.OutputToInput(/components/component/transceiver/thresholds/threshold/state/module-temperature-lower) returns:
// - match: true
// - inputs: [/vendor/native/transceivers/critical/lower, /vendor/native/transceivers/warning/lower]
//
// Example Translate invocation:
//
// laserTranslator.Translate([
// /vendor/native/transceivers[name=Optics0/0/0/0]/critical/lower: 10,
// /vendor/native/transceivers[name=Optics0/0/0/0]/warning/lower: 50,
// ]) returns:
// - /components/component[name=Optics0/0/0/0]/transceiver/thresholds/threshold[severity=critical]/state/module-temperature-lower: 10
// - /components/component[name=Optics0/0/0/0]/transceiver/thresholds/threshold[severity=warning]/state/module-temperature-lower: 50
type FunctionalTranslator interface {
// Translate translates gNMI notifications from vendor-native paths or alternative
// OpenConfig paths to schema compliant OpenConfig paths.
//
// - It must support translation for updates.
// - It may optionally support translation for deletes; if delete translation is not
// supported, deletes received must be silently ignored.
// - Paths in the notification that are not within the scope of the functional
// translator must be silently ignored.
Translate(*gpb.SubscribeResponse) (*gpb.SubscribeResponse, error)

// OutputToInput returns the input subscription path(s) required by Translate for the
// desired output path.
//
// - The "match" value provided in the return indicates whether the requested output path
// is in scope for translation by this translator. match == (len(inputs) > 0 && rerr == nil)
// - The output path must be a schema path without any keys specified, including wildcards.
OutputToInput(output *gpb.Path) (match bool, inputs []*gpb.Path, rerr error)
}

type opt struct {
useGet bool
mode gpb.SubscriptionMode
Expand All @@ -204,6 +258,7 @@ type opt struct {
setFallback bool
sampleInterval uint64
datapointValidator ValidateFn
ft FunctionalTranslator
}

// resolveOpts applies all the options and returns a struct containing the result.
Expand Down Expand Up @@ -278,14 +333,56 @@ func WithDatapointValidator(fn ValidateFn) Option {
}
}

// WithFT creates an option to set a functional translator that intercepts and translates gNMI
// subscriptions and notifications.
//
// Functional translator output paths must be leaf paths, so ygnmi methods will throw an error if
// the path is not a leaf path.
//
// To illustrate, consider a functional translator laserTranslator designed to handle temperature
// alerting threshold paths. The vendor-native paths have separate paths for severity ("critical" or
// "warning"), while the OpenConfig schema models severity as a key within a single path.
//
// Leaf path, supported:
// ygnmi.Lookup(ctx, c, gnmi.OC().Components().Component().Transceiver().Thresholds().Threshold().ModuleTemperatureUpper().State(), WithFT(laserTranslator))
// Non-leaf path, not supported, returns an error:
// ygnmi.Lookup(ctx, c, gnmi.OC().Components().Component().Transceiver().Thresholds().Threshold().State(), WithFT(laserTranslator))
//
// Functional translator subscription translation (OutputToInput) output paths must be schema paths,
// without any keys specified. ygnmi still supports querying with a key, but this works by
// subscribing to all keys and filtering the response. This may result in O(n) subscription
// performance when O(1) is expected.
//
// No keys, supported:
// ygnmi.Lookup(ctx, c, gnmi.OC().Components().Component().Transceiver().Thresholds().Threshold().ModuleTemperatureUpper().State(), WithFT(laserTranslator))
// Actual subscription: /vendor/native/transceivers/critical/upper
// /vendor/native/transceivers/warning/upper
// With key, supported but we subscribe to all keys internally:
// ygnmi.Lookup(ctx, c, gnmi.OC().Components().Component("Optics0/0/0/0").Transceiver().Thresholds().Threshold("critical").ModuleTemperatureUpper().State(), WithFT(laserTranslator))
// Actual subscription: /vendor/native/transceivers/critical/upper
// /vendor/native/transceivers/warning/upper
// Note that the subscription includes all tranceivers as in the previous example.
// ygnmi.Lookup still behaves as expected and only returns data for Optics0/0/0/0.
//
// ygnmi methods will return an error if the queried path doesn't match the paths in scope for the
// functional translator according to the OutputToInput method.
//
// Path that doesn't match the translator, not supported, returns an error:
// ygnmi.Lookup(ctx, c, gnmi.OC().System().BootTime().State(), WithFT(laserTranslator))
func WithFT(ft FunctionalTranslator) Option {
return func(o *opt) {
o.ft = ft
}
}

// Lookup fetches the value of a SingletonQuery with a ONCE subscription.
func Lookup[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...Option) (*Value[T], error) {
resolvedOpts := resolveOpts(opts)
sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_ONCE, resolvedOpts)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to path: %w", err)
}
data, err := receiveAll(sub, false)
data, err := receiveAll(sub, false, q, resolvedOpts)
if err != nil {
return nil, fmt.Errorf("failed to receive to data: %w", err)
}
Expand Down Expand Up @@ -363,7 +460,7 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func
return w
}

dataCh, errCh := receiveStream[T](ctx, sub, q)
dataCh, errCh := receiveStream[T](ctx, sub, q, resolvedOpts)
go func() {
defer cancel()
// Create an intially empty GoStruct, into which all received datapoints will be unmarshalled.
Expand Down Expand Up @@ -448,7 +545,7 @@ func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts .
if err != nil {
return nil, fmt.Errorf("failed to subscribe to path: %w", err)
}
data, err := receiveAll(sub, false)
data, err := receiveAll(sub, false, q, resolvedOpts)
if err != nil {
return nil, fmt.Errorf("failed to receive to data: %w", err)
}
Expand Down Expand Up @@ -524,7 +621,7 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu
return w
}

dataCh, errCh := receiveStream[T](ctx, sub, q)
dataCh, errCh := receiveStream[T](ctx, sub, q, resolvedOpts)
go func() {
defer cancel()
// Create a map intially empty GoStruct, into which all received datapoints will be unmarshalled based on their path prefixes.
Expand Down Expand Up @@ -942,7 +1039,7 @@ func (r *Reconciler[T]) Start(ctx context.Context, fn func(cfg *Value[T], state
return
}

dataCh, errCh := receiveStream(ctx, sub, r.rootCfg)
dataCh, errCh := receiveStream(ctx, sub, r.rootCfg, resolvedOpts)
go func() {
defer cancel()
// Create an intially empty GoStruct, into which all received datapoints will be unmarshalled.
Expand Down
Loading
Loading