Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:
build:
Expand Down
199 changes: 187 additions & 12 deletions rib/rib.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rib
import (
"errors"
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -59,6 +60,8 @@ type RIB struct {

// defaultName is the name assigned to the default network instance.
defaultName string
// ribCheck indicates whether this RIB is running the RIB check function.
ribCheck bool

// TODO(robjs): reference count NHGs and NHs across all AFTs to ensure that we
// don't allow entries to be deleted that are in use.
Expand Down Expand Up @@ -155,12 +158,13 @@ func New(dn string, opt ...RIBOpt) *RIB {
}

rhOpt := []ribHolderOpt{}
if !hasDisableCheckFn(opt) {
checkRIB := !hasDisableCheckFn(opt)
if checkRIB {
rhOpt = append(rhOpt, RIBHolderCheckFn(r.canResolve))
}
r.ribCheck = checkRIB

defRH := NewRIBHolder(dn, rhOpt...)
r.niRIB[dn] = defRH
r.niRIB[dn] = NewRIBHolder(dn, rhOpt...)

return r
}
Expand Down Expand Up @@ -190,6 +194,39 @@ func (r *RIB) NetworkInstanceRIB(s string) (*RIBHolder, bool) {
return rh, ok
}

// AddNetworkInstance adds a new network instance with the specified name
// to the RIB.
func (r *RIB) AddNetworkInstance(name string) error {
r.nrMu.Lock()
defer r.nrMu.Unlock()

if r.niRIB[name] != nil {
return fmt.Errorf("RIB %s already exists", name)
}

rhOpt := []ribHolderOpt{}
if r.ribCheck {
rhOpt = append(rhOpt, RIBHolderCheckFn(r.canResolve))
}

r.niRIB[name] = NewRIBHolder(name, rhOpt...)
return nil
}

// KnownNetworkInstances returns the name of all known network instances
// within the RIB.
func (r *RIB) KnownNetworkInstances() []string {
r.nrMu.RLock()
defer r.nrMu.RUnlock()
names := []string{}
for n := range r.niRIB {
names = append(names, n)
}
// return the RIB names in a stable order.
sort.Strings(names)
return names
}

// OpResult contains the result of an operation (Add, Modify, Delete).
type OpResult struct {
// ID is the ID of the operation as specified in the input request.
Expand Down Expand Up @@ -653,26 +690,21 @@ func (r *RIBHolder) DeleteIPv4(e *aftpb.Afts_Ipv4EntryKey) error {
return errors.New("invalid RIB structure, nil")
}

ribE := r.r.Afts.Ipv4Entry[e.GetPrefix()]
if ribE == nil {
return status.Newf(codes.NotFound, "cannot find IPv4Entry to delete, %s", e.Prefix).Err()
}

// This is an optional check, today some servers do not implement it and return true
// even if the load does not match. Compliance tests should note this.
if e.GetIpv4Entry() != nil {
existingEntryProto, err := concreteIPv4Proto(ribE)
// We do not mind if we don't find this entry - since this shouldn't be an
// error.
existingEntryProto, _, err := r.ipv4EntryProto(e.GetPrefix())
if err != nil {
return status.Newf(codes.Internal, "invalid existing entry in RIB %s", e).Err()
return err
}

if !proto.Equal(existingEntryProto, e) {
return status.Newf(codes.NotFound, "delete of an entry with non-matching, existing: %s, candidate: %s", existingEntryProto, e).Err()
}
}

de := r.r.Afts.Ipv4Entry[e.GetPrefix()]

r.doDeleteIPv4(e.GetPrefix())

if r.postChangeHook != nil {
Expand All @@ -682,6 +714,25 @@ func (r *RIBHolder) DeleteIPv4(e *aftpb.Afts_Ipv4EntryKey) error {
return nil
}

// ipv4EntryProto returns a protobuf message for the specified IPv4 prefix. It returns
// the found prefix as a Ipv4EntryKey protobuf, along with a bool indicating whether the
// prefix was found in the RIB.
func (r *RIBHolder) ipv4EntryProto(pfx string) (*aftpb.Afts_Ipv4EntryKey, bool, error) {
r.mu.RLock()
defer r.mu.RUnlock()
ribE := r.r.Afts.Ipv4Entry[pfx]
if ribE == nil {
return nil, false, nil
}

existingEntryProto, err := concreteIPv4Proto(ribE)
if err != nil {
return nil, true, status.Newf(codes.Internal, "invalid existing entry in RIB %v", ribE).Err()
}

return existingEntryProto, true, nil
}

// doDeleteIPv4 deletes pfx from the IPv4Entry RIB holding the shortest possible lock.
func (r *RIBHolder) doDeleteIPv4(pfx string) {
r.mu.Lock()
Expand Down Expand Up @@ -824,6 +875,48 @@ func concreteIPv4Proto(e *aft.Afts_Ipv4Entry) (*aftpb.Afts_Ipv4EntryKey, error)
}, nil
}

// concreteNextHopProto takes the input NextHop GoStruct and returns it as a gRIBI
// NextHopEntryKey protobuf. It returns an error if the protobuf cannot be marshalled.
func concreteNextHopProto(e *aft.Afts_NextHop) (*aftpb.Afts_NextHopKey, error) {
nhproto := &aftpb.Afts_NextHop{}
if err := protoFromGoStruct(e, &gpb.Path{
Elem: []*gpb.PathElem{{
Name: "afts",
}, {
Name: "next-hops",
}, {
Name: "next-hop",
}},
}, nhproto); err != nil {
return nil, fmt.Errorf("cannot marshal next-hop index %d, %v", e.GetIndex(), err)
}
return &aftpb.Afts_NextHopKey{
Index: *e.Index,
NextHop: nhproto,
}, nil
}

// concreteNextHopGroupProto takes the input NextHopGroup GoStruct and returns it as a gRIBI
// NextHopGroupEntryKey protobuf. It returns an error if the protobuf cannot be marshalled.
func concreteNextHopGroupProto(e *aft.Afts_NextHopGroup) (*aftpb.Afts_NextHopGroupKey, error) {
nhgproto := &aftpb.Afts_NextHopGroup{}
if err := protoFromGoStruct(e, &gpb.Path{
Elem: []*gpb.PathElem{{
Name: "afts",
}, {
Name: "next-hop-groups",
}, {
Name: "next-hop-group",
}},
}, nhgproto); err != nil {
return nil, fmt.Errorf("cannot marshal next-hop index %d, %v", e.GetId(), err)
}
return &aftpb.Afts_NextHopGroupKey{
Id: *e.Id,
NextHopGroup: nhgproto,
}, nil
}

// protoFromGoStruct takes the input GoStruct and marshals into the supplied pb
// protobuf message, trimming the prefix specified from the annotated paths within
// the protobuf.
Expand Down Expand Up @@ -851,3 +944,85 @@ func protoFromGoStruct(s ygot.GoStruct, prefix *gpb.Path, pb proto.Message) erro

return nil
}

func (r *RIBHolder) GetRIB(msgCh chan *spb.GetResponse, stopCh chan struct{}) error {
// TODO(robjs): since we are wanting to ensure that we tell the client
// exactly what is installed, this leads to a decision to make about locking
// of the RIB -- either we can go and lock the entire network instance RIB,
// or be more granular than that.
//
// * we take the NI-level lock: in the incoming master case, the client can
// ensure that they wait for the Get to complete before writing ==> there
// is no convergence impact. In the multi-master case (or even a consistency)
// check case, we impact convergence.
// * we take a more granular lock, in this case we do not impact convergence
// for any other entity than that individual entry.
//
// The latter is a better choice for a high-performance implementation, but
// its not clear that we need to worry about this for this implementation *yet*.
// In the future we should consider a fine-grained per-entry lock.
r.mu.RLock()
defer r.mu.RUnlock()

for pfx, e := range r.r.Afts.Ipv4Entry {
select {
case <-stopCh:
return nil
default:
p, err := concreteIPv4Proto(e)
if err != nil {
return status.Errorf(codes.Internal, "cannot marshal IPv4Entry for %s into GetResponse, %v", pfx, err)
}
msgCh <- &spb.GetResponse{
Entry: []*spb.AFTEntry{{
NetworkInstance: r.name,
Entry: &spb.AFTEntry_Ipv4{
Ipv4: p,
},
}},
}
}
}

for index, e := range r.r.Afts.NextHopGroup {
select {
case <-stopCh:
return nil
default:
p, err := concreteNextHopGroupProto(e)
if err != nil {
return status.Errorf(codes.Internal, "cannot marshal NextHopGroupEntry for index %d into GetResponse, %v", index, err)
}
msgCh <- &spb.GetResponse{
Entry: []*spb.AFTEntry{{
NetworkInstance: r.name,
Entry: &spb.AFTEntry_NextHopGroup{
NextHopGroup: p,
},
}},
}
}
}

for id, e := range r.r.Afts.NextHop {
select {
case <-stopCh:
return nil
default:
p, err := concreteNextHopProto(e)
if err != nil {
return status.Errorf(codes.Internal, "cannot marshal NextHopEntry for ID %d into GetResponse, %v", id, err)
}
msgCh <- &spb.GetResponse{
Entry: []*spb.AFTEntry{{
NetworkInstance: r.name,
Entry: &spb.AFTEntry_NextHop{
NextHop: p,
},
}},
}
}
}

return nil
}
Loading