Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions afthelper/afthelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
// NextHopSummary provides a summary of an next-hop for a particular entry.
type NextHopSummary struct {
// Weight is the share of traffic that the next-hop gets.
Weight uint64
Weight uint64 `json:"weight"`
// Address is the IP address of the next-hop.
Address string
Address string `json:"address"`
// NetworkInstance is the network instance within which the address was resolved.
NetworkInstance string
NetworkInstance string `json:"network-instance"`
}

// NextHopAddrsForPrefix unrolls the prefix specified within the network-instance netInst from the
Expand Down
13 changes: 13 additions & 0 deletions afthelper/afthelper_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package afthelper

import (
Expand Down
2 changes: 1 addition & 1 deletion compliance/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func addIPv4Internal(addr string, t testing.TB, wantACK fluent.ProgrammingResult
c := fluent.NewClient()
ops := []func(){
func() {
c.Modify().AddEntry(t, fluent.NextHopEntry().WithNetworkInstance(server.DefaultNetworkInstanceName).WithIndex(1))
c.Modify().AddEntry(t, fluent.NextHopEntry().WithNetworkInstance(server.DefaultNetworkInstanceName).WithIndex(1).WithIPAddress("192.0.2.1"))
},
func() {
c.Modify().AddEntry(t, fluent.NextHopGroupEntry().WithNetworkInstance(server.DefaultNetworkInstanceName).WithID(42).AddNextHop(1, 1))
Expand Down
47 changes: 43 additions & 4 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (

log "github.com/golang/glog"
"github.com/openconfig/gribigo/aft"
"github.com/openconfig/gribigo/afthelper"
"github.com/openconfig/gribigo/constants"
"github.com/openconfig/gribigo/gnmit"
"github.com/openconfig/gribigo/ocrt"
"github.com/openconfig/gribigo/server"
"github.com/openconfig/gribigo/sysrib"
"github.com/openconfig/ygot/ygot"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/encoding/prototext"

gpb "github.com/openconfig/gnmi/proto/gnmi"
spb "github.com/openconfig/gribi/v1/proto/service"
Expand Down Expand Up @@ -138,7 +141,17 @@ func (*tlsCreds) isDevOpt() {}
func New(ctx context.Context, opts ...DevOpt) (*Device, error) {
d := &Device{}

if jcfg := optDeviceCfg(opts); jcfg != nil {
jcfg := optDeviceCfg(opts)
switch jcfg {
case nil:
dev := &ocrt.Device{}
dev.GetOrCreateNetworkInstance("DEFAULT").Type = ocrt.NetworkInstanceTypes_NETWORK_INSTANCE_TYPE_DEFAULT_INSTANCE
sr, err := sysrib.NewSysRIB(dev)
if err != nil {
return nil, fmt.Errorf("cannot build system RIB, %v", err)
}
d.sysRIB = sr
default:
sr, err := sysrib.NewSysRIBFromJSON(jcfg)
if err != nil {
return nil, fmt.Errorf("cannot build system RIB, %v", err)
Expand All @@ -154,11 +167,13 @@ func New(ctx context.Context, opts ...DevOpt) (*Device, error) {
case err != nil:
log.Errorf("invalid notifications, %v", err)
default:
go d.gnmiSrv.TargetUpdate(&gpb.SubscribeResponse{
u := &gpb.SubscribeResponse{
Response: &gpb.SubscribeResponse_Update{
Update: n,
},
})
}
log.V(2).Infof("sending gNMI Notification, %s", prototext.Format(u))
go d.gnmiSrv.TargetUpdate(u)
}

// server.WithFIBProgrammedCheck()
Expand All @@ -168,6 +183,27 @@ func New(ctx context.Context, opts ...DevOpt) (*Device, error) {
// here we just write to something that the server has access to.
}

ribAddfn := func(ribs map[string]*aft.RIB, optype constants.OpType, netinst, prefix string) {
if optype != constants.Add {
// TODO(robjs): handle replace and delete :-)
return
}
nhs, err := afthelper.NextHopAddrsForPrefix(ribs, netinst, prefix)
if err != nil {
log.Errorf("cannot add netinst:prefix %s:%s to the RIB, %v", netinst, prefix, err)
return
}
nhSum := []*afthelper.NextHopSummary{}
for _, nh := range nhs {
nhSum = append(nhSum, nh)
}

d.sysRIB.AddRoute(netinst, &sysrib.Route{
Prefix: prefix,
NextHops: nhSum,
})
}

gr := optGRIBIAddr(opts)
gn := optGNMIAddr(opts)

Expand All @@ -176,7 +212,10 @@ func New(ctx context.Context, opts ...DevOpt) (*Device, error) {
return nil, fmt.Errorf("must specific TLS credentials to start a server")
}

gRIBIStop, err := d.startgRIBI(ctx, gr.host, gr.port, creds, server.WithRIBHook(ribHookfn))
gRIBIStop, err := d.startgRIBI(ctx, gr.host, gr.port, creds,
server.WithPostChangeRIBHook(ribHookfn),
server.WithRIBResolvedEntryHook(ribAddfn),
)
if err != nil {
return nil, fmt.Errorf("cannot start gRIBI server, %v", err)
}
Expand Down
70 changes: 68 additions & 2 deletions device/device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,43 @@ package device

import (
"context"
"encoding/json"
"fmt"
"net"
"testing"

log "github.com/golang/glog"
"github.com/google/go-cmp/cmp"
"github.com/openconfig/gribigo/compliance"
"github.com/openconfig/gribigo/ocrt"
"github.com/openconfig/gribigo/sysrib"
"github.com/openconfig/gribigo/testcommon"
"github.com/openconfig/ygot/ygot"
)

type ribQuery struct {
NetworkInstance string
Prefix *net.IPNet
}

func jsonDevice() []byte {
d := &ocrt.Device{}
d.GetOrCreateNetworkInstance("DEFAULT").Type = ocrt.NetworkInstanceTypes_NETWORK_INSTANCE_TYPE_DEFAULT_INSTANCE
d.GetOrCreateInterface("eth0").GetOrCreateSubinterface(1).GetOrCreateIpv4().GetOrCreateAddress("192.0.2.1").PrefixLength = ygot.Uint8(31)

j, err := ygot.Marshal7951(d, nil)
if err != nil {
panic(fmt.Sprintf("cannot create JSON, %v", err))
}
return j
}

func TestDevice(t *testing.T) {
devCh := make(chan string, 1)
errCh := make(chan error, 1)
ribCh := make(chan *ribQuery, 1)
ribErrCh := make(chan error)
ribResultCh := make(chan []*sysrib.Interface, 1)

creds, err := TLSCredsFromFile(testcommon.TLSCreds())
if err != nil {
Expand All @@ -34,17 +62,55 @@ func TestDevice(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
d, err := New(ctx, creds)
d, err := New(ctx, creds, DeviceConfig(jsonDevice()))
if err != nil {
errCh <- err
}
devCh <- d.GRIBIAddr()
<-ctx.Done()

for {
select {
case qryPfx := <-ribCh:
ints, err := d.sysRIB.EgressInterface(qryPfx.NetworkInstance, qryPfx.Prefix)
if err != nil {
ribErrCh <- err
}
ribResultCh <- ints
case <-ctx.Done():
return
}
}
}()
select {
case err := <-errCh:
t.Fatalf("got unexpected error from device, got: %v", err)
case addr := <-devCh:
compliance.AddIPv4EntryRIBACK(addr, t)

_, cidr, err := net.ParseCIDR("1.1.1.1/32")
if err != nil {
t.Fatalf("cannot parse CIDR for destination, err: %v", err)
}

ribCh <- &ribQuery{NetworkInstance: "DEFAULT", Prefix: cidr}
select {
case err := <-ribErrCh:
t.Fatalf("cannot run RIB query, gotErr: %v", err)
case got := <-ribResultCh:
js, err := json.MarshalIndent(got, "", " ")
if err != nil {
t.Fatalf("cannot marshal JSON response, %v", err)
}
log.Infof("got egress interface, %s", js)

want := []*sysrib.Interface{{
Name: "eth0",
Subinterface: 1,
}}

if diff := cmp.Diff(got, want); diff != "" {
t.Fatalf("did not get expected egress interface, diff(-got,+want):\n%s", diff)
}
}
}
}
17 changes: 17 additions & 0 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,23 @@ func (n *nextHopEntry) WithNetworkInstance(ni string) *nextHopEntry {
return n
}

// WithIPAddress specifies an IP address to be used for the next-hop. The IP
// address is resolved within the network instance specified by WithNextHopNetworkInstance.
func (n *nextHopEntry) WithIPAddress(addr string) *nextHopEntry {
n.pb.NextHop.IpAddress = &wpb.StringValue{Value: addr}
return n
}

// WithNextHopNetworkInstance specifies the network instance within which the next-hop
// should be resolved. If it is not specified, the next-hop is resolved in the network
// instance that the next-hop is installed in. If no other parameters are specified, the
// lookup uses the input packet within the specified network instance to determine the
// next-hop.
func (n *nextHopEntry) WithNextHopNetworkInstance(ni string) *nextHopEntry {
n.pb.NextHop.NetworkInstance = &wpb.StringValue{Value: ni}
return n
}

// TODO(robjs): add additional NextHopEntry fields.

// opproto implements the gRIBIEntry interface, returning a gRIBI AFTOperation. ID
Expand Down
2 changes: 1 addition & 1 deletion gnmit/gnmit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func TestSTREAM(t *testing.T) {
wg.Wait()
for _, s := range got {
if s.T == SYNC {
if len(seenVal) < 1 || meta != 2 { // seen hello, meta/sync, meta/connected
if len(seenVal) < 1 || meta < 1 { // seen hello, may see meta/sync, meta/connected
t.Fatalf("did not get expected set of updates from client before sync, got: %d %s, want: 3 values, sync (updates %v, meta = %d)", len(got), got, seenVal, meta)
}
}
Expand Down
75 changes: 73 additions & 2 deletions rib/rib.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ var unixTS = time.Now().UnixNano
// - a ygot.GoStruct containing the entry that has been changed.
type RIBHookFn func(constants.OpType, int64, string, ygot.GoStruct)

// ResolvedEntryFn is a function that is called for all entries that can be fully
// resolved within the RIB. Fully resolved in this case is defined as an input
// packet match criteria set of next-hops.
//
// It takes arguments of:
// - the set of RIBs that were stored in the RIB as a map keyed by the name of
// a network instance, with a RIB represented as a ygot-generated AFT struct.
// - the prefix that was impacted.
// - the OpType that the entry was subject to (add/replace/delete).
// - a string indicating the network instance that the operation was within
// - a string indicating the prefix that was impacted
type ResolvedEntryFn func(ribs map[string]*aft.RIB, optype constants.OpType, netinst, prefix string)

// RIBHolderCheckFunc is a function that is used as a check to determine whether
// a RIB entry is eligible for a particular operation. It takes arguments of:
// - the operation type that is being performed.
Expand Down Expand Up @@ -97,6 +110,11 @@ type RIB struct {
// same AddXXX methods can be used along with network instance the operation
// referred to. The map is keyed by the operation ID.
pendingEntries map[uint64]*pendingEntry

// resolvedEntryHook is a function that is called for all entries that
// can be fully resolved in the RIB. In the current implementation it
// is called only for IPv4 entries.
resolvedEntryHook ResolvedEntryFn
}

// RIBHolder is a container for a set of RIBs.
Expand Down Expand Up @@ -237,14 +255,22 @@ type pendingEntry struct {
op *spb.AFTOperation
}

// SetHook assigns the supplied hook to all network instance RIBs within
// SetPostChangeHook assigns the supplied hook to all network instance RIBs within
// the RIB structure.
func (r *RIB) SetHook(fn RIBHookFn) {
func (r *RIB) SetPostChangeHook(fn RIBHookFn) {
for _, nir := range r.niRIB {
nir.mu.Lock()
nir.postChangeHook = fn
nir.mu.Unlock()
}
}

// SetResolvedEntryHook asssigns the supplied hook to all network instance RIBs within
// the RIB structure.
func (r *RIB) SetResolvedEntryHook(fn ResolvedEntryFn) {
r.resolvedEntryHook = fn
}

// NetworkInstanceRIB returns the RIB for the network instance with name s.
func (r *RIB) NetworkInstanceRIB(s string) (*RIBHolder, bool) {
r.nrMu.RLock()
Expand Down Expand Up @@ -429,6 +455,14 @@ func (r *RIB) addEntryInternal(ni string, op *spb.AFTOperation, oks, fails *[]*O
ID: op.GetId(),
Op: op,
})

// call the resolved entry hook if this was an IPv4 prefix.
if v4Prefix != "" {
if err := r.callResolvedEntryHook(constants.Add, ni, v4Prefix); err != nil {
return fmt.Errorf("cannot run resolvedEntyHook, %v", err)
}
}

// we may now have made some other pending entry be possible to install,
// so try them all out!
for _, e := range r.getPending() {
Expand All @@ -447,6 +481,40 @@ func (r *RIB) addEntryInternal(ni string, op *spb.AFTOperation, oks, fails *[]*O
return nil
}

// callResolvedEntryHook calls the resolvedEntryHook based on the operation optype on
// prefix prefix within the network instance netinst occurring. It returns an error
// if the hook cannot be called. Any error from the hook must be handled externally.
func (r *RIB) callResolvedEntryHook(optype constants.OpType, netinst, prefix string) error {
if r.resolvedEntryHook == nil {
return nil
}

ribs, err := r.copyRIBs()
if err != nil {
return err
}
go r.resolvedEntryHook(ribs, optype, netinst, prefix)
return nil
}

// copyRIBs returns a map, keyed by network instance name, with the value of the ygot-generated
// AFT struct, of the set of RIBs stored by the instance r. A DeepCopy of the RIBs is returned,
// along with an error that indicates whether the entries could be copied.
func (r *RIB) copyRIBs() (map[string]*aft.RIB, error) {
rib := map[string]*aft.RIB{}
for name, niR := range r.niRIB {
// this is likely expensive on very large RIBs, but with today's implementatiom
// it seems acceptable, since we then allow the caller not to have to figure out
// any locking since they have their own RIB to work on.
dupRIB, err := ygot.DeepCopy(niR.r)
if err != nil {
return nil, fmt.Errorf("cannot copy RIB for NI %s, %v", name, err)
}
rib[name] = dupRIB.(*aft.RIB)
}
return rib, nil
}

// DeleteEntry removes the entry specified by op from the network instance ni.
func (r *RIB) DeleteEntry(ni string, op *spb.AFTOperation) ([]*OpResult, []*OpResult, error) {
niR, ok := r.NetworkInstanceRIB(ni)
Expand Down Expand Up @@ -476,6 +544,9 @@ func (r *RIB) DeleteEntry(ni string, op *spb.AFTOperation) ([]*OpResult, []*OpRe
return nil, nil, status.Newf(codes.Unimplemented, "unsupported AFT operation type %T", t).Err()
}

// TODO(robjs): after merging chain --> #60, then make sure that the hook
// is called for deletes of IPv4 prefixes to clean up.

switch {
case err != nil:
fails = append(fails, &OpResult{
Expand Down
Loading