Skip to content

Commit d7f45cd

Browse files
authored
xds/server: create the xDS client when the xDS enabled gRPC server is created (#6446)
1 parent f1fc2ca commit d7f45cd

File tree

8 files changed

+515
-647
lines changed

8 files changed

+515
-647
lines changed

examples/features/xds/server/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ func main() {
8585
}
8686
}
8787

88-
greeterServer := xds.NewGRPCServer(grpc.Creds(creds))
88+
greeterServer, err := xds.NewGRPCServer(grpc.Creds(creds))
89+
if err != nil {
90+
log.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
91+
}
8992
pb.RegisterGreeterServer(greeterServer, &server{serverName: determineHostname()})
9093

9194
healthPort := fmt.Sprintf(":%d", *port+1)

internal/testutils/xds/e2e/setup_management_server.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package e2e
2020

2121
import (
2222
"encoding/json"
23+
"fmt"
2324
"path"
2425
"testing"
2526

@@ -55,18 +56,39 @@ func SetupManagementServer(t *testing.T, opts ManagementServerOptions) (*Managem
5556
}
5657
}()
5758

58-
// Create a directory to hold certs and key files used on the server side.
59-
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
59+
nodeID := uuid.New().String()
60+
bootstrapContents, err := DefaultBootstrapContents(nodeID, server.Address)
6061
if err != nil {
6162
server.Stop()
6263
t.Fatal(err)
6364
}
65+
var rb resolver.Builder
66+
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
67+
rb, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
68+
if err != nil {
69+
server.Stop()
70+
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
71+
}
72+
}
73+
74+
return server, nodeID, bootstrapContents, rb, func() { server.Stop() }
75+
}
76+
77+
// DefaultBootstrapContents creates a default bootstrap configuration with the
78+
// given node ID and server URI. It also creates certificate provider
79+
// configuration and sets the listener resource name template to be used on the
80+
// server side.
81+
func DefaultBootstrapContents(nodeID, serverURI string) ([]byte, error) {
82+
// Create a directory to hold certs and key files used on the server side.
83+
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
84+
if err != nil {
85+
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
86+
}
6487

6588
// Create a directory to hold certs and key files used on the client side.
6689
clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
6790
if err != nil {
68-
server.Stop()
69-
t.Fatal(err)
91+
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
7092
}
7193

7294
// Create certificate providers section of the bootstrap config with entries
@@ -76,27 +98,15 @@ func SetupManagementServer(t *testing.T, opts ManagementServerOptions) (*Managem
7698
ClientSideCertProviderInstance: DefaultFileWatcherConfig(path.Join(clientDir, certFile), path.Join(clientDir, keyFile), path.Join(clientDir, rootFile)),
7799
}
78100

79-
// Create a bootstrap file in a temporary directory.
80-
nodeID := uuid.New().String()
81-
bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
101+
// Create the bootstrap configuration.
102+
bs, err := bootstrap.Contents(bootstrap.Options{
82103
NodeID: nodeID,
83-
ServerURI: server.Address,
104+
ServerURI: serverURI,
84105
CertificateProviders: cpc,
85106
ServerListenerResourceNameTemplate: ServerListenerResourceNameTemplate,
86107
})
87108
if err != nil {
88-
server.Stop()
89-
t.Fatalf("Failed to create bootstrap file: %v", err)
109+
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
90110
}
91-
92-
var rb resolver.Builder
93-
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
94-
rb, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
95-
if err != nil {
96-
server.Stop()
97-
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
98-
}
99-
}
100-
101-
return server, nodeID, bootstrapContents, rb, func() { server.Stop() }
111+
return bs, nil
102112
}

interop/xds/server/server.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,10 @@ func main() {
156156

157157
// Create an xDS enabled gRPC server, register the test service
158158
// implementation and start serving.
159-
testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
159+
testServer, err := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
160+
if err != nil {
161+
logger.Fatal("Failed to create an xDS enabled gRPC server: %v", err)
162+
}
160163
testgrpc.RegisterTestServiceServer(testServer, testService)
161164
go func() {
162165
if err := testServer.Serve(testLis); err != nil {

test/xds/xds_client_ignore_resource_deletion_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,10 @@ func setupGRPCServerWithModeChangeChannelAndServe(t *testing.T, bootstrapContent
306306
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
307307
updateCh <- args.Mode
308308
})
309-
server := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
309+
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
310+
if err != nil {
311+
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
312+
}
310313
t.Cleanup(server.Stop)
311314
testgrpc.RegisterTestServiceServer(server, &testService{})
312315

test/xds/xds_server_integration_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,10 @@ func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func
8282
})
8383

8484
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
85-
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
85+
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
86+
if err != nil {
87+
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
88+
}
8689
testgrpc.RegisterTestServiceServer(server, &testService{})
8790

8891
// Create a local listener and pass it to Serve().

test/xds/xds_server_serving_mode_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
6363
})
6464

6565
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
66-
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
66+
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
67+
if err != nil {
68+
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
69+
}
6770
defer server.Stop()
6871
testgrpc.RegisterTestServiceServer(server, &testService{})
6972

@@ -205,7 +208,10 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
205208
})
206209

207210
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
208-
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
211+
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
212+
if err != nil {
213+
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
214+
}
209215
defer server.Stop()
210216
testgrpc.RegisterTestServiceServer(server, &testService{})
211217

xds/server.go

Lines changed: 51 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"errors"
2424
"fmt"
2525
"net"
26-
"sync"
2726

2827
"google.golang.org/grpc"
2928
"google.golang.org/grpc/codes"
@@ -76,24 +75,19 @@ type grpcServer interface {
7675
// grpc.ServiceRegistrar interface and can be passed to service registration
7776
// functions in IDL generated code.
7877
type GRPCServer struct {
79-
gs grpcServer
80-
quit *grpcsync.Event
81-
logger *internalgrpclog.PrefixLogger
82-
xdsCredsInUse bool
83-
opts *serverOptions
84-
85-
// clientMu is used only in initXDSClient(), which is called at the
86-
// beginning of Serve(), where we have to decide if we have to create a
87-
// client or use an existing one.
88-
clientMu sync.Mutex
78+
gs grpcServer
79+
quit *grpcsync.Event
80+
logger *internalgrpclog.PrefixLogger
81+
xdsCredsInUse bool
82+
opts *serverOptions
8983
xdsC xdsclient.XDSClient
9084
xdsClientClose func()
9185
}
9286

9387
// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
9488
// The underlying gRPC server has no service registered and has not started to
9589
// accept requests yet.
96-
func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
90+
func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) {
9791
newOpts := []grpc.ServerOption{
9892
grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
9993
grpc.ChainStreamInterceptor(xdsStreamInterceptor),
@@ -103,8 +97,6 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
10397
gs: newGRPCServer(newOpts...),
10498
quit: grpcsync.NewEvent(),
10599
}
106-
s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
107-
s.logger.Infof("Created xds.GRPCServer")
108100
s.handleServerOptions(opts)
109101

110102
// We type assert our underlying gRPC server to the real grpc.Server here
@@ -119,8 +111,48 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
119111
}
120112
}
121113

114+
// Initializing the xDS client upfront (instead of at serving time)
115+
// simplifies the code by eliminating the need for a mutex to protect the
116+
// xdsC and xdsClientClose fields.
117+
newXDSClient := newXDSClient
118+
if s.opts.bootstrapContentsForTesting != nil {
119+
// Bootstrap file contents may be specified as a server option for tests.
120+
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
121+
return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
122+
}
123+
}
124+
xdsClient, xdsClientClose, err := newXDSClient()
125+
if err != nil {
126+
return nil, fmt.Errorf("xDS client creation failed: %v", err)
127+
}
128+
129+
// Validate the bootstrap configuration for server specific fields.
130+
131+
// Listener resource name template is mandatory on the server side.
132+
cfg := xdsClient.BootstrapConfig()
133+
if cfg.ServerListenerResourceNameTemplate == "" {
134+
xdsClientClose()
135+
return nil, errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
136+
}
137+
138+
// If xds credentials were specified by the user, but bootstrap configs do
139+
// not contain any certificate provider configuration, it is better to fail
140+
// right now rather than failing when attempting to create certificate
141+
// providers after receiving an LDS response with security configuration.
142+
if s.xdsCredsInUse {
143+
if len(cfg.CertProviderConfigs) == 0 {
144+
xdsClientClose()
145+
return nil, fmt.Errorf("xds credentials are passed to the user, but certificate_providers config is missing in the bootstrap configuration")
146+
}
147+
}
148+
s.xdsC = xdsClient
149+
s.xdsClientClose = xdsClientClose
150+
151+
s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
152+
s.logger.Infof("Created xds.GRPCServer")
122153
s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
123-
return s
154+
155+
return s, nil
124156
}
125157

126158
// handleServerOptions iterates through the list of server options passed in by
@@ -169,35 +201,6 @@ func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
169201
return s.gs.GetServiceInfo()
170202
}
171203

172-
// initXDSClient creates a new xdsClient if there is no existing one available.
173-
func (s *GRPCServer) initXDSClient() error {
174-
s.clientMu.Lock()
175-
defer s.clientMu.Unlock()
176-
if s.quit.HasFired() {
177-
return grpc.ErrServerStopped
178-
}
179-
180-
if s.xdsC != nil {
181-
return nil
182-
}
183-
184-
newXDSClient := newXDSClient
185-
if s.opts.bootstrapContentsForTesting != nil {
186-
// Bootstrap file contents may be specified as a server option for tests.
187-
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
188-
return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
189-
}
190-
}
191-
192-
client, close, err := newXDSClient()
193-
if err != nil {
194-
return fmt.Errorf("xds: failed to create xds-client: %v", err)
195-
}
196-
s.xdsC = client
197-
s.xdsClientClose = close
198-
return nil
199-
}
200-
201204
// Serve gets the underlying gRPC server to accept incoming connections on the
202205
// listener lis, which is expected to be listening on a TCP port.
203206
//
@@ -211,35 +214,16 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
211214
return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
212215
}
213216

214-
// If this is the first time Serve() is being called, we need to initialize
215-
// our xdsClient. If not, we can use the existing one.
216-
if err := s.initXDSClient(); err != nil {
217-
return err
218-
}
219-
cfg := s.xdsC.BootstrapConfig()
220-
if cfg == nil {
221-
return errors.New("bootstrap configuration is empty")
222-
}
223-
224-
// If xds credentials were specified by the user, but bootstrap configs do
225-
// not contain any certificate provider configuration, it is better to fail
226-
// right now rather than failing when attempting to create certificate
227-
// providers after receiving an LDS response with security configuration.
228-
if s.xdsCredsInUse {
229-
if len(cfg.CertProviderConfigs) == 0 {
230-
return errors.New("xds: certificate_providers config missing in bootstrap file")
231-
}
217+
if s.quit.HasFired() {
218+
return grpc.ErrServerStopped
232219
}
233220

234221
// The server listener resource name template from the bootstrap
235222
// configuration contains a template for the name of the Listener resource
236223
// to subscribe to for a gRPC server. If the token `%s` is present in the
237224
// string, it will be replaced with the server's listening "IP:port" (e.g.,
238-
// "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
239-
// as an error since we do not have any default value for this.
240-
if cfg.ServerListenerResourceNameTemplate == "" {
241-
return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
242-
}
225+
// "0.0.0.0:8080", "[::]:8080").
226+
cfg := s.xdsC.BootstrapConfig()
243227
name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
244228

245229
modeUpdateCh := buffer.NewUnbounded()
@@ -335,8 +319,6 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
335319
// corresponding pending RPCs on the client side will get notified by connection
336320
// errors.
337321
func (s *GRPCServer) Stop() {
338-
s.clientMu.Lock()
339-
defer s.clientMu.Unlock()
340322
s.quit.Fire()
341323
s.gs.Stop()
342324
if s.xdsC != nil {
@@ -348,8 +330,6 @@ func (s *GRPCServer) Stop() {
348330
// from accepting new connections and RPCs and blocks until all the pending RPCs
349331
// are finished.
350332
func (s *GRPCServer) GracefulStop() {
351-
s.clientMu.Lock()
352-
defer s.clientMu.Unlock()
353333
s.quit.Fire()
354334
s.gs.GracefulStop()
355335
if s.xdsC != nil {

0 commit comments

Comments
 (0)