From f8aed9ec509d738a84cd5c3e4c42142273522f69 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 26 Jul 2024 18:32:42 -0400 Subject: [PATCH 1/5] Add recording point for endpoint weight not yet usable and add metrics tests --- balancer/weightedroundrobin/balancer.go | 24 +- balancer/weightedroundrobin/balancer_test.go | 67 ++++- balancer/weightedroundrobin/scheduler.go | 12 +- .../testutils/stats/test_metrics_recorder.go | 81 ++++- internal/testutils/xds/e2e/setup/setup.go | 62 ++++ stats/opentelemetry/e2e_test.go | 276 +++++++++++++++++- stats/opentelemetry/go.mod | 4 +- stats/opentelemetry/go.sum | 4 + test/xds/xds_client_affinity_test.go | 3 +- .../xds_client_certificate_providers_test.go | 3 +- test/xds/xds_client_custom_lb_test.go | 3 +- test/xds/xds_client_federation_test.go | 5 +- test/xds/xds_client_integration_test.go | 3 +- test/xds/xds_client_outlier_detection_test.go | 5 +- test/xds/xds_client_retry_test.go | 3 +- .../xds_rls_clusterspecifier_plugin_test.go | 3 +- test/xds/xds_security_config_nack_test.go | 3 +- .../xds_server_certificate_providers_test.go | 3 +- test/xds/xds_server_integration_test.go | 5 +- test/xds/xds_server_rbac_test.go | 7 +- test/xds/xds_server_serving_mode_test.go | 5 +- test/xds/xds_server_test.go | 7 +- test/xds/xds_telemetry_labels_test.go | 3 +- 23 files changed, 543 insertions(+), 48 deletions(-) create mode 100644 internal/testutils/xds/e2e/setup/setup.go diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 6e50ab56d86d..38787a3bcd85 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -415,7 +415,7 @@ func (p *picker) inc() uint32 { } func (p *picker) regenerateScheduler() { - s := p.newScheduler() + s := p.newScheduler(true) atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s)) } @@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect w.SubConn.Connect() case connectivity.Ready: // If we transition back to READY state, reset nonEmptySince so that we - // apply the blackout period after we start receiving load data. Note - // that we cannot guarantee that we will never receive lingering - // callbacks for backend metric reports from the previous connection - // after the new connection has been established, but they should be - // masked by new backend metric reports from the new connection by the - // time the blackout period ends. + // apply the blackout period after we start receiving load data. Also + // reset lastUpdated to trigger endpoint weight not yet usable in the + // case endpoint gets asked what weight it is before receiving a new + // load report. Note that we cannot guarantee that we will never receive + // lingering callbacks for backend metric reports from the previous + // connection after the new connection has been established, but they + // should be masked by new backend metric reports from the new + // connection by the time the blackout period ends. w.mu.Lock() w.nonEmptySince = time.Time{} + w.lastUpdated = time.Time{} w.mu.Unlock() case connectivity.Shutdown: if w.stopORCAListener != nil { @@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout }() } + // The SubConn has not received a load report (i.e. just turned READY with + // no load report). + if w.lastUpdated == (time.Time{}) { + endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality) + return 0 + } + // If the most recent update was longer ago than the expiration period, // reset nonEmptySince so that we apply the blackout period again if we // start getting data again in the future, and return 0. diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index 6567cdfb3d93..9579cdc99862 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/orca" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -81,6 +82,14 @@ var ( WeightUpdatePeriod: stringp(".050s"), ErrorUtilizationPenalty: float64p(0), } + testMetricsConfig = iwrr.LBConfig{ + EnableOOBLoadReport: boolp(false), + OOBReportingPeriod: stringp("0.005s"), + BlackoutPeriod: stringp("0s"), + WeightExpirationPeriod: stringp("60s"), + WeightUpdatePeriod: stringp(".050s"), + ErrorUtilizationPenalty: float64p(0), + } ) type testServer struct { @@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) { } } +// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It +// configures a weighted round robin balancer as the top level balancer of a +// ClientConn, and configures a fake stats handler on the ClientConn to receive +// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on +// balancer startup case which triggers the first picker and scheduler update +// before any load reports are received. +// +// Note that this test and others, metrics emission asssertions are a snapshot +// of the most recently emitted metrics. This is due to the nondeterminism of +// scheduler updates with respect to test bodies, so the assertions made are +// from the most recently synced state of the system (picker/scheduler) from the +// test body. +func (s) TestWRRMetricsBasic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + srv := startServer(t, reportCall) + sc := svcConfig(t, testMetricsConfig) + + mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + srv.callMetrics.SetQPS(float64(1)) + + if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("Error from EmptyCall: %v", err) + } + + mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn. + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted). + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + // Unusable, so no endpoint weight. Due to only one SubConn, this will never + // update the weight. Thus, this will stay 0. + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) +} + // Tests two addresses with ORCA reporting disabled (should fall back to pure // RR). func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) { @@ -509,7 +555,8 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { cfg := oobConfig cfg.BlackoutPeriod = tc.blackoutPeriodCfg sc := svcConfig(t, cfg) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { t.Fatalf("Error starting client: %v", err) } addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} @@ -536,12 +583,20 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { // Wait for the weight update period to allow the new weights to be processed. time.Sleep(weightUpdatePeriod) checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) + + mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + // Either 10 or 100, dependent on whatever ordering SubConns are + // processed, which is nondeterministic. + mr.AssertEitherDataForMetric("grpc.lb.wrr.endpoint_weights", 10, 100) } } // Tests that the weight expiration period causes backends to use 0 as their // weight (meaning to use the average weight) once the expiration period -// elapses. +// elapses. After the weight expires, the expected metrics to be emitted from +// WRR are also configured. func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -577,7 +632,8 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { cfg := oobConfig cfg.OOBReportingPeriod = stringp("60s") sc := svcConfig(t, cfg) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { + mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { t.Fatalf("Error starting client: %v", err) } addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} @@ -605,6 +661,11 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { // Wait for the weight expiration period so the weights have expired. time.Sleep(weightUpdatePeriod) checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1}) + + mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) } // Tests logic surrounding subchannel management. diff --git a/balancer/weightedroundrobin/scheduler.go b/balancer/weightedroundrobin/scheduler.go index a33c5591ddd5..56aa15da10d2 100644 --- a/balancer/weightedroundrobin/scheduler.go +++ b/balancer/weightedroundrobin/scheduler.go @@ -31,14 +31,16 @@ type scheduler interface { // len(scWeights)-1 are zero or there is only a single subconn, otherwise it // will return an Earliest Deadline First (EDF) scheduler implementation that // selects the subchannels according to their weights. -func (p *picker) newScheduler() scheduler { - scWeights := p.scWeights(true) +func (p *picker) newScheduler(recordMetrics bool) scheduler { + scWeights := p.scWeights(recordMetrics) n := len(scWeights) if n == 0 { return nil } if n == 1 { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + if recordMetrics { + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + } return &rrScheduler{numSCs: 1, inc: p.inc} } sum := float64(0) @@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler { } if numZero >= n-1 { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + if recordMetrics { + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + } return &rrScheduler{numSCs: uint32(n), inc: p.inc} } unscaledMean := sum / float64(n-numZero) diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index 25817be50b37..e0634ae5c41f 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -21,7 +21,9 @@ package stats import ( "context" + "sync" "testing" + "time" "github.com/google/go-cmp/cmp" estats "google.golang.org/grpc/experimental/stats" @@ -41,18 +43,65 @@ type TestMetricsRecorder struct { intHistoCh *testutils.Channel floatHistoCh *testutils.Channel intGaugeCh *testutils.Channel + + // The most recent update for each metric name. + mu sync.Mutex + data map[estats.Metric]float64 } func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder { - return &TestMetricsRecorder{ + tmr := &TestMetricsRecorder{ t: t, - intCountCh: testutils.NewChannelWithSize(10), - floatCountCh: testutils.NewChannelWithSize(10), - intHistoCh: testutils.NewChannelWithSize(10), - floatHistoCh: testutils.NewChannelWithSize(10), - intGaugeCh: testutils.NewChannelWithSize(10), + intCountCh: testutils.NewChannelWithSize(1000), + floatCountCh: testutils.NewChannelWithSize(1000), + intHistoCh: testutils.NewChannelWithSize(1000), + floatHistoCh: testutils.NewChannelWithSize(1000), + intGaugeCh: testutils.NewChannelWithSize(1000), + + data: make(map[estats.Metric]float64), + } + + for _, metric := range metrics { + tmr.data[estats.Metric(metric)] = 0 + } + + return tmr +} + +// AssertDataForMetric asserts data is present for metric. The zero value in the +// check is equivalent to unset. +func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal float64) { + r.mu.Lock() + defer r.mu.Unlock() + if r.data[estats.Metric(metricName)] != wantVal { + r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", metricName, r.data[estats.Metric(metricName)], wantVal) + } +} + +// AssertEitherDataForMetric asserts either data point is present for metric. +// The zero value in the check is equivalent to unset. + +func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) { + r.mu.Lock() + defer r.mu.Unlock() + if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 { + r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2) + } +} + +// PollForDataForMetric polls the metric data for the want. Fails if context +// provided expires before data for metric is found. +func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + r.mu.Lock() + if r.data[estats.Metric(metricName)] == wantVal { + r.mu.Unlock() + break + } + r.mu.Unlock() } + r.t.Fatalf("Timeout waiting for data %v for metric %v", wantVal, metricName) } type MetricsData struct { @@ -85,6 +134,10 @@ func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = float64(incr) } func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDataWant MetricsData) { @@ -105,6 +158,10 @@ func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHand LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = incr } func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsDataWant MetricsData) { @@ -125,6 +182,10 @@ func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = float64(incr) } func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDataWant MetricsData) { @@ -145,6 +206,10 @@ func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHand LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = incr } func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsDataWant MetricsData) { @@ -165,6 +230,10 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, }) + + r.mu.Lock() + defer r.mu.Unlock() + r.data[handle.Name] = float64(incr) } // To implement a stats.Handler, which allows it to be set as a dial option: diff --git a/internal/testutils/xds/e2e/setup/setup.go b/internal/testutils/xds/e2e/setup/setup.go new file mode 100644 index 000000000000..f7b34f669629 --- /dev/null +++ b/internal/testutils/xds/e2e/setup/setup.go @@ -0,0 +1,62 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package setup implements setup helpers for xDS e2e tests. +package setup + +import ( + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + _ "google.golang.org/grpc/xds" // Register the xds_resolver. +) + +// ManagementServerAndResolver sets up an xDS management server, creates +// bootstrap configuration pointing to that server and creates an xDS resolver +// using that configuration. +// +// Registers a cleanup function on t to stop the management server. +// +// Returns the following: +// - the xDS management server +// - the node ID to use when talking to this management server +// - bootstrap configuration to use (if creating an xDS-enabled gRPC server) +// - xDS resolver builder (if creating an xDS-enabled gRPC client) +func ManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) { + // Start an xDS management server. + xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address) + + // Create an xDS resolver with the above bootstrap configuration. + var r resolver.Builder + var err error + if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { + r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + } + + return xdsServer, nodeID, bc, r +} diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 0e8558aae72e..b598e0816097 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -18,16 +18,34 @@ package opentelemetry_test import ( "context" + "fmt" + "io" "testing" "time" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3clientsideweightedroundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/client_side_weighted_round_robin/v3" + v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/wrapperspb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" + itestutils "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + setup "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/orca" "google.golang.org/grpc/stats/opentelemetry" "google.golang.org/grpc/stats/opentelemetry/internal/testutils" @@ -47,10 +65,10 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -// setup creates a stub server with OpenTelemetry component configured on client +// setupStubServer creates a stub server with OpenTelemetry component configured on client // and server side. It returns a reader for metrics emitted from OpenTelemetry // component and the server. -func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { +func setupStubServer(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() provider := metric.NewMeterProvider(metric.WithReader(reader)) ss := &stubserver.StubServer{ @@ -93,7 +111,7 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // Will allow duplex/any other type of RPC. return str != testgrpc.TestService_UnaryCall_FullMethodName } - reader, ss := setup(t, maf) + reader, ss := setupStubServer(t, maf) defer ss.Stop() // Make a Unary and Streaming RPC. The Unary RPC should be filtered by the @@ -178,7 +196,7 @@ func (s) TestMethodAttributeFilter(t *testing.T) { // on the Client (no StaticMethodCallOption set) and Server. The method // attribute on subsequent metrics should be bucketed in "other". func (s) TestAllMetricsOneFunction(t *testing.T) { - reader, ss := setup(t, nil) + reader, ss := setupStubServer(t, nil) defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -304,3 +322,253 @@ func (s) TestAllMetricsOneFunction(t *testing.T) { } } } + +// clusterWithLBConfiguration returns a cluster resource with the proto message +// passed Marshaled to an any and specified through the load_balancing_policy +// field. +func clusterWithLBConfiguration(t *testing.T, clusterName, edsServiceName string, secLevel e2e.SecurityLevel, m proto.Message) *v3clusterpb.Cluster { + cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel) + cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: itestutils.MarshalAny(t, m), + }, + }, + }, + } + return cluster +} + +// TestWRRMetrics tests the metrics emitted from the WRR LB Policy. It +// configures WRR as an endpoint picking policy through xDS on a ClientConn +// alongside an OpenTelemetry stats handler. It makes a few RPC's, and then +// sleeps for a bit to allow weight to expire. It then asserts OpenTelemetry +// metrics atoms are eventually present for all four WRR Metrics, alongside the +// correct target and locality label for each metric. +func (s) TestWRRMetrics(t *testing.T) { + cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder) + backend1 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. + sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) + port1 := itestutils.ParsePort(t, backend1.Address) + defer backend1.Stop() + + cmr.SetQPS(10.0) + cmr.SetApplicationUtilization(1.0) + + backend2 := stubserver.StartTestService(t, nil) + port2 := itestutils.ParsePort(t, backend2.Address) + defer backend2.Stop() + + const serviceName = "my-service-client-side-xds" + + // Start an xDS management server. + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) + + wrrConfig := &v3wrrlocalitypb.WrrLocality{ + EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: itestutils.MarshalAny(t, &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{ + EnableOobLoadReport: &wrapperspb.BoolValue{ + Value: false, + }, + // BlackoutPeriod long enough to cause load report + // weight to trigger in the scope of test case. + // WeightExpirationPeriod will cause the load report + // weight for backend 1 to expire. + BlackoutPeriod: durationpb.New(5 * time.Millisecond), + WeightExpirationPeriod: durationpb.New(500 * time.Millisecond), + WeightUpdatePeriod: durationpb.New(time.Second), + ErrorUtilizationPenalty: &wrapperspb.FloatValue{Value: 1}, + }), + }, + }, + }, + }, + } + + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(t, clusterName, endpointsName, e2e.SecurityLevelNone, wrrConfig)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{Port: port1}, {Port: port2}}, + Weight: 1, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + + mo := opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"), + OptionalLabels: []string{"grpc.lb.locality"}, + } + + target := fmt.Sprintf("xds:///%s", serviceName) + cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + + // Make 100 RPC's. One of these should hit backend 1 and trigger a per call + // load report, giving that SubConn a weight which will eventually expire. + // Two backends needed as for only one backend, WRR does not recompute the + // scheduler. + for i := 0; i < 100; i++ { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = %v, want ", err) + } + } + + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + // No need to poll for first assertion because WRR emits metrics + // synchronously on the first picker/scheduler update, which happens before + // first RPC finishes. + targetAttr := attribute.String("grpc.target", target) + localityAttr := attribute.String("grpc.lb.locality", `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`) + + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.lb.wrr.rr_fallback", + Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.", + Unit: "update", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + + { + Name: "grpc.lb.wrr.endpoint_weight_not_yet_usable", + Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).", + Unit: "endpoint", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.lb.wrr.endpoint_weights", + Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.", + Unit: "endpoint", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + } + + // First three should immediately be present as happens synchronously from + // first scheduler update. + for _, metric := range wantMetrics { + val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) + } + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + } + } + + // Sleep, then poll for 5 seconds for weight expiration metric. No more + // RPC's are being made, so weight should expire on a subsequent scheduler + // update. + time.Sleep(time.Second) + + eventuallyWantMetric := metricdata.Metrics{ + Name: "grpc.lb.wrr.endpoint_weight_stale", + Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.", + Unit: "endpoint", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + } + + // Poll for 5 seconds for stale metric to appear. + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + val, ok := gotMetrics[eventuallyWantMetric.Name] + if !ok { + continue + } + if !metricdatatest.AssertEqual(t, eventuallyWantMetric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("Metrics data type not equal for metric: %v", eventuallyWantMetric.Name) + } + break + } + + if ctx.Err() != nil { + t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name) + } +} diff --git a/stats/opentelemetry/go.mod b/stats/opentelemetry/go.mod index d6af0becf5b4..549f296384e2 100644 --- a/stats/opentelemetry/go.mod +++ b/stats/opentelemetry/go.mod @@ -5,6 +5,7 @@ go 1.21 replace google.golang.org/grpc => ../.. require ( + github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 github.com/google/go-cmp v0.6.0 go.opentelemetry.io/contrib/detectors/gcp v1.27.0 go.opentelemetry.io/otel v1.27.0 @@ -20,11 +21,12 @@ require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect - github.com/envoyproxy/go-control-plane v0.12.1-0.20240621013728-1eb8caab5155 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/otel/trace v1.27.0 // indirect golang.org/x/net v0.26.0 // indirect diff --git a/stats/opentelemetry/go.sum b/stats/opentelemetry/go.sum index 845f63cad7ee..1dfe121fa2ab 100644 --- a/stats/opentelemetry/go.sum +++ b/stats/opentelemetry/go.sum @@ -6,6 +6,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.23.0/go.mod h1:p2puVVSKjQ84Qb1gzw2XHLs34WQyHTYFZLaVxypAFYs= github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw= github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -21,6 +23,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/test/xds/xds_client_affinity_test.go b/test/xds/xds_client_affinity_test.go index a8439e0673db..e7db416a7156 100644 --- a/test/xds/xds_client_affinity_test.go +++ b/test/xds/xds_client_affinity_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -83,7 +84,7 @@ func ringhashCluster(clusterName, edsServiceName string) *v3clusterpb.Cluster { // propagated to pick the ring_hash policy. It doesn't test the affinity // behavior in ring_hash policy. func (s) TestClientSideAffinitySanityCheck(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_client_certificate_providers_test.go b/test/xds/xds_client_certificate_providers_test.go index 49bd00feba0c..3f705210dd33 100644 --- a/test/xds/xds_client_certificate_providers_test.go +++ b/test/xds/xds_client_certificate_providers_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -60,7 +61,7 @@ import ( // used on the client. func (s) TestClientSideXDS_WithNoCertificateProvidersInBootstrap_Success(t *testing.T) { // Spin up an xDS management server. - mgmtServer, nodeID, _, resolverBuilder := setupManagementServerAndResolver(t) + mgmtServer, nodeID, _, resolverBuilder := setup.ManagementServerAndResolver(t) // Spin up a test backend. server := stubserver.StartTestService(t, nil) diff --git a/test/xds/xds_client_custom_lb_test.go b/test/xds/xds_client_custom_lb_test.go index d0a5e56f0534..8d87a89753c7 100644 --- a/test/xds/xds_client_custom_lb_test.go +++ b/test/xds/xds_client_custom_lb_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/roundrobin" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" @@ -222,7 +223,7 @@ func (s) TestWrrLocality(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // Start an xDS management server. - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) routeConfigName := "route-" + serviceName clusterName := "cluster-" + serviceName diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go index 4d0346404d06..55d428d88928 100644 --- a/test/xds/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" @@ -250,7 +251,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // server and actually making an RPC ensures that the xDS client is // configured properly, and when we dial with an unknown authority in the // next step, we can be sure that the error we receive is legitimate. - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() @@ -298,7 +299,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // with an authority which is not specified in the bootstrap configuration. The // test verifies that RPCs fail with an appropriate error. func (s) TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { - mgmtServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // LDS is old style name. // RDS is new style, with an unknown authority. diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go index 150171dc8161..30b03cf927cd 100644 --- a/test/xds/xds_client_integration_test.go +++ b/test/xds/xds_client_integration_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" testgrpc "google.golang.org/grpc/interop/grpc_testing" @@ -84,7 +85,7 @@ func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, stri } func (s) TestClientSideXDS(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go index eb91b87b14ca..6ae920e07f5d 100644 --- a/test/xds/xds_client_outlier_detection_test.go +++ b/test/xds/xds_client_outlier_detection_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/peer" @@ -50,7 +51,7 @@ import ( // Detection balancer. This test verifies that an RPC is able to proceed // normally with this configuration. func (s) TestOutlierDetection_NoopConfig(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := &stubserver.StubServer{ EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, @@ -161,7 +162,7 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, // the unhealthy upstream is ejected, RPC's should regularly round robin across // all three upstreams. func (s) TestOutlierDetectionWithOutlier(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // Working backend 1. backend1 := stubserver.StartTestService(t, nil) diff --git a/test/xds/xds_client_retry_test.go b/test/xds/xds_client_retry_test.go index 78c1c95d462c..f4c4ed38d507 100644 --- a/test/xds/xds_client_retry_test.go +++ b/test/xds/xds_client_retry_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/wrapperspb" @@ -41,7 +42,7 @@ func (s) TestClientSideRetry(t *testing.T) { ctr := 0 errs := []codes.Code{codes.ResourceExhausted} - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, &stubserver.StubServer{ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go index d0eb753a6fce..98c802fcf78e 100644 --- a/test/xds/xds_rls_clusterspecifier_plugin_test.go +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/rls" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/protobuf/types/known/durationpb" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" @@ -105,7 +106,7 @@ func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) { // Set up all components and configuration necessary - management server, // xDS resolver, fake RLS Server, and xDS configuration which specifies an // RLS Balancer that communicates to this set up fake RLS Server. - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() diff --git a/test/xds/xds_security_config_nack_test.go b/test/xds/xds_security_config_nack_test.go index 697d135f57c6..d14f9821411d 100644 --- a/test/xds/xds_security_config_nack_test.go +++ b/test/xds/xds_security_config_nack_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -147,7 +148,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_certificate_providers_test.go b/test/xds/xds_server_certificate_providers_test.go index 3359bc58d354..0932f318e1cd 100644 --- a/test/xds/xds_server_certificate_providers_test.go +++ b/test/xds/xds_server_certificate_providers_test.go @@ -34,6 +34,7 @@ import ( xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds" "google.golang.org/protobuf/types/known/wrapperspb" @@ -57,7 +58,7 @@ import ( // credentials are getting used on the server. func (s) TestServerSideXDS_WithNoCertificateProvidersInBootstrap_Success(t *testing.T) { // Spin up an xDS management server. - mgmtServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + mgmtServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) // Spin up an xDS-enabled gRPC server that uses xDS credentials with // insecure fallback, and the above bootstrap configuration. diff --git a/test/xds/xds_server_integration_test.go b/test/xds/xds_server_integration_test.go index 08aed76137c2..1525cd1a65ee 100644 --- a/test/xds/xds_server_integration_test.go +++ b/test/xds/xds_server_integration_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" "google.golang.org/grpc/xds" @@ -143,7 +144,7 @@ func hostPortFromListener(lis net.Listener) (string, uint32, error) { // the client and the server. This results in both of them using the // configured fallback credentials (which is insecure creds in this case). func (s) TestServerSideXDS_Fallback(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -224,7 +225,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_rbac_test.go b/test/xds/xds_server_rbac_test.go index 13cce4f2ecf5..70000e2a8a2c 100644 --- a/test/xds/xds_server_rbac_test.go +++ b/test/xds/xds_server_rbac_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" @@ -59,7 +60,7 @@ import ( // (NonForwardingAction), and the RPC's matching those routes should proceed as // normal. func (s) TestServerSideXDS_RouteConfiguration(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -626,7 +627,7 @@ func (s) TestRBACHTTPFilter(t *testing.T) { } audit.RegisterLoggerBuilder(lb) - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() @@ -802,7 +803,7 @@ func serverListenerWithBadRouteConfiguration(t *testing.T, host string, port uin } func (s) TestRBACToggledOn_WithBadRouteConfiguration(t *testing.T) { - managementServer, nodeID, bootstrapContents, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, xdsResolver := setup.ManagementServerAndResolver(t) lis, cleanup2 := setupGRPCServer(t, bootstrapContents) defer cleanup2() diff --git a/test/xds/xds_server_serving_mode_test.go b/test/xds/xds_server_serving_mode_test.go index d68436e34597..40bc1f6898c0 100644 --- a/test/xds/xds_server_serving_mode_test.go +++ b/test/xds/xds_server_serving_mode_test.go @@ -31,6 +31,7 @@ import ( xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/xds" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -43,7 +44,7 @@ import ( // change callback is not invoked and client connections to the server are not // recycled. func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) if err != nil { @@ -165,7 +166,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in // the server, and also verifies behavior of clientConns under these modes. func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) // Configure xDS credentials to be used on the server-side. creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{ diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go index 20adfea7ab6f..3ede7af3cb0e 100644 --- a/test/xds/xds_server_test.go +++ b/test/xds/xds_server_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" "google.golang.org/grpc/status" "google.golang.org/grpc/xds" @@ -55,7 +56,7 @@ var ( // dynamically, and subsequent RPC's on that connection should start failing // with status code UNAVAILABLE. func (s) TestServeLDSRDS(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { @@ -165,7 +166,7 @@ func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.Clie // serving, successfully Accept Connections, and fail at the L7 level with a // certain error message. func (s) TestRDSNack(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) @@ -235,7 +236,7 @@ func (s) TestRDSNack(t *testing.T) { // RPC's will match to). This configuration should eventually be represented in // the Server's state, and RPCs should proceed successfully. func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) { - managementServer, nodeID, bootstrapContents, _ := setupManagementServerAndResolver(t) + managementServer, nodeID, bootstrapContents, _ := setup.ManagementServerAndResolver(t) lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) diff --git a/test/xds/xds_telemetry_labels_test.go b/test/xds/xds_telemetry_labels_test.go index 9068273628ee..7a0e76227a32 100644 --- a/test/xds/xds_telemetry_labels_test.go +++ b/test/xds/xds_telemetry_labels_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e/setup" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/stats" @@ -53,7 +54,7 @@ const localityValue = `{"region":"region-1","zone":"zone-1","subZone":"subzone-1 // handler asserts that subsequent HandleRPC calls from the RPC lifecycle // contain telemetry labels that it can see. func (s) TestTelemetryLabels(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) server := stubserver.StartTestService(t, nil) defer server.Stop() From 9a1fb9b69950a7c40864e72e3f73136be434f41d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 2 Aug 2024 15:29:42 -0400 Subject: [PATCH 2/5] Responded to some comments --- balancer/weightedroundrobin/balancer_test.go | 18 +-- stats/opentelemetry/e2e_test.go | 122 +++++++++++-------- 2 files changed, 72 insertions(+), 68 deletions(-) diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index 9579cdc99862..e1965d008060 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -555,8 +555,7 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { cfg := oobConfig cfg.BlackoutPeriod = tc.blackoutPeriodCfg sc := svcConfig(t, cfg) - mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { t.Fatalf("Error starting client: %v", err) } addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} @@ -583,20 +582,12 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) { // Wait for the weight update period to allow the new weights to be processed. time.Sleep(weightUpdatePeriod) checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}) - - mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) - mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) - mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - // Either 10 or 100, dependent on whatever ordering SubConns are - // processed, which is nondeterministic. - mr.AssertEitherDataForMetric("grpc.lb.wrr.endpoint_weights", 10, 100) } } // Tests that the weight expiration period causes backends to use 0 as their // weight (meaning to use the average weight) once the expiration period -// elapses. After the weight expires, the expected metrics to be emitted from -// WRR are also configured. +// elapses. func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -661,11 +652,6 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { // Wait for the weight expiration period so the weights have expired. time.Sleep(weightUpdatePeriod) checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1}) - - mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) - mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) - mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) } // Tests logic surrounding subchannel management. diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index b598e0816097..1260ac4609ab 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -19,6 +19,7 @@ package opentelemetry_test import ( "context" "fmt" + "google.golang.org/grpc/internal/grpcsync" "io" "testing" @@ -340,6 +341,18 @@ func clusterWithLBConfiguration(t *testing.T, clusterName, edsServiceName string return cluster } +func metricsDataFromReader(ctx context.Context, reader *metric.ManualReader) map[string]metricdata.Metrics { + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + return gotMetrics +} + // TestWRRMetrics tests the metrics emitted from the WRR LB Policy. It // configures WRR as an endpoint picking policy through xDS on a ClientConn // alongside an OpenTelemetry stats handler. It makes a few RPC's, and then @@ -366,7 +379,18 @@ func (s) TestWRRMetrics(t *testing.T) { cmr.SetQPS(10.0) cmr.SetApplicationUtilization(1.0) - backend2 := stubserver.StartTestService(t, nil) + backend2 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. + sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) port2 := itestutils.ParsePort(t, backend2.Address) defer backend2.Stop() @@ -443,28 +467,22 @@ func (s) TestWRRMetrics(t *testing.T) { client := testgrpc.NewTestServiceClient(cc) - // Make 100 RPC's. One of these should hit backend 1 and trigger a per call - // load report, giving that SubConn a weight which will eventually expire. - // Two backends needed as for only one backend, WRR does not recompute the + // Make 100 RPC's. The two backends will send back load reports per call + // giving the two SubChannels weights which will eventually expire. Two + // backends needed as for only one backend, WRR does not recompute the // scheduler. - for i := 0; i < 100; i++ { - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = %v, want ", err) - } - } - - rm := &metricdata.ResourceMetrics{} - reader.Collect(ctx, rm) - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m + receivedExpectedMetrics := grpcsync.NewEvent() + go func() { + for i := 0; i < 100; i++ { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = %v, want ", err) + } + if receivedExpectedMetrics.HasFired() { + break + } } - } + }() - // No need to poll for first assertion because WRR emits metrics - // synchronously on the first picker/scheduler update, which happens before - // first RPC finishes. targetAttr := attribute.String("grpc.target", target) localityAttr := attribute.String("grpc.lb.locality", `{"region":"region-1","zone":"zone-1","subZone":"subzone-1"}`) @@ -515,23 +533,13 @@ func (s) TestWRRMetrics(t *testing.T) { }, } - // First three should immediately be present as happens synchronously from - // first scheduler update. - for _, metric := range wantMetrics { - val, ok := gotMetrics[metric.Name] - if !ok { - t.Fatalf("Metric %v not present in recorded metrics", metric.Name) - } - if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) - } + if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil { + t.Fatal(err) } + receivedExpectedMetrics.Fire() - // Sleep, then poll for 5 seconds for weight expiration metric. No more - // RPC's are being made, so weight should expire on a subsequent scheduler - // update. - time.Sleep(time.Second) - + // Poll for 5 seconds for weight expiration metric. No more RPC's are being + // made, so weight should expire on a subsequent scheduler update. eventuallyWantMetric := metricdata.Metrics{ Name: "grpc.lb.wrr.endpoint_weight_stale", Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.", @@ -548,27 +556,37 @@ func (s) TestWRRMetrics(t *testing.T) { }, } - // Poll for 5 seconds for stale metric to appear. + if ctx.Err() != nil { + t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name) + } + + if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil { + t.Fatal(err) + } +} + +// pollForWantMetrics polls for the wantMetrics to show up on reader. Returns an +// error if metric is present but not equal to expected, or if the wantMetrics +// do not show up during the context timeout. +func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.ManualReader, wantMetrics []metricdata.Metrics) error { for ; ctx.Err() == nil; <-time.After(time.Millisecond) { - rm := &metricdata.ResourceMetrics{} - reader.Collect(ctx, rm) - gotMetrics := map[string]metricdata.Metrics{} - for _, sm := range rm.ScopeMetrics { - for _, m := range sm.Metrics { - gotMetrics[m.Name] = m + gotMetrics := metricsDataFromReader(ctx, reader) + containsAllMetrics := true + for _, metric := range wantMetrics { + val, ok := gotMetrics[metric.Name] + if !ok { + containsAllMetrics = false + break + } + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + return fmt.Errorf("metrics data type not equal for metric: %v", metric.Name) } } - val, ok := gotMetrics[eventuallyWantMetric.Name] - if !ok { - continue - } - if !metricdatatest.AssertEqual(t, eventuallyWantMetric, val, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { - t.Fatalf("Metrics data type not equal for metric: %v", eventuallyWantMetric.Name) + if containsAllMetrics { + return nil } - break + time.Sleep(5 * time.Millisecond) } - if ctx.Err() != nil { - t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name) - } + return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err()) } From 6ff2e00074a30abcd4dc74d26f080c23ad88e827 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 2 Aug 2024 15:32:33 -0400 Subject: [PATCH 3/5] Add unit tests for WRR Metrics --- balancer/weightedroundrobin/balancer.go | 2 +- balancer/weightedroundrobin/metrics_test.go | 136 ++++++++++++++++++ .../testutils/stats/test_metrics_recorder.go | 10 ++ stats/opentelemetry/e2e_test.go | 6 +- 4 files changed, 149 insertions(+), 5 deletions(-) create mode 100644 balancer/weightedroundrobin/metrics_test.go diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 38787a3bcd85..ed241124219e 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -595,7 +595,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect // account the parameters. Returns 0 for blacked out or expired data, which // will cause the backend weight to be treated as the mean of the weights of the // other backends. If forScheduler is set to true, this function will emit -// metrics through the mtrics registry. +// metrics through the metrics registry. func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) { w.mu.Lock() defer w.mu.Unlock() diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go new file mode 100644 index 000000000000..ba22777e16cb --- /dev/null +++ b/balancer/weightedroundrobin/metrics_test.go @@ -0,0 +1,136 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package weightedroundrobin + +import ( + "testing" + "time" + + "google.golang.org/grpc/internal/grpctest" + iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils/stats" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestWRR_Metrics_SubConnWeight tests different scenarios for the weight call +// on a weighted SubConn, and expects certain metrics for each of these +// scenarios. +func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 3, + } + + // The weighted SubConn's lastUpdated field hasn't been set, so this + // SubConn's weight is not yet usable. Thus, should emit that endpoint + // weight is not yet usable, and 0 for weight. + wsc.weight(time.Now(), time.Second, time.Second, true) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0. + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + // Unusable, so no endpoint weight (i.e. 0). + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) + tmr.ClearMetrics() + + // Setup a scenario where the SubConn's weight expires. Thus, should emit + // that endpoint weight is stale, and 0 for weight. + wsc.lastUpdated = time.Now() + wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) + // Unusable, so no endpoint weight (i.e. 0). + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) + tmr.ClearMetrics() + + // Setup a scenario where the SubConn's weight is in the blackout period. + // Thus, should emit that endpoint weight is not yet usable, and 0 for + // weight. + wsc.weight(time.Now(), time.Minute, 10*time.Second, true) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) + // Unusable, so no endpoint weight (i.e. 0). + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) + tmr.ClearMetrics() + + // Setup a scenario where SubConn's weight is what is persists in weight + // field. This is triggered by last update being past blackout period and + // before weight update period. Should thus emit that endpoint weight is 3, + // and no other metrics. + wsc.nonEmptySince = time.Now() + wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3) + tmr.ClearMetrics() + + // Setup a scenario where a SubConn's weight both expires and is within the + // blackout period. In this case, weight expiry should take precedence with + // respect to metrics emitted. Thus, should emit that endpoint weight is not + // yet usable, and 0 for weight. + wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) + tmr.ClearMetrics() +} + +// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric +// for scheduler updates. It tests the case with one SubConn, and two SubConns +// with no weights. Both of these should emit a count metric for round robin +// fallback. +func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 0, + } + + p := &picker{ + cfg: &lbConfig{ + BlackoutPeriod: iserviceconfig.Duration(10 * time.Second), + WeightExpirationPeriod: iserviceconfig.Duration(3 * time.Minute), + }, + subConns: []*weightedSubConn{wsc}, + metricsRecorder: tmr, + } + // There is only one SubConn, so no matter if the SubConn has a weight or + // not will fallback to round robin. + p.regenerateScheduler() + tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) + tmr.ClearMetrics() + + // With two SubConns, if neither of them have weights, it will also fallback + // to round robin. + wsc2 := &weightedSubConn{ + target: "target", + metricsRecorder: tmr, + weightVal: 0, + } + p.subConns = append(p.subConns, wsc2) + p.regenerateScheduler() + tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) +} diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index e0634ae5c41f..ff8e41cf47ac 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -104,6 +104,16 @@ func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricNa r.t.Fatalf("Timeout waiting for data %v for metric %v", wantVal, metricName) } +// ClearMetrics clears the metrics data stores of the test metrics recorder by +// setting all the data to 0. +func (r *TestMetricsRecorder) ClearMetrics() { + r.mu.Lock() + defer r.mu.Unlock() + for metric := range r.data { + r.data[metric] = 0 + } +} + type MetricsData struct { Handle *estats.MetricDescriptor diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 1260ac4609ab..7d64ae4841d3 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -19,7 +19,6 @@ package opentelemetry_test import ( "context" "fmt" - "google.golang.org/grpc/internal/grpcsync" "io" "testing" @@ -39,6 +38,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" itestutils "google.golang.org/grpc/internal/testutils" @@ -474,9 +474,7 @@ func (s) TestWRRMetrics(t *testing.T) { receivedExpectedMetrics := grpcsync.NewEvent() go func() { for i := 0; i < 100; i++ { - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = %v, want ", err) - } + client.EmptyCall(ctx, &testpb.Empty{}) if receivedExpectedMetrics.HasFired() { break } From 761b7293d6290ce194ed326be5f045de255deeae Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 6 Aug 2024 20:14:14 -0400 Subject: [PATCH 4/5] Responded to Doug's comments --- balancer/weightedroundrobin/balancer_test.go | 3 +- balancer/weightedroundrobin/metrics_test.go | 137 +++++++++++------- .../testutils/stats/test_metrics_recorder.go | 23 +-- stats/opentelemetry/e2e_test.go | 10 +- test/xds/xds_client_integration_test.go | 35 ----- test/xds/xds_client_outlier_detection_test.go | 2 +- 6 files changed, 94 insertions(+), 116 deletions(-) diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index e1965d008060..b994322dc52e 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -623,8 +623,7 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) { cfg := oobConfig cfg.OOBReportingPeriod = stringp("60s") sc := svcConfig(t, cfg) - mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) - if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { + if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil { t.Fatalf("Error starting client: %v", err) } addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}} diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go index ba22777e16cb..e6079d67d5f1 100644 --- a/balancer/weightedroundrobin/metrics_test.go +++ b/balancer/weightedroundrobin/metrics_test.go @@ -39,63 +39,94 @@ func Test(t *testing.T) { // on a weighted SubConn, and expects certain metrics for each of these // scenarios. func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { - tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) - - wsc := &weightedSubConn{ - metricsRecorder: tmr, - weightVal: 3, + tests := []struct { + name string + weightExpirationPeriod time.Duration + blackoutPeriod time.Duration + lastUpdatedSet bool + nonEmptySet bool + nowTime time.Time + endpointWeightStaleWant float64 + endpointWeightNotYetUsableWant float64 + endpointWeightWant float64 + }{ + // The weighted SubConn's lastUpdated field hasn't been set, so this + // SubConn's weight is not yet usable. Thus, should emit that endpoint + // weight is not yet usable, and 0 for weight. + { + name: "no weight set", + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "weight expiration", + lastUpdatedSet: true, + weightExpirationPeriod: 2 * time.Second, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(100 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, + { + name: "in blackout period", + lastUpdatedSet: true, + weightExpirationPeriod: time.Minute, + blackoutPeriod: 10 * time.Second, + nowTime: time.Now(), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 1, + endpointWeightWant: 0, + }, + { + name: "normal weight", + lastUpdatedSet: true, + nonEmptySet: true, + weightExpirationPeriod: time.Minute, + blackoutPeriod: time.Second, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 0, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 3, + }, + { + name: "weight expiration takes precdedence over blackout", + lastUpdatedSet: true, + nonEmptySet: true, + weightExpirationPeriod: time.Second, + blackoutPeriod: time.Minute, + nowTime: time.Now().Add(10 * time.Second), + endpointWeightStaleWant: 1, + endpointWeightNotYetUsableWant: 0, + endpointWeightWant: 0, + }, } - // The weighted SubConn's lastUpdated field hasn't been set, so this - // SubConn's weight is not yet usable. Thus, should emit that endpoint - // weight is not yet usable, and 0 for weight. - wsc.weight(time.Now(), time.Second, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0. - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - // Unusable, so no endpoint weight (i.e. 0). - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() - - // Setup a scenario where the SubConn's weight expires. Thus, should emit - // that endpoint weight is stale, and 0 for weight. - wsc.lastUpdated = time.Now() - wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - // Unusable, so no endpoint weight (i.e. 0). - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() - - // Setup a scenario where the SubConn's weight is in the blackout period. - // Thus, should emit that endpoint weight is not yet usable, and 0 for - // weight. - wsc.weight(time.Now(), time.Minute, 10*time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1) - // Unusable, so no endpoint weight (i.e. 0). - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + wsc := &weightedSubConn{ + metricsRecorder: tmr, + weightVal: 3, + } + if test.lastUpdatedSet { + wsc.lastUpdated = time.Now() + } + if test.nonEmptySet { + wsc.nonEmptySince = time.Now() + } + wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true) - // Setup a scenario where SubConn's weight is what is persists in weight - // field. This is triggered by last update being past blackout period and - // before weight update period. Should thus emit that endpoint weight is 3, - // and no other metrics. - wsc.nonEmptySince = time.Now() - wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3) - tmr.ClearMetrics() + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant) + tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant) + }) + } - // Setup a scenario where a SubConn's weight both expires and is within the - // blackout period. In this case, weight expiry should take precedence with - // respect to metrics emitted. Thus, should emit that endpoint weight is not - // yet usable, and 0 for weight. - wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0) - tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0) - tmr.ClearMetrics() } // TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index ff8e41cf47ac..580c538caa2f 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -53,11 +53,11 @@ func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder tmr := &TestMetricsRecorder{ t: t, - intCountCh: testutils.NewChannelWithSize(1000), - floatCountCh: testutils.NewChannelWithSize(1000), - intHistoCh: testutils.NewChannelWithSize(1000), - floatHistoCh: testutils.NewChannelWithSize(1000), - intGaugeCh: testutils.NewChannelWithSize(1000), + intCountCh: testutils.NewChannelWithSize(10), + floatCountCh: testutils.NewChannelWithSize(10), + intHistoCh: testutils.NewChannelWithSize(10), + floatHistoCh: testutils.NewChannelWithSize(10), + intGaugeCh: testutils.NewChannelWithSize(10), data: make(map[estats.Metric]float64), } @@ -79,17 +79,6 @@ func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal flo } } -// AssertEitherDataForMetric asserts either data point is present for metric. -// The zero value in the check is equivalent to unset. - -func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) { - r.mu.Lock() - defer r.mu.Unlock() - if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 { - r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2) - } -} - // PollForDataForMetric polls the metric data for the want. Fails if context // provided expires before data for metric is found. func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) { @@ -97,7 +86,7 @@ func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricNa r.mu.Lock() if r.data[estats.Metric(metricName)] == wantVal { r.mu.Unlock() - break + return } r.mu.Unlock() } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 7d64ae4841d3..e56c0fe94805 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -473,11 +473,9 @@ func (s) TestWRRMetrics(t *testing.T) { // scheduler. receivedExpectedMetrics := grpcsync.NewEvent() go func() { - for i := 0; i < 100; i++ { + for !receivedExpectedMetrics.HasFired() { client.EmptyCall(ctx, &testpb.Empty{}) - if receivedExpectedMetrics.HasFired() { - break - } + time.Sleep(2 * time.Millisecond) } }() @@ -554,10 +552,6 @@ func (s) TestWRRMetrics(t *testing.T) { }, } - if ctx.Err() != nil { - t.Fatalf("Timeout waiting for metric %v", eventuallyWantMetric.Name) - } - if err := pollForWantMetrics(ctx, t, reader, []metricdata.Metrics{eventuallyWantMetric}); err != nil { t.Fatal(err) } diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go index 30b03cf927cd..f3b5d6e6ec43 100644 --- a/test/xds/xds_client_integration_test.go +++ b/test/xds/xds_client_integration_test.go @@ -24,16 +24,13 @@ import ( "testing" "time" - "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/testutils/xds/e2e/setup" - "google.golang.org/grpc/resolver" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -52,38 +49,6 @@ const ( defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) -// setupManagementServerAndResolver sets up an xDS management server, creates -// bootstrap configuration pointing to that server and creates an xDS resolver -// using that configuration. -// -// Registers a cleanup function on t to stop the management server. -// -// Returns the following: -// - the xDS management server -// - the node ID to use when talking to this management server -// - bootstrap configuration to use (if creating an xDS-enabled gRPC server) -// - xDS resolver builder (if creating an xDS-enabled gRPC client) -func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) { - // Start an xDS management server. - xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) - - // Create bootstrap configuration pointing to the above management server. - nodeID := uuid.New().String() - bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address) - - // Create an xDS resolver with the above bootstrap configuration. - var r resolver.Builder - var err error - if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { - r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) - if err != nil { - t.Fatalf("Failed to create xDS resolver for testing: %v", err) - } - } - - return xdsServer, nodeID, bc, r -} - func (s) TestClientSideXDS(t *testing.T) { managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go index 6ae920e07f5d..4df142f4a7c1 100644 --- a/test/xds/xds_client_outlier_detection_test.go +++ b/test/xds/xds_client_outlier_detection_test.go @@ -243,7 +243,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) { // Detection present in the CDS update, but with SuccessRateEjection unset, and // asserts that Outlier Detection is turned on and ejects upstreams. func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) { - managementServer, nodeID, _, xdsResolver := setupManagementServerAndResolver(t) + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // Working backend 1. backend1 := stubserver.StartTestService(t, nil) From 2ceb6e68898b2f7038bc3fbb69763b73027fd981 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Fri, 9 Aug 2024 19:25:25 -0400 Subject: [PATCH 5/5] Responded to Doug's comments --- balancer/weightedroundrobin/balancer_test.go | 2 +- balancer/weightedroundrobin/metrics_test.go | 28 ++++++++----------- internal/stats/metrics_recorder_list_test.go | 4 +-- .../testutils/stats/test_metrics_recorder.go | 10 ++----- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go index b994322dc52e..6ffddc0a7739 100644 --- a/balancer/weightedroundrobin/balancer_test.go +++ b/balancer/weightedroundrobin/balancer_test.go @@ -224,7 +224,7 @@ func (s) TestWRRMetricsBasic(t *testing.T) { srv := startServer(t, reportCall) sc := svcConfig(t, testMetricsConfig) - mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + mr := stats.NewTestMetricsRecorder(t) if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil { t.Fatalf("Error starting client: %v", err) } diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go index e6079d67d5f1..0ff43dc26e52 100644 --- a/balancer/weightedroundrobin/metrics_test.go +++ b/balancer/weightedroundrobin/metrics_test.go @@ -43,8 +43,8 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { name string weightExpirationPeriod time.Duration blackoutPeriod time.Duration - lastUpdatedSet bool - nonEmptySet bool + lastUpdated time.Time + nonEmpty time.Time nowTime time.Time endpointWeightStaleWant float64 endpointWeightNotYetUsableWant float64 @@ -64,7 +64,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { }, { name: "weight expiration", - lastUpdatedSet: true, + lastUpdated: time.Now(), weightExpirationPeriod: 2 * time.Second, blackoutPeriod: time.Second, nowTime: time.Now().Add(100 * time.Second), @@ -74,7 +74,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { }, { name: "in blackout period", - lastUpdatedSet: true, + lastUpdated: time.Now(), weightExpirationPeriod: time.Minute, blackoutPeriod: 10 * time.Second, nowTime: time.Now(), @@ -84,8 +84,8 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { }, { name: "normal weight", - lastUpdatedSet: true, - nonEmptySet: true, + lastUpdated: time.Now(), + nonEmpty: time.Now(), weightExpirationPeriod: time.Minute, blackoutPeriod: time.Second, nowTime: time.Now().Add(10 * time.Second), @@ -95,8 +95,8 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { }, { name: "weight expiration takes precdedence over blackout", - lastUpdatedSet: true, - nonEmptySet: true, + lastUpdated: time.Now(), + nonEmpty: time.Now(), weightExpirationPeriod: time.Second, blackoutPeriod: time.Minute, nowTime: time.Now().Add(10 * time.Second), @@ -108,16 +108,12 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + tmr := stats.NewTestMetricsRecorder(t) wsc := &weightedSubConn{ metricsRecorder: tmr, weightVal: 3, - } - if test.lastUpdatedSet { - wsc.lastUpdated = time.Now() - } - if test.nonEmptySet { - wsc.nonEmptySince = time.Now() + lastUpdated: test.lastUpdated, + nonEmptySince: test.nonEmpty, } wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true) @@ -134,7 +130,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) { // with no weights. Both of these should emit a count metric for round robin // fallback. func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) { - tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"}) + tmr := stats.NewTestMetricsRecorder(t) wsc := &weightedSubConn{ metricsRecorder: tmr, weightVal: 0, diff --git a/internal/stats/metrics_recorder_list_test.go b/internal/stats/metrics_recorder_list_test.go index b09ad043ce65..c58266a31bf8 100644 --- a/internal/stats/metrics_recorder_list_test.go +++ b/internal/stats/metrics_recorder_list_test.go @@ -144,8 +144,8 @@ func (s) TestMetricsRecorderList(t *testing.T) { // Create two stats.Handlers which also implement MetricsRecorder, configure // one as a global dial option and one as a local dial option. - mr1 := stats.NewTestMetricsRecorder(t, []string{}) - mr2 := stats.NewTestMetricsRecorder(t, []string{}) + mr1 := stats.NewTestMetricsRecorder(t) + mr2 := stats.NewTestMetricsRecorder(t) defer internal.ClearGlobalDialOptions() internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1)) diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index 580c538caa2f..f36089d47ff5 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -49,7 +49,7 @@ type TestMetricsRecorder struct { data map[estats.Metric]float64 } -func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder { +func NewTestMetricsRecorder(t *testing.T) *TestMetricsRecorder { tmr := &TestMetricsRecorder{ t: t, @@ -62,10 +62,6 @@ func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder data: make(map[estats.Metric]float64), } - for _, metric := range metrics { - tmr.data[estats.Metric(metric)] = 0 - } - return tmr } @@ -98,9 +94,7 @@ func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricNa func (r *TestMetricsRecorder) ClearMetrics() { r.mu.Lock() defer r.mu.Unlock() - for metric := range r.data { - r.data[metric] = 0 - } + r.data = make(map[estats.Metric]float64) } type MetricsData struct {