-
Notifications
You must be signed in to change notification settings - Fork 4.6k
health, grpc: Deliver health service updates through the health listener #7900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -277,10 +277,17 @@ type healthData struct { | |
| // to the LB policy. This is stored to avoid sending updates when the | ||
| // SubConn has already exited connectivity state READY. | ||
| connectivityState connectivity.State | ||
| // closeHealthProducer stores function to close the ref counted health | ||
| // producer. The health producer is automatically closed when the SubConn | ||
| // state changes. | ||
| closeHealthProducer func() | ||
| } | ||
|
|
||
| func newHealthData(s connectivity.State) *healthData { | ||
| return &healthData{connectivityState: s} | ||
| return &healthData{ | ||
| connectivityState: s, | ||
| closeHealthProducer: func() {}, | ||
| } | ||
| } | ||
|
|
||
| // updateState is invoked by grpc to push a subConn state update to the | ||
|
|
@@ -413,6 +420,34 @@ func (acbw *acBalancerWrapper) closeProducers() { | |
| } | ||
| } | ||
|
|
||
| // healthProducerRegisterFn is a type alias for the health producer's function | ||
| // for registering listeners. | ||
| type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func() | ||
|
|
||
| // healthServiceOpts returns the options for client side health checking. | ||
easwars marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // It returns a nil registerHealthListenerFn if client side health checks are | ||
| // disabled. | ||
| // Client side health checking is enabled when all the following | ||
| // conditions are satisfied: | ||
| // 1. Health checking is not disabled using the dial option. | ||
| // 2. The health package is imported. | ||
| // 3. The health check config is present in the service config. | ||
| func (acbw *acBalancerWrapper) healthServiceOpts() (string, healthProducerRegisterFn) { | ||
| if acbw.ccb.cc.dopts.disableHealthCheck { | ||
| return "", nil | ||
| } | ||
| regHealthLisFn := internal.RegisterClientHealthCheckListener | ||
| if regHealthLisFn == nil { | ||
| // The health package is not imported. | ||
| return "", nil | ||
| } | ||
| cfg := acbw.ac.cc.healthCheckConfig() | ||
| if cfg == nil { | ||
| return "", nil | ||
| } | ||
| return cfg.ServiceName, regHealthLisFn.(healthProducerRegisterFn) | ||
| } | ||
|
|
||
| // RegisterHealthListener accepts a health listener from the LB policy. It sends | ||
| // updates to the health listener as long as the SubConn's connectivity state | ||
| // doesn't change and a new health listener is not registered. To invalidate | ||
|
|
@@ -421,6 +456,7 @@ func (acbw *acBalancerWrapper) closeProducers() { | |
| func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { | ||
| acbw.healthMu.Lock() | ||
| defer acbw.healthMu.Unlock() | ||
| acbw.healthData.closeHealthProducer() | ||
| // listeners should not be registered when the connectivity state | ||
| // isn't Ready. This may happen when the balancer registers a listener | ||
| // after the connectivityState is updated, but before it is notified | ||
|
|
@@ -436,17 +472,37 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub | |
| return | ||
| } | ||
|
|
||
| serviceName, registerFn := acbw.healthServiceOpts() | ||
| acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { | ||
| if ctx.Err() != nil || acbw.ccb.balancer == nil { | ||
| return | ||
| } | ||
| // Don't send updates if a new listener is registered. | ||
| acbw.healthMu.Lock() | ||
| defer acbw.healthMu.Unlock() | ||
| curHD := acbw.healthData | ||
| if curHD != hd { | ||
| if acbw.healthData != hd { | ||
| return | ||
| } | ||
| if registerFn == nil { | ||
|
||
| listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) | ||
easwars marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| } | ||
| listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) | ||
| // Serialize the health updates from the health producer with | ||
| // other calls into the LB policy. | ||
| listenerWrapper := func(scs balancer.SubConnState) { | ||
| acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { | ||
| if ctx.Err() != nil || acbw.ccb.balancer == nil { | ||
| return | ||
| } | ||
| acbw.healthMu.Lock() | ||
| defer acbw.healthMu.Unlock() | ||
| if acbw.healthData != hd { | ||
| return | ||
| } | ||
| listener(scs) | ||
| }) | ||
| } | ||
|
|
||
| hd.closeHealthProducer = registerFn(ctx, acbw, serviceName, listenerWrapper) | ||
| }) | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,116 @@ | ||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Copyright 2024 gRPC authors. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| package health | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||
| "sync" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| "google.golang.org/grpc" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/balancer" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/codes" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/connectivity" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/internal" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/internal/grpcsync" | ||||||||||||||||||||||||||||
| "google.golang.org/grpc/status" | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func init() { | ||||||||||||||||||||||||||||
| producerBuilderSingleton = &producerBuilder{} | ||||||||||||||||||||||||||||
| internal.RegisterClientHealthCheckListener = registerClientSideHealthCheckListener | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| type producerBuilder struct{} | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var producerBuilderSingleton *producerBuilder | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Build constructs and returns a producer and its cleanup function. | ||||||||||||||||||||||||||||
| func (*producerBuilder) Build(cci any) (balancer.Producer, func()) { | ||||||||||||||||||||||||||||
| doneCh := make(chan struct{}) | ||||||||||||||||||||||||||||
| p := &healthServiceProducer{ | ||||||||||||||||||||||||||||
| cc: cci.(grpc.ClientConnInterface), | ||||||||||||||||||||||||||||
| cancelDone: doneCh, | ||||||||||||||||||||||||||||
| cancel: grpcsync.OnceFunc(func() { | ||||||||||||||||||||||||||||
| close(doneCh) | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
| onClose := func(r transport.GoAwayReason) { | |
| ac.mu.Lock() | |
| defer ac.mu.Unlock() | |
| // adjust params based on GoAwayReason | |
| ac.adjustParams(r) | |
| if ctx.Err() != nil { | |
| // Already shut down or connection attempt canceled. tearDown() or | |
| // updateAddrs() already cleared the transport and canceled hctx | |
| // via ac.ctx, and we expected this connection to be closed, so do | |
| // nothing here. | |
| return | |
| } | |
| hcancel() |
Uh oh!
There was an error while loading. Please reload this page.