Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ var (
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

// cacheEntriesMetric is the handle for the "grpc.lb.rls.cache_entries" int64
// gauge, which reports the number of entries in the RLS cache. Measurements
// carry three labels, in this order: the gRPC target, the RLS server target,
// and the UUID of the data cache instance that recorded the value.
cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_entries",
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
Unit: "entry",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
// NOTE(review): presumably false means the metric is opt-in rather than
// enabled by default; confirm against estats.MetricDescriptor.
Default: false,
})
// cacheSizeMetric is the handle for the "grpc.lb.rls.cache_size" int64 gauge,
// which reports the current size of the RLS cache (unit "By", i.e. bytes).
// Measurements carry three labels, in this order: the gRPC target, the RLS
// server target, and the UUID of the data cache instance that recorded the
// value.
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
Name: "grpc.lb.rls.cache_size",
Description: "EXPERIMENTAL. The current size of the RLS cache.",
Unit: "By",
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
// NOTE(review): presumably false means the metric is opt-in rather than
// enabled by default; confirm against estats.MetricDescriptor.
Default: false,
})
defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.rls.default_target_picks",
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
Expand Down Expand Up @@ -126,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
Expand Down Expand Up @@ -317,18 +331,17 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.stateMu.Unlock()
<-done

// We cannot do cache operations above because `cacheMu` needs to be grabbed
// before `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.updateRLSServerTarget(newCfg.lookupService)
if resizeCache {
// If the new config reduces the size of the data cache, we
// might have to evict entries to get the cache size down to the newly
// specified size.
//
// And we cannot do this operation above (where we compute the
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
b.cacheMu.Unlock()
return nil
}

Expand Down
49 changes: 36 additions & 13 deletions balancer/rls/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"container/list"
"time"

"github.com/google/uuid"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -163,22 +165,40 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
//
// It is not safe for concurrent access.
type dataCache struct {
maxSize int64 // Maximum allowed size.
currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
logger *internalgrpclog.PrefixLogger
shutdown *grpcsync.Event
maxSize int64 // Maximum allowed size.
currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
logger *internalgrpclog.PrefixLogger
shutdown *grpcsync.Event
rlsServerTarget string

// Read only after initialization.
grpcTarget string
uuid string
metricsRecorder estats.MetricsRecorder
}

func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache {
return &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache {
dc := &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
grpcTarget: grpcTarget,
uuid: uuid.New().String(),
metricsRecorder: metricsRecorder,
}
cacheSizeMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
Comment on lines +193 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really required? The grpc.lb.rls.server_target is empty here. So, will this measurement even be useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. This gives a measurement at the beginning showing that the cache is currently at 0 (vs. unset and not showing up). This is a gauge, so it only reports the most recent state of the system, but I think it makes sense to record at construction time that the cache is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline with Doug and Yash. At first we thought we should leave it, but then they raised the same point you brought up: this gauge, as written, will live for the lifetime of the binary with an empty target. So this is actually a valid correctness issue. Thanks for catching this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I brought this up in the Observability thread alongside another one of my concerns, but this case is the main one that is seen as a correctness issue. If the RLS server target changes over the lifetime of the balancer, it's up to the exporter to handle the same uuid gauge being reported with a different RLS server target. Yash mentioned that dashboards can group on uuid and then see the RLS server target change over time, so this is working as intended (WAI).

return dc
}

// updateRLSServerTarget stores the RLS server target that the RLS balancer is
// currently configured with.
//
// The stored value is what the cache passes as the
// "grpc.lb.rls.server_target" label when it records cache metrics after this
// call. This method only updates the field; it does not record any metric
// itself.
//
// Like the rest of dataCache, it is not safe for concurrent access, so callers
// must provide their own synchronization.
func (dc *dataCache) updateRLSServerTarget(rlsServerTarget string) {
dc.rlsServerTarget = rlsServerTarget
}

// resize changes the maximum allowed size of the data cache.
Expand Down Expand Up @@ -319,6 +339,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
dc.currentSize -= entry.size
entry.size = newSize
dc.currentSize += entry.size
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
}

func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
Expand Down Expand Up @@ -351,6 +372,8 @@ func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
}

func (dc *dataCache) stop() {
Expand Down
11 changes: 6 additions & 5 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
)

var (
Expand Down Expand Up @@ -119,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -133,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {

func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil)
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")

// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
Expand Down Expand Up @@ -162,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {

func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -193,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) {

func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -220,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
}

initCacheEntries()
dc := newDataCache(5, nil)
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down
14 changes: 14 additions & 0 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,17 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo)
}

func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {}

// NoopMetricsRecorder is a MetricsRecorder whose methods all do nothing. Tests
// can pass it wherever a recorder is required but the recorded values do not
// matter, which prevents nil panics.
type NoopMetricsRecorder struct{}

// RecordInt64Count discards the int64 count measurement.
func (*NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {}

// RecordFloat64Count discards the float64 count measurement.
func (*NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {}

// RecordInt64Histo discards the int64 histogram measurement.
func (*NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {}

// RecordFloat64Histo discards the float64 histogram measurement.
func (*NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {}

// RecordInt64Gauge discards the int64 gauge measurement.
func (*NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {}