Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 5 additions & 59 deletions xds/csds/csds.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ import (
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/timestamppb"

v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)
Expand Down Expand Up @@ -77,7 +74,7 @@ func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
return s, nil
}

// StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
// StreamClientStatus implements interface ClientStatusDiscoveryServiceServer.
func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.ClientStatusDiscoveryService_StreamClientStatusServer) error {
for {
req, err := stream.Recv()
Expand All @@ -97,13 +94,13 @@ func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.Cli
}
}

// FetchClientStatus implementations interface ClientStatusDiscoveryServiceServer.
// FetchClientStatus implements interface ClientStatusDiscoveryServiceServer.
func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
return s.buildClientStatusRespForReq(req)
}

// buildClientStatusRespForReq fetches the status from the client, and returns
// the response to be sent back to xdsclient.
// buildClientStatusRespForReq fetches the status of xDS resources from the
// xdsclient, and returns the response to be sent back to the csds client.
//
// If it returns an error, the error is a status error.
func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
Expand All @@ -119,16 +116,7 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp
return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
}

dump := s.xdsClient.DumpResources()
ret := &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
Node: s.xdsClient.BootstrapConfig().NodeProto,
GenericXdsConfigs: dumpToGenericXdsConfig(dump),
},
},
}
return ret, nil
return s.xdsClient.DumpResources()
}

// Close cleans up the resources.
Expand All @@ -137,45 +125,3 @@ func (s *ClientStatusDiscoveryServer) Close() {
s.xdsClientClose()
}
}

func dumpToGenericXdsConfig(dump map[string]map[string]xdsresource.UpdateWithMD) []*v3statuspb.ClientConfig_GenericXdsConfig {
var ret []*v3statuspb.ClientConfig_GenericXdsConfig
for typeURL, updates := range dump {
for name, update := range updates {
config := &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: update.MD.Version,
XdsConfig: update.Raw,
LastUpdated: timestamppb.New(update.MD.Timestamp),
ClientStatus: serviceStatusToProto(update.MD.Status),
}
if errState := update.MD.ErrState; errState != nil {
config.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
ret = append(ret, config)
}
}
return ret
}

func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsresource.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case xdsresource.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case xdsresource.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case xdsresource.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case xdsresource.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
}
}
48 changes: 40 additions & 8 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

type watchState int
Expand Down Expand Up @@ -586,26 +590,54 @@ func (a *authority) reportLoad() (*load.Store, func()) {
return a.transport.ReportLoad()
}

func (a *authority) dumpResources() map[string]map[string]xdsresource.UpdateWithMD {
func (a *authority) dumpResources() ([]*v3statuspb.ClientConfig_GenericXdsConfig, error) {
a.resourcesMu.Lock()
defer a.resourcesMu.Unlock()

dump := make(map[string]map[string]xdsresource.UpdateWithMD)
var ret []*v3statuspb.ClientConfig_GenericXdsConfig
for rType, resourceStates := range a.resources {
states := make(map[string]xdsresource.UpdateWithMD)
typeURL := rType.TypeURL()
for name, state := range resourceStates {
var raw *anypb.Any
if state.cache != nil {
raw = state.cache.Raw()
}
states[name] = xdsresource.UpdateWithMD{
MD: state.md,
Raw: raw,
config := &v3statuspb.ClientConfig_GenericXdsConfig{
TypeUrl: typeURL,
Name: name,
VersionInfo: state.md.Version,
XdsConfig: raw,
LastUpdated: timestamppb.New(state.md.Timestamp),
ClientStatus: serviceStatusToProto(state.md.Status),
}
if errState := state.md.ErrState; errState != nil {
config.ErrorState = &v3adminpb.UpdateFailureState{
LastUpdateAttempt: timestamppb.New(errState.Timestamp),
Details: errState.Err.Error(),
VersionInfo: errState.Version,
}
}
ret = append(ret, config)
}
dump[rType.TypeURL()] = states
}
return dump
return ret, nil
}

func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsresource.ServiceStatusUnknown:
return v3adminpb.ClientResourceStatus_UNKNOWN
case xdsresource.ServiceStatusRequested:
return v3adminpb.ClientResourceStatus_REQUESTED
case xdsresource.ServiceStatusNotExist:
return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
case xdsresource.ServiceStatusACKed:
return v3adminpb.ClientResourceStatus_ACKED
case xdsresource.ServiceStatusNACKed:
return v3adminpb.ClientResourceStatus_NACKED
default:
return v3adminpb.ClientResourceStatus_UNKNOWN
}
}

func combineErrors(rType string, topLevelErrors []error, perResourceErrors map[string]error) error {
Expand Down
4 changes: 3 additions & 1 deletion xds/internal/xdsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

// XDSClient is a full fledged gRPC client which queries a set of discovery APIs
Expand All @@ -48,7 +50,7 @@ type XDSClient interface {

// DumpResources returns the status of the xDS resources. Returns a map of
// resource type URLs to a map of resource names to resource state.
DumpResources() map[string]map[string]xdsresource.UpdateWithMD
DumpResources() (*v3statuspb.ClientStatusResponse, error)

ReportLoad(*bootstrap.ServerConfig) (*load.Store, func())

Expand Down
43 changes: 19 additions & 24 deletions xds/internal/xdsclient/clientimpl_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,30 @@
package xdsclient

import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
)

func appendMaps(dst, src map[string]map[string]xdsresource.UpdateWithMD) {
// Iterate through the resource types.
for rType, srcResources := range src {
// Lookup/create the resource type specific map in the destination.
dstResources := dst[rType]
if dstResources == nil {
dstResources = make(map[string]xdsresource.UpdateWithMD)
dst[rType] = dstResources
}

// Iterate through the resources within the resource type in the source,
// and copy them over to the destination.
for name, update := range srcResources {
dstResources[name] = update
}
}
}

// DumpResources returns the status and contents of all xDS resources.
func (c *clientImpl) DumpResources() map[string]map[string]xdsresource.UpdateWithMD {
func (c *clientImpl) DumpResources() (*v3statuspb.ClientStatusResponse, error) {
c.authorityMu.Lock()
defer c.authorityMu.Unlock()
dumps := make(map[string]map[string]xdsresource.UpdateWithMD)

var retCfg []*v3statuspb.ClientConfig_GenericXdsConfig
for _, a := range c.authorities {
dump := a.dumpResources()
appendMaps(dumps, dump)
cfg, err := a.dumpResources()
if err != nil {
return nil, err
}
retCfg = append(retCfg, cfg...)
}
return dumps

return &v3statuspb.ClientStatusResponse{
Config: []*v3statuspb.ClientConfig{
{
// TODO: Populate ClientScope. Need to update go-control-plane dependency.
Node: c.config.NodeProto,
GenericXdsConfigs: retCfg,
},
},
}, nil
}
Loading