@@ -25,20 +25,29 @@ import (
2525 "testing"
2626 "time"
2727
28+ "github.com/google/uuid"
2829 "google.golang.org/grpc"
2930 "google.golang.org/grpc/codes"
31+ "google.golang.org/grpc/connectivity"
3032 "google.golang.org/grpc/credentials/insecure"
3133 "google.golang.org/grpc/internal"
3234 "google.golang.org/grpc/internal/grpctest"
3335 "google.golang.org/grpc/internal/stubserver"
3436 "google.golang.org/grpc/internal/testutils"
3537 "google.golang.org/grpc/internal/testutils/xds/e2e"
38+ "google.golang.org/grpc/internal/testutils/xds/fakeserver"
39+ "google.golang.org/grpc/peer"
3640 "google.golang.org/grpc/resolver"
3741 "google.golang.org/grpc/status"
42+ "google.golang.org/protobuf/types/known/durationpb"
3843
44+ v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
3945 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
4046 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
41- "github.com/google/uuid"
47+ v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
48+ v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
49+ v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3"
50+ v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
4251 testgrpc "google.golang.org/grpc/interop/grpc_testing"
4352 testpb "google.golang.org/grpc/interop/grpc_testing"
4453
@@ -170,3 +179,183 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
170179 t .Fatal ("New LRS stream created when expected not to" )
171180 }
172181}
182+
183+ // Tests whether load is reported correctly when using pickfirst with endpoints
184+ // in multiple localities.
185+ func (s ) TestLoadReportingPickFirstMultiLocality (t * testing.T ) {
186+ // Create an xDS management server that serves ADS and LRS requests.
187+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {SupportLoadReportingService : true })
188+
189+ // Create bootstrap configuration pointing to the above management server.
190+ nodeID := uuid .New ().String ()
191+ bc := e2e .DefaultBootstrapContents (t , nodeID , mgmtServer .Address )
192+
193+ // Create an xDS resolver with the above bootstrap configuration.
194+ var resolverBuilder resolver.Builder
195+ var err error
196+ if newResolver := internal .NewXDSResolverWithConfigForTesting ; newResolver != nil {
197+ resolverBuilder , err = newResolver .(func ([]byte ) (resolver.Builder , error ))(bc )
198+ if err != nil {
199+ t .Fatalf ("Failed to create xDS resolver for testing: %v" , err )
200+ }
201+ }
202+
203+ // Start two server backends exposing the test service.
204+ server1 := stubserver .StartTestService (t , nil )
205+ defer server1 .Stop ()
206+
207+ server2 := stubserver .StartTestService (t , nil )
208+ defer server2 .Stop ()
209+
210+ // Configure the xDS management server.
211+ const serviceName = "my-test-xds-service"
212+ routeConfigName := "route-" + serviceName
213+ clusterName := "cluster-" + serviceName
214+ endpointsName := "endpoints-" + serviceName
215+ resources := e2e.UpdateOptions {
216+ NodeID : nodeID ,
217+ Listeners : []* v3listenerpb.Listener {e2e .DefaultClientListener (serviceName , routeConfigName )},
218+ Routes : []* v3routepb.RouteConfiguration {e2e .DefaultRouteConfig (routeConfigName , serviceName , clusterName )},
219+ Clusters : []* v3clusterpb.Cluster {
220+ {
221+ Name : clusterName ,
222+ ClusterDiscoveryType : & v3clusterpb.Cluster_Type {Type : v3clusterpb .Cluster_EDS },
223+ EdsClusterConfig : & v3clusterpb.Cluster_EdsClusterConfig {
224+ EdsConfig : & v3corepb.ConfigSource {
225+ ConfigSourceSpecifier : & v3corepb.ConfigSource_Ads {
226+ Ads : & v3corepb.AggregatedConfigSource {},
227+ },
228+ },
229+ ServiceName : endpointsName ,
230+ },
231+ // Specify a custom load balancing policy to use pickfirst.
232+ LoadBalancingPolicy : & v3clusterpb.LoadBalancingPolicy {
233+ Policies : []* v3clusterpb.LoadBalancingPolicy_Policy {
234+ {
235+ TypedExtensionConfig : & v3corepb.TypedExtensionConfig {
236+ TypedConfig : testutils .MarshalAny (t , & v3pickfirstpb.PickFirst {}),
237+ },
238+ },
239+ },
240+ },
241+ // Include a fake LRS server config pointing to self.
242+ LrsServer : & v3corepb.ConfigSource {
243+ ConfigSourceSpecifier : & v3corepb.ConfigSource_Self {
244+ Self : & v3corepb.SelfConfigSource {},
245+ },
246+ },
247+ },
248+ },
249+ Endpoints : []* v3endpointpb.ClusterLoadAssignment {e2e .EndpointResourceWithOptions (e2e.EndpointOptions {
250+ ClusterName : endpointsName ,
251+ Host : "localhost" ,
252+ Localities : []e2e.LocalityOptions {
253+ {
254+ Backends : []e2e.BackendOptions {
255+ {Port : testutils .ParsePort (t , server1 .Address )},
256+ },
257+ Weight : 1 ,
258+ },
259+ {
260+ Backends : []e2e.BackendOptions {
261+ {Port : testutils .ParsePort (t , server2 .Address )},
262+ },
263+ Weight : 2 ,
264+ },
265+ },
266+ })},
267+ }
268+
269+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
270+ defer cancel ()
271+ if err := mgmtServer .Update (ctx , resources ); err != nil {
272+ t .Fatal (err )
273+ }
274+
275+ // Create a ClientConn and make a successful RPC.
276+ cc , err := grpc .NewClient (fmt .Sprintf ("xds:///%s" , serviceName ),
277+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
278+ grpc .WithResolvers (resolverBuilder ))
279+ if err != nil {
280+ t .Fatalf ("Failed to dial local test server: %v" , err )
281+ }
282+ defer cc .Close ()
283+
284+ client := testgrpc .NewTestServiceClient (cc )
285+ var peer peer.Peer
286+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}, grpc .Peer (& peer )); err != nil {
287+ t .Fatalf ("rpc EmptyCall() failed: %v" , err )
288+ }
289+
290+ // Verify that the request was sent to server 1.
291+ if got , want := peer .Addr .String (), server1 .Address ; got != want {
292+ t .Errorf ("peer.Addr = %q, want = %q" , got , want )
293+ }
294+
295+ // Ensure that an LRS stream is created.
296+ if _ , err = mgmtServer .LRSServer .LRSStreamOpenChan .Receive (ctx ); err != nil {
297+ t .Fatalf ("Failure when waiting for an LRS stream to be opened: %v" , err )
298+ }
299+
300+ // Handle the initial LRS request from the xDS client.
301+ if _ , err = mgmtServer .LRSServer .LRSRequestChan .Receive (ctx ); err != nil {
302+ t .Fatalf ("Failure waiting for initial LRS request: %v" , err )
303+ }
304+
305+ resp := fakeserver.Response {
306+ Resp : & v3lrspb.LoadStatsResponse {
307+ SendAllClusters : true ,
308+ LoadReportingInterval : durationpb .New (10 * time .Millisecond ),
309+ },
310+ }
311+ mgmtServer .LRSServer .LRSResponseChan <- & resp
312+
313+ // Wait for load to be reported for locality of server 2.
314+ // We (incorrectly) wait for load report for region-2 because presently
315+ // pickfirst always reports load for the locality of the last address in the
316+ // subconn. This will be fixed by ensuring there is only one address per
317+ // subconn.
318+ // TODO(#7339): Change region to region-1 once fixed.
319+ if err := waitForSuccessfulLoadReport (ctx , mgmtServer .LRSServer , "region-2" ); err != nil {
320+ t .Fatalf ("region-2 did not receive load due to error: %v" , err )
321+ }
322+
323+ // Stop server 1 and send one more rpc. Now the request should go to server 2.
324+ server1 .Stop ()
325+
326+ // Wait for the balancer to pick up the server state change.
327+ testutils .AwaitState (ctx , t , cc , connectivity .Idle )
328+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}, grpc .Peer (& peer )); err != nil {
329+ t .Fatalf ("rpc EmptyCall() failed: %v" , err )
330+ }
331+
332+ // Verify that the request was sent to server 2.
333+ if got , want := peer .Addr .String (), server2 .Address ; got != want {
334+ t .Errorf ("peer.Addr = %q, want = %q" , got , want )
335+ }
336+
337+ // Wait for load to be reported for locality of server 2.
338+ if err := waitForSuccessfulLoadReport (ctx , mgmtServer .LRSServer , "region-2" ); err != nil {
339+ t .Fatalf ("Server 2 did not receive load due to error: %v" , err )
340+ }
341+ }
342+
343+ // waitForSuccessfulLoadReport waits for a successful request to be reported for
344+ // the specified locality region.
345+ func waitForSuccessfulLoadReport (ctx context.Context , lrsServer * fakeserver.Server , region string ) error {
346+ for {
347+ select {
348+ case <- ctx .Done ():
349+ return ctx .Err ()
350+ case req := <- lrsServer .LRSRequestChan .C :
351+ loadStats := req .(* fakeserver.Request ).Req .(* v3lrspb.LoadStatsRequest )
352+ for _ , load := range loadStats .ClusterStats {
353+ for _ , locality := range load .UpstreamLocalityStats {
354+ if locality .TotalSuccessfulRequests > 0 && locality .Locality .Region == region {
355+ return nil
356+ }
357+ }
358+ }
359+ }
360+ }
361+ }
0 commit comments