@@ -652,6 +652,179 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
652652 verifyRLSRequest (t , rlsReqCh , true )
653653}
654654
655+ // Test that when a data cache entry is evicted due to config change
656+ // in cache size, the picker is updated accordingly.
657+ func (s ) TestPickerUpdateOnDataCacheSizeDecrease (t * testing.T ) {
658+ // Override the clientConn update hook to get notified.
659+ clientConnUpdateDone := make (chan struct {}, 1 )
660+ origClientConnUpdateHook := clientConnUpdateHook
661+ clientConnUpdateHook = func () { clientConnUpdateDone <- struct {}{} }
662+ defer func () { clientConnUpdateHook = origClientConnUpdateHook }()
663+
664+ // Override the cache entry size func, and always return 1.
665+ origEntrySizeFunc := computeDataCacheEntrySize
666+ computeDataCacheEntrySize = func (cacheKey , * cacheEntry ) int64 { return 1 }
667+ defer func () { computeDataCacheEntrySize = origEntrySizeFunc }()
668+
669+ // Override the backoff strategy to return a large backoff which
670+ // will make sure the date cache entry remains in backoff for the
671+ // duration of the test.
672+ origBackoffStrategy := defaultBackoffStrategy
673+ defaultBackoffStrategy = & fakeBackoffStrategy {backoff : defaultTestTimeout }
674+ defer func () { defaultBackoffStrategy = origBackoffStrategy }()
675+
676+ // Override the minEvictionDuration to ensure that when the config update
677+ // reduces the cache size, the resize operation is not stopped because
678+ // we find an entry whose minExpiryDuration has not elapsed.
679+ origMinEvictDuration := minEvictDuration
680+ minEvictDuration = time .Duration (0 )
681+ defer func () { minEvictDuration = origMinEvictDuration }()
682+
683+ // Register the top-level wrapping balancer which forwards calls to RLS.
684+ topLevelBalancerName := t .Name () + "top-level"
685+ var ccWrapper * testCCWrapper
686+ stub .Register (topLevelBalancerName , stub.BalancerFuncs {
687+ Init : func (bd * stub.BalancerData ) {
688+ ccWrapper = & testCCWrapper {ClientConn : bd .ClientConn }
689+ bd .Data = balancer .Get (Name ).Build (ccWrapper , bd .BuildOptions )
690+ },
691+ ParseConfig : func (sc json.RawMessage ) (serviceconfig.LoadBalancingConfig , error ) {
692+ parser := balancer .Get (Name ).(balancer.ConfigParser )
693+ return parser .ParseConfig (sc )
694+ },
695+ UpdateClientConnState : func (bd * stub.BalancerData , ccs balancer.ClientConnState ) error {
696+ bal := bd .Data .(balancer.Balancer )
697+ return bal .UpdateClientConnState (ccs )
698+ },
699+ Close : func (bd * stub.BalancerData ) {
700+ bal := bd .Data .(balancer.Balancer )
701+ bal .Close ()
702+ },
703+ })
704+
705+ // Start an RLS server and set the throttler to never throttle requests.
706+ rlsServer , rlsReqCh := rlstest .SetupFakeRLSServer (t , nil )
707+ overrideAdaptiveThrottler (t , neverThrottlingThrottler ())
708+
709+ // Register an LB policy to act as the child policy for RLS LB policy.
710+ childPolicyName := "test-child-policy" + t .Name ()
711+ e2e .RegisterRLSChildPolicy (childPolicyName , nil )
712+ t .Logf ("Registered child policy with name %q" , childPolicyName )
713+
714+ // Start a couple of test backends, and set up the fake RLS server to return
715+ // these as targets in the RLS response, based on request keys.
716+ // Start a couple of test backends, and set up the fake RLS server to return
717+ // these as targets in the RLS response, based on request keys.
718+ backendCh1 , backendAddress1 := startBackend (t )
719+ backendCh2 , backendAddress2 := startBackend (t )
720+ rlsServer .SetResponseCallback (func (ctx context.Context , req * rlspb.RouteLookupRequest ) * rlstest.RouteLookupResponse {
721+ if req .KeyMap ["k1" ] == "v1" {
722+ return & rlstest.RouteLookupResponse {Resp : & rlspb.RouteLookupResponse {Targets : []string {backendAddress1 }}}
723+ }
724+ if req .KeyMap ["k2" ] == "v2" {
725+ return & rlstest.RouteLookupResponse {Resp : & rlspb.RouteLookupResponse {Targets : []string {backendAddress2 }}}
726+ }
727+ return & rlstest.RouteLookupResponse {Err : errors .New ("no keys in request metadata" )}
728+ })
729+
730+ // Register a manual resolver and push the RLS service config through it.
731+ r := manual .NewBuilderWithScheme ("rls-e2e" )
732+ headers := `
733+ [
734+ {
735+ "key": "k1",
736+ "names": [
737+ "n1"
738+ ]
739+ },
740+ {
741+ "key": "k2",
742+ "names": [
743+ "n2"
744+ ]
745+ }
746+ ]
747+ `
748+
749+ configJSON := `
750+ {
751+ "loadBalancingConfig": [
752+ {
753+ "%s": {
754+ "routeLookupConfig": {
755+ "grpcKeybuilders": [{
756+ "names": [{"service": "grpc.testing.TestService"}],
757+ "headers": %s
758+ }],
759+ "lookupService": "%s",
760+ "cacheSizeBytes": %d
761+ },
762+ "childPolicy": [{"%s": {}}],
763+ "childPolicyConfigTargetFieldName": "Backend"
764+ }
765+ }
766+ ]
767+ }`
768+ scJSON := fmt .Sprintf (configJSON , topLevelBalancerName , headers , rlsServer .Address , 1000 , childPolicyName )
769+ sc := internal .ParseServiceConfig .(func (string ) * serviceconfig.ParseResult )(scJSON )
770+ r .InitialState (resolver.State {ServiceConfig : sc })
771+
772+ cc , err := grpc .Dial (r .Scheme ()+ ":///" , grpc .WithResolvers (r ), grpc .WithTransportCredentials (insecure .NewCredentials ()))
773+ if err != nil {
774+ t .Fatalf ("create grpc.Dial() failed: %v" , err )
775+ }
776+ defer cc .Close ()
777+
778+ <- clientConnUpdateDone
779+
780+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
781+ defer cancel ()
782+ // Make an RPC call with empty metadata, which will eventually throw
783+ // the error as no metadata will match from rlsServer response
784+ // callback defined above. This will cause the control channel to
785+ // throw the error and cause the item to get into backoff.
786+ makeTestRPCAndVerifyError (ctx , t , cc , codes .Unavailable , nil )
787+
788+ ctxOutgoing := metadata .AppendToOutgoingContext (ctx , "n1" , "v1" )
789+ makeTestRPCAndExpectItToReachBackend (ctxOutgoing , t , cc , backendCh1 )
790+ verifyRLSRequest (t , rlsReqCh , true )
791+
792+ ctxOutgoing = metadata .AppendToOutgoingContext (ctx , "n2" , "v2" )
793+ makeTestRPCAndExpectItToReachBackend (ctxOutgoing , t , cc , backendCh2 )
794+ verifyRLSRequest (t , rlsReqCh , true )
795+
796+ initialStateCnt := len (ccWrapper .getStates ())
797+ // Setting the size to 1 will cause the entries to be
798+ // evicted.
799+ scJSON1 := fmt .Sprintf (`
800+ {
801+ "loadBalancingConfig": [
802+ {
803+ "%s": {
804+ "routeLookupConfig": {
805+ "grpcKeybuilders": [{
806+ "names": [{"service": "grpc.testing.TestService"}],
807+ "headers": %s
808+ }],
809+ "lookupService": "%s",
810+ "cacheSizeBytes": 2
811+ },
812+ "childPolicy": [{"%s": {}}],
813+ "childPolicyConfigTargetFieldName": "Backend"
814+ }
815+ }
816+ ]
817+ }` , topLevelBalancerName , headers , rlsServer .Address , childPolicyName )
818+ sc1 := internal .ParseServiceConfig .(func (string ) * serviceconfig.ParseResult )(scJSON1 )
819+ r .UpdateState (resolver.State {ServiceConfig : sc1 })
820+ <- clientConnUpdateDone
821+ finalStateCnt := len (ccWrapper .getStates ())
822+
823+ if finalStateCnt != initialStateCnt + 1 {
824+ t .Errorf ("Unexpected balancer state count: got %v, want %v" , finalStateCnt , initialStateCnt )
825+ }
826+ }
827+
655828// TestDataCachePurging verifies that the LB policy periodically evicts expired
656829// entries from the data cache.
657830func (s ) TestDataCachePurging (t * testing.T ) {
0 commit comments