Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions internal/xds/clients/lrsclient/lrsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ type LRSClient struct {

// New returns a new LRS Client configured with the provided config.
func New(config Config) (*LRSClient, error) {
switch {
case config.Node.ID == "":
return nil, errors.New("lrsclient: node ID in node is empty")
case config.TransportBuilder == nil:
if config.TransportBuilder == nil {
return nil, errors.New("lrsclient: transport builder is nil")
}

Expand Down
16 changes: 15 additions & 1 deletion internal/xds/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,21 @@ func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecor
if err != nil {
return nil, err
}
c := &clientImpl{XDSClient: client, xdsClientConfig: gConfig, bootstrapConfig: config, target: target, refCount: 1}
lrsC, err := lrsclient.New(lrsclient.Config{
Node: gConfig.Node,
TransportBuilder: gConfig.TransportBuilder,
})
if err != nil {
return nil, err
}
c := &clientImpl{
XDSClient: client,
xdsClientConfig: gConfig,
bootstrapConfig: config,
target: target,
refCount: 1,
lrsClient: lrsC,
}
c.logger = prefixLogger(c)
return c, nil
}
Expand Down
12 changes: 0 additions & 12 deletions internal/xds/xdsclient/clientimpl_loadreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ import (
//
// It returns a lrsclient.LoadStore for the user to report loads.
func (c *clientImpl) ReportLoad(server *bootstrap.ServerConfig) (*lrsclient.LoadStore, func(context.Context)) {
if c.lrsClient == nil {
lrsC, err := lrsclient.New(lrsclient.Config{
Node: c.xdsClientConfig.Node,
TransportBuilder: c.xdsClientConfig.TransportBuilder,
})
if err != nil {
c.logger.Warningf("Failed to create an lrs client to the management server to report load: %v", server, err)
return nil, func(context.Context) {}
}
c.lrsClient = lrsC
}

load, err := c.lrsClient.ReportLoad(clients.ServerIdentifier{
ServerURI: server.ServerURI(),
Extensions: grpctransport.ServerIdentifierExtension{
Expand Down
118 changes: 118 additions & 0 deletions internal/xds/xdsclient/tests/loadreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@ import (
"encoding/json"
"fmt"
"net"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/testutils/xds/fakeserver"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/testing/protocmp"

Expand Down Expand Up @@ -437,3 +445,113 @@ func (s) TestReportLoad_StreamCreation(t *testing.T) {
defer sCancel3()
cancel3(sCtx3)
}

// TestConcurrentReportLoad verifies that the client can safely handle concurrent
// requests to initiate load reporting streams. It launches multiple goroutines
// that all call client.ReportLoad simultaneously.
func (s) TestConcurrentReportLoad(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{SupportLoadReportingService: true})
nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
client := createXDSClient(t, bc)

serverConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{URI: mgmtServer.Address})
if err != nil {
t.Fatalf("Failed to create server config for testing: %v", err)
}

// Call ReportLoad() concurrently from multiple go routines.
var wg sync.WaitGroup
const numGoroutines = 10
wg.Add(numGoroutines)
for range numGoroutines {
go func() {
defer wg.Done()
_, cancelStore := client.ReportLoad(serverConfig)
defer cancelStore(ctx)
}()
}
wg.Wait()
}

// TestConcurrentChannels verifies that we can create multiple gRPC channels
// concurrently with a shared XDSClient, each of which will create a new LRS
// stream without any race.
func (s) TestConcurrentChannels(t *testing.T) {
// TODO(emchandwani) : Remove after https://github.com/grpc/grpc-go/pull/8526 gets merged.
t.Skip()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true, SupportLoadReportingService: true})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

if internal.NewXDSResolverWithPoolForTesting == nil {
t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil")
}

config, err := bootstrap.NewConfigFromContents(bc)
if err != nil {
t.Fatalf("Failed to parse bootstrap contents: %s, %v", string(bc), err)
}
pool := xdsclient.NewPool(config)

resolverBuilder := internal.NewXDSResolverWithPoolForTesting.(func(*xdsclient.Pool) (resolver.Builder, error))
xdsResolver, err := resolverBuilder(pool)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}

server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure the management server with resources that enable LRS.
const serviceName = "my-service-e2e-lrs-test"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: testutils.ParsePort(t, server.Address),
SecLevel: e2e.SecurityLevelNone,
})
resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
const (
numGoroutines = 10
numRPCs = 10
)
for range numGoroutines {
wg.Add(1)
go func() {
defer wg.Done()
for range numRPCs {
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver))
if err != nil {
t.Errorf("grpc.NewClient() failed: %v", err)
return
}
defer cc.Close()

testClient := testgrpc.NewTestServiceClient(cc)
if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("EmptyCall() failed: %v", err)
return
}
}
}()
}
wg.Wait()
}