Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/features/xds/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func main() {
}
}

greeterServer := xds.NewGRPCServer(grpc.Creds(creds))
greeterServer, err := xds.NewGRPCServer(grpc.Creds(creds))
if err != nil {
log.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
pb.RegisterGreeterServer(greeterServer, &server{serverName: determineHostname()})

healthPort := fmt.Sprintf(":%d", *port+1)
Expand Down
52 changes: 31 additions & 21 deletions internal/testutils/xds/e2e/setup_management_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package e2e

import (
"encoding/json"
"fmt"
"path"
"testing"

Expand Down Expand Up @@ -55,18 +56,39 @@ func SetupManagementServer(t *testing.T, opts ManagementServerOptions) (*Managem
}
}()

// Create a directory to hold certs and key files used on the server side.
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
nodeID := uuid.New().String()
bootstrapContents, err := DefaultBootstrapContents(nodeID, server.Address)
if err != nil {
server.Stop()
t.Fatal(err)
}
var rb resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
rb, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
server.Stop()
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}

return server, nodeID, bootstrapContents, rb, func() { server.Stop() }
}

// DefaultBootstrapContents creates a default bootstrap configuration with the
// given node ID and server URI. It also creates certificate provider
// configuration and sets the listener resource name template to be used on the
// server side.
func DefaultBootstrapContents(nodeID, serverURI string) ([]byte, error) {
// Create a directory to hold certs and key files used on the server side.
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
if err != nil {
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
}

// Create a directory to hold certs and key files used on the client side.
clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
if err != nil {
server.Stop()
t.Fatal(err)
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
}

// Create certificate providers section of the bootstrap config with entries
Expand All @@ -76,27 +98,15 @@ func SetupManagementServer(t *testing.T, opts ManagementServerOptions) (*Managem
ClientSideCertProviderInstance: DefaultFileWatcherConfig(path.Join(clientDir, certFile), path.Join(clientDir, keyFile), path.Join(clientDir, rootFile)),
}

// Create a bootstrap file in a temporary directory.
nodeID := uuid.New().String()
bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
// Create the bootstrap configuration.
bs, err := bootstrap.Contents(bootstrap.Options{
NodeID: nodeID,
ServerURI: server.Address,
ServerURI: serverURI,
CertificateProviders: cpc,
ServerListenerResourceNameTemplate: ServerListenerResourceNameTemplate,
})
if err != nil {
server.Stop()
t.Fatalf("Failed to create bootstrap file: %v", err)
return nil, fmt.Errorf("failed to create bootstrap configuration: %v", err)
}

var rb resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
rb, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
server.Stop()
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}

return server, nodeID, bootstrapContents, rb, func() { server.Stop() }
return bs, nil
}
5 changes: 4 additions & 1 deletion interop/xds/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ func main() {

// Create an xDS enabled gRPC server, register the test service
// implementation and start serving.
testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
testServer, err := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
if err != nil {
logger.Fatal("Failed to create an xDS enabled gRPC server: %v", err)
}
testgrpc.RegisterTestServiceServer(testServer, testService)
go func() {
if err := testServer.Serve(testLis); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion test/xds/xds_client_ignore_resource_deletion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ func setupGRPCServerWithModeChangeChannelAndServe(t *testing.T, bootstrapContent
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
updateCh <- args.Mode
})
server := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
t.Cleanup(server.Stop)
testgrpc.RegisterTestServiceServer(server, &testService{})

Expand Down
5 changes: 4 additions & 1 deletion test/xds/xds_server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
testgrpc.RegisterTestServiceServer(server, &testService{})

// Create a local listener and pass it to Serve().
Expand Down
10 changes: 8 additions & 2 deletions test/xds/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})

Expand Down Expand Up @@ -205,7 +208,10 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
if err != nil {
t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
}
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})

Expand Down
122 changes: 51 additions & 71 deletions xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -76,24 +75,19 @@ type grpcServer interface {
// grpc.ServiceRegistrar interface and can be passed to service registration
// functions in IDL generated code.
type GRPCServer struct {
gs grpcServer
quit *grpcsync.Event
logger *internalgrpclog.PrefixLogger
xdsCredsInUse bool
opts *serverOptions

// clientMu is used only in initXDSClient(), which is called at the
// beginning of Serve(), where we have to decide if we have to create a
// client or use an existing one.
clientMu sync.Mutex
gs grpcServer
quit *grpcsync.Event
logger *internalgrpclog.PrefixLogger
xdsCredsInUse bool
opts *serverOptions
xdsC xdsclient.XDSClient
xdsClientClose func()
}

// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
// The underlying gRPC server has no service registered and has not started to
// accept requests yet.
func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) {
newOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
grpc.ChainStreamInterceptor(xdsStreamInterceptor),
Expand All @@ -103,8 +97,6 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
gs: newGRPCServer(newOpts...),
quit: grpcsync.NewEvent(),
}
s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
s.logger.Infof("Created xds.GRPCServer")
s.handleServerOptions(opts)

// We type assert our underlying gRPC server to the real grpc.Server here
Expand All @@ -119,8 +111,48 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
}
}

// Initializing the xDS client upfront (instead of at serving time)
// simplifies the code by eliminating the need for a mutex to protect the
// xdsC and xdsClientClose fields.
newXDSClient := newXDSClient
if s.opts.bootstrapContentsForTesting != nil {
// Bootstrap file contents may be specified as a server option for tests.
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
}
}
xdsClient, xdsClientClose, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("xDS client creation failed: %v", err)
}

// Validate the bootstrap configuration for server specific fields.

// Listener resource name template is mandatory on the server side.
cfg := xdsClient.BootstrapConfig()
if cfg.ServerListenerResourceNameTemplate == "" {
xdsClientClose()
return nil, errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
}

// If xds credentials were specified by the user, but bootstrap configs do
// not contain any certificate provider configuration, it is better to fail
// right now rather than failing when attempting to create certificate
// providers after receiving an LDS response with security configuration.
if s.xdsCredsInUse {
if len(cfg.CertProviderConfigs) == 0 {
xdsClientClose()
return nil, fmt.Errorf("xds credentials are passed to the user, but certificate_providers config is missing in the bootstrap configuration")
}
}
s.xdsC = xdsClient
s.xdsClientClose = xdsClientClose

s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
s.logger.Infof("Created xds.GRPCServer")
s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
return s

return s, nil
}

// handleServerOptions iterates through the list of server options passed in by
Expand Down Expand Up @@ -169,35 +201,6 @@ func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
return s.gs.GetServiceInfo()
}

// initXDSClient creates a new xdsClient if there is no existing one available.
func (s *GRPCServer) initXDSClient() error {
s.clientMu.Lock()
defer s.clientMu.Unlock()
if s.quit.HasFired() {
return grpc.ErrServerStopped
}

if s.xdsC != nil {
return nil
}

newXDSClient := newXDSClient
if s.opts.bootstrapContentsForTesting != nil {
// Bootstrap file contents may be specified as a server option for tests.
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
}
}

client, close, err := newXDSClient()
if err != nil {
return fmt.Errorf("xds: failed to create xds-client: %v", err)
}
s.xdsC = client
s.xdsClientClose = close
return nil
}

// Serve gets the underlying gRPC server to accept incoming connections on the
// listener lis, which is expected to be listening on a TCP port.
//
Expand All @@ -211,35 +214,16 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
}

// If this is the first time Serve() is being called, we need to initialize
// our xdsClient. If not, we can use the existing one.
if err := s.initXDSClient(); err != nil {
return err
}
cfg := s.xdsC.BootstrapConfig()
if cfg == nil {
return errors.New("bootstrap configuration is empty")
}

// If xds credentials were specified by the user, but bootstrap configs do
// not contain any certificate provider configuration, it is better to fail
// right now rather than failing when attempting to create certificate
// providers after receiving an LDS response with security configuration.
if s.xdsCredsInUse {
if len(cfg.CertProviderConfigs) == 0 {
return errors.New("xds: certificate_providers config missing in bootstrap file")
}
if s.quit.HasFired() {
return grpc.ErrServerStopped
}

// The server listener resource name template from the bootstrap
// configuration contains a template for the name of the Listener resource
// to subscribe to for a gRPC server. If the token `%s` is present in the
// string, it will be replaced with the server's listening "IP:port" (e.g.,
// "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
// as an error since we do not have any default value for this.
if cfg.ServerListenerResourceNameTemplate == "" {
return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
}
// "0.0.0.0:8080", "[::]:8080").
cfg := s.xdsC.BootstrapConfig()
name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())

modeUpdateCh := buffer.NewUnbounded()
Expand Down Expand Up @@ -335,8 +319,6 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
// corresponding pending RPCs on the client side will get notified by connection
// errors.
func (s *GRPCServer) Stop() {
s.clientMu.Lock()
defer s.clientMu.Unlock()
s.quit.Fire()
s.gs.Stop()
if s.xdsC != nil {
Expand All @@ -348,8 +330,6 @@ func (s *GRPCServer) Stop() {
// from accepting new connections and RPCs and blocks until all the pending RPCs
// are finished.
func (s *GRPCServer) GracefulStop() {
s.clientMu.Lock()
defer s.clientMu.Unlock()
s.quit.Fire()
s.gs.GracefulStop()
if s.xdsC != nil {
Expand Down
Loading