Skip to content

Commit 7407dbf

Browse files
yashyktcopybara-github
authored andcommitted
[RlsLB] Fix Deadlock (grpc#37459)
Internal bug: b/357864682 A lock ordering inversion was noticed with the following stacks - ``` [mutex.cc : 1418] RAW: Potential Mutex deadlock: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4be968c5 grpc::internal::OpenTelemetryPluginImpl::RemoveCallback() @ 0x564f4cd097b8 grpc_core::RegisteredMetricCallback::~RegisteredMetricCallback() @ 0x564f4c1f1216 std::default_delete<>::operator()() @ 0x564f4c1f157f std::__uniq_ptr_impl<>::reset() @ 0x564f4c1ee967 std::unique_ptr<>::reset() @ 0x564f4c352f44 grpc_core::GrpcXdsClient::Orphaned() @ 0x564f4c25dad1 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c4653ed grpc_core::RefCountedPtr<>::reset() @ 0x564f4c463c73 grpc_core::XdsClusterDropStats::~XdsClusterDropStats() @ 0x564f4c463d02 grpc_core::XdsClusterDropStats::~XdsClusterDropStats() @ 0x564f4c25efa9 grpc_core::UnrefDelete::operator()<>() @ 0x564f4c25d5f0 grpc_core::RefCounted<>::Unref() @ 0x564f4c25c2d9 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c25b1d8 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker() @ 0x564f4c25b240 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c14e958 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker() @ 0x564f4c14e980 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref() @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr() @ 0x564f4c26bafc std::pair<>::~pair() @ 0x564f4c26bb28 __gnu_cxx::new_allocator<>::destroy<>() @ 0x564f4c26b88f std::allocator_traits<>::destroy<>() @ 0x564f4c26b297 std::_Rb_tree<>::_M_destroy_node() @ 0x564f4c26abfb std::_Rb_tree<>::_M_drop_node() @ 0x564f4c26a926 std::_Rb_tree<>::_M_erase() @ 0x564f4c26a6f0 std::_Rb_tree<>::~_Rb_tree() @ 0x564f4c26a62a std::map<>::~map() @ 0x564f4c2691a4 grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker() @ 0x564f4c2691cc grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker() @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>() @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref() [mutex.cc : 1428] RAW: Acquiring absl::Mutex 0x564f4f22ad40 while holding 0x7f939834bb70; a cycle in the historical lock ordering graph has been observed [mutex.cc : 1432] RAW: Cycle: [mutex.cc : 1446] RAW: mutex@0x564f4f22ad40 stack: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4be96124 grpc::internal::OpenTelemetryPluginImpl::AddCallback() @ 0x564f4cd096f0 grpc_core::RegisteredMetricCallback::RegisteredMetricCallback() @ 0x564f4c1f111b std::make_unique<>() @ 0x564f4c3564b0 grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup::RegisterCallback<>() @ 0x564f4c352dea grpc_core::GrpcXdsClient::GrpcXdsClient() @ 0x564f4c355bc6 grpc_core::MakeRefCounted<>() @ 0x564f4c3525f2 grpc_core::GrpcXdsClient::GetOrCreate() @ 0x564f4c28f8f8 grpc_core::(anonymous namespace)::XdsResolver::StartLocked() @ 0x564f4c2f5f82 grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartXdsResolver() @ 0x564f4c2f515d grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::ZoneQueryDone() @ 0x564f4c2f496b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()()::{lambda()#1}::operator()() @ 0x564f4c2f80f6 std::__invoke_impl<>() @ 0x564f4c2f7b9d _ZSt10__invoke_rIvRZZN9grpc_core12_GLOBAL__N_124GoogleCloud2ProdResolver11StartLockedEvENUlNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEN4absl12lts_202401168StatusOrIS8_EEE_clES8_SC_EUlvE_J... @ 0x564f4c2f748c std::_Function_handler<>::_M_invoke() @ 0x564f4b8ad682 std::function<>::operator()() @ 0x564f4cd1c6bf grpc_core::WorkSerializer::LegacyWorkSerializer::Run() @ 0x564f4cd1dae4 grpc_core::WorkSerializer::Run() @ 0x564f4c2f4b0b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()() @ 0x564f4c2f8dc7 absl::lts_20240116::base_internal::Callable::Invoke<>() @ 0x564f4c2f8cb8 absl::lts_20240116::base_internal::invoke<>() @ 0x564f4c2f8b16 absl::lts_20240116::internal_any_invocable::InvokeR<>() @ 0x564f4c2f8a0c absl::lts_20240116::internal_any_invocable::LocalInvoker<>() @ 0x564f4c2fb88d absl::lts_20240116::internal_any_invocable::Impl<>::operator()() @ 0x564f4c2fb1f3 grpc_core::GcpMetadataQuery::OnDone() @ 0x564f4cd75a72 exec_ctx_run() @ 0x564f4cd75ba9 grpc_core::ExecCtx::Flush() @ 0x564f4cc8ee1d end_worker() @ 0x564f4cc8f304 pollset_work() @ 0x564f4cc5dcaf pollset_work() @ 0x564f4cc69220 grpc_pollset_work() @ 0x564f4cbe7733 cq_pluck() @ 0x564f4cbe7ad5 grpc_completion_queue_pluck @ 0x564f4bc61d96 grpc::CompletionQueue::Pluck() @ 0x564f4bfdb055 grpc::ClientReader<>::ClientReader<>() @ 0x564f4bfd6035 grpc::internal::ClientReaderFactory<>::Create<>() @ 0x564f4bfc322b google::storage::v2::Storage::Stub::ReadObjectRaw() @ 0x564f4bf9934b google::storage::v2::Storage::Stub::ReadObject() [mutex.cc : 1446] RAW: mutex@0x7f939834bb70 stack: @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck() @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock() @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock() @ 0x564f4c1ce9eb grpc_core::(anonymous namespace)::RlsLb::RlsLb()::{lambda()#1}::operator()() @ 0x564f4c1e794c absl::lts_20240116::base_internal::Callable::Invoke<>() @ 0x564f4c1e72c1 absl::lts_20240116::base_internal::invoke<>() @ 0x564f4c1e6af1 absl::lts_20240116::internal_any_invocable::InvokeR<>() @ 0x564f4c1e5d6c absl::lts_20240116::internal_any_invocable::LocalInvoker<>() @ 0x564f4be9d0c8 absl::lts_20240116::internal_any_invocable::Impl<>::operator()() @ 0x564f4be9b4ff grpc_core::RegisteredMetricCallback::Run() @ 0x564f4bea07ae grpc::internal::OpenTelemetryPluginImpl::CallbackGaugeState<>::CallbackGaugeCallback() @ 0x564f4bf844de opentelemetry::v1::sdk::metrics::ObservableRegistry::Observe() @ 0x564f4bf56529 opentelemetry::v1::sdk::metrics::Meter::Collect() @ 0x564f4bf8c1d5 opentelemetry::v1::sdk::metrics::MetricCollector::Collect()::{lambda()#1}::operator()() @ 0x564f4bf8c5ac opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::operator()() @ 0x564f4bf8c5e8 opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::_FUN() @ 0x564f4bf7604d opentelemetry::v1::nostd::function_ref<>::operator()() @ 0x564f4bf74ad9 opentelemetry::v1::sdk::metrics::MeterContext::ForEachMeter() @ 0x564f4bf8c457 opentelemetry::v1::sdk::metrics::MetricCollector::Collect() @ 0x564f4bf4a7fe opentelemetry::v1::sdk::metrics::MetricReader::Collect() @ 0x564f4bed5e24 opentelemetry::v1::exporter::metrics::PrometheusCollector::Collect() @ 0x564f4bef004f prometheus::detail::CollectMetrics() @ 0x564f4beec26d prometheus::detail::MetricsHandler::handleGet() @ 0x564f4bf1cd8b CivetServer::requestHandler() @ 0x564f4bf35e7b handle_request @ 0x564f4bf29534 handle_request_stat_log @ 0x564f4bf39b3f process_new_connection @ 0x564f4bf3a448 worker_thread_run @ 0x564f4bf3a57f worker_thread @ 0x7f93e9137ea7 start_thread [mutex.cc : 1454] RAW: dying due to potential deadlock Aborted ``` From the stack, it looks like we are ending up holding a lock to the `RlsLB` policy while removing a callback from the gRPC OpenTelemetry plugin, which is a lock ordering inversion. The correct order is `OpenTelemetry` -> `gRPC OpenTelemetry plugin` -> `gRPC Component like RLS/xDSClient`. A common pattern we employ for metrics is for the callbacks to be unregistered when the corresponding component object is orphaned/destroyed (unreffing). Also, note that removing callbacks requires a lock in `gRPC OpenTelemetry plugin`. To avoid deadlocks, we remove the callback inside `RlsLb` from outside the critical region, but `RlsLb` owns refs to child policies which in turn hold refs to `XdsClient`. The lock ordering inversion occurred due to unreffing child policies within the critical region. This PR is an alternative fix to this problem. Original fix in grpc#37425. Verified that it fixes the bug. Closes grpc#37459 COPYBARA_INTEGRATE_REVIEW=grpc#37459 from yashykt:FixDeadlocks ec7fbcf PiperOrigin-RevId: 663360427
1 parent a09aaf0 commit 7407dbf

File tree

1 file changed

+90
-33
lines changed
  • src/core/load_balancing/rls

1 file changed

+90
-33
lines changed

src/core/load_balancing/rls/rls.cc

Lines changed: 90 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy {
353353
// is called after releasing it.
354354
//
355355
// Both methods grab the data they need from the parent object.
356-
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
356+
void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
357+
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
357358
absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);
358359

359360
void ExitIdleLocked() {
@@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy {
397398
};
398399

399400
// Note: We are forced to disable lock analysis here because
400-
// Orphan() is called by Unref() which is called by RefCountedPtr<>, which
401+
// Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
401402
// cannot have lock annotations for this particular caller.
402403
void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
403404

404405
RefCountedPtr<RlsLb> lb_policy_;
405406
std::string target_;
406407

407-
bool is_shutdown_ = false;
408+
bool is_shutdown_ = false; // Protected by WorkSerializer
408409

409410
OrphanablePtr<ChildPolicyHandler> child_policy_;
410411
RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
@@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy {
503504
// Returns a list of child policy wrappers on which FinishUpdate()
504505
// needs to be called after releasing the lock.
505506
std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
506-
ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
507+
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
508+
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
507509
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
508510

509511
// Moves entry to the end of the LRU list.
510512
void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
511513

514+
// Takes entries from child_policy_wrappers_ and appends them to the end
515+
// of \a child_policy_wrappers.
516+
void TakeChildPolicyWrappers(
517+
std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers)
518+
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
519+
child_policy_wrappers->insert(
520+
child_policy_wrappers->end(),
521+
std::make_move_iterator(child_policy_wrappers_.begin()),
522+
std::make_move_iterator(child_policy_wrappers_.end()));
523+
child_policy_wrappers_.clear();
524+
}
525+
512526
private:
513527
class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
514528
public:
@@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy {
566580
// the caller. Otherwise, the entry found is returned to the caller. The
567581
// entry returned to the user is considered recently used and its order in
568582
// the LRU list of the cache is updated.
569-
Entry* FindOrInsert(const RequestKey& key)
583+
Entry* FindOrInsert(const RequestKey& key,
584+
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
585+
child_policy_wrappers_to_delete)
570586
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
571587

572588
// Resizes the cache. If the new cache size is greater than the current size
573589
// of the cache, do nothing. Otherwise, evict the oldest entries that
574590
// exceed the new size limit of the cache.
575-
void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
591+
void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
592+
child_policy_wrappers_to_delete)
593+
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
576594

577595
// Resets backoff of all the cache entries.
578596
void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
579597

580598
// Shutdown the cache; clean-up and orphan all the stored cache entries.
581-
void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
599+
GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>>
600+
Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
582601

583602
void ReportMetricsLocked(CallbackMetricReporter& reporter)
584603
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
@@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy {
594613

595614
// Evicts oversized cache elements when the current size is greater than
596615
// the specified limit.
597-
void MaybeShrinkSize(size_t bytes)
616+
void MaybeShrinkSize(size_t bytes,
617+
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
618+
child_policy_wrappers_to_delete)
598619
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
599620

600621
RlsLb* lb_policy_;
@@ -857,7 +878,8 @@ absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
857878
return Json::FromArray(std::move(array));
858879
}
859880

860-
void RlsLb::ChildPolicyWrapper::StartUpdate() {
881+
void RlsLb::ChildPolicyWrapper::StartUpdate(
882+
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
861883
ValidationErrors errors;
862884
auto child_policy_config = InsertOrUpdateChildPolicyField(
863885
lb_policy_->config_->child_policy_config_target_field_name(), target_,
@@ -880,7 +902,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
880902
pending_config_.reset();
881903
picker_ = MakeRefCounted<TransientFailurePicker>(
882904
absl::UnavailableError(config.status().message()));
883-
child_policy_.reset();
905+
*child_policy_to_delete = std::move(child_policy_);
884906
} else {
885907
pending_config_ = std::move(*config);
886908
}
@@ -934,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
934956
<< ": UpdateState(state=" << ConnectivityStateName(state)
935957
<< ", status=" << status << ", picker=" << picker.get() << ")";
936958
}
959+
if (wrapper_->is_shutdown_) return;
937960
{
938961
MutexLock lock(&wrapper_->lb_policy_->mu_);
939-
if (wrapper_->is_shutdown_) return;
940962
// TODO(roth): It looks like this ignores subsequent TF updates that
941963
// might change the status used to fail picks, which seems wrong.
942964
if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
@@ -946,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
946968
wrapper_->connectivity_state_ = state;
947969
DCHECK(picker != nullptr);
948970
if (picker != nullptr) {
949-
wrapper_->picker_ = std::move(picker);
971+
// We want to unref the picker after we release the lock.
972+
wrapper_->picker_.swap(picker);
950973
}
951974
}
952975
wrapper_->lb_policy_->UpdatePickerLocked();
@@ -1194,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
11941217
lb_policy_->cache_.lru_list_.end(), key)) {}
11951218

11961219
void RlsLb::Cache::Entry::Orphan() {
1220+
// We should be holding RlsLB::mu_.
11971221
GRPC_TRACE_LOG(rls_lb, INFO)
11981222
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
11991223
<< lru_iterator_->ToString() << ": cache entry evicted";
12001224
is_shutdown_ = true;
12011225
lb_policy_->cache_.lru_list_.erase(lru_iterator_);
12021226
lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
1227+
CHECK(child_policy_wrappers_.empty());
12031228
backoff_state_.reset();
12041229
if (backoff_timer_ != nullptr) {
12051230
backoff_timer_.reset();
12061231
lb_policy_->UpdatePickerAsync();
12071232
}
1208-
child_policy_wrappers_.clear();
12091233
Unref(DEBUG_LOCATION, "Orphan");
12101234
}
12111235

@@ -1284,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() {
12841308

12851309
std::vector<RlsLb::ChildPolicyWrapper*>
12861310
RlsLb::Cache::Entry::OnRlsResponseLocked(
1287-
ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
1311+
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
1312+
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
12881313
// Move the entry to the end of the LRU list.
12891314
MarkUsed();
12901315
// If the request failed, store the failed status and update the
@@ -1345,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
13451370
if (it == lb_policy_->child_policy_map_.end()) {
13461371
auto new_child = MakeRefCounted<ChildPolicyWrapper>(
13471372
lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
1348-
new_child->StartUpdate();
1373+
new_child->StartUpdate(child_policy_to_delete);
13491374
child_policies_to_finish_update.push_back(new_child.get());
13501375
new_child_policy_wrappers.emplace_back(std::move(new_child));
13511376
} else {
@@ -1382,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
13821407
return it->second.get();
13831408
}
13841409

1385-
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
1410+
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
1411+
const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1412+
child_policy_wrappers_to_delete) {
13861413
auto it = map_.find(key);
13871414
// If not found, create new entry.
13881415
if (it == map_.end()) {
13891416
size_t entry_size = EntrySizeForKey(key);
1390-
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
1417+
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size),
1418+
child_policy_wrappers_to_delete);
13911419
Entry* entry = new Entry(
13921420
lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
13931421
map_.emplace(key, OrphanablePtr<Entry>(entry));
@@ -1405,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
14051433
return it->second.get();
14061434
}
14071435

1408-
void RlsLb::Cache::Resize(size_t bytes) {
1436+
void RlsLb::Cache::Resize(size_t bytes,
1437+
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1438+
child_policy_wrappers_to_delete) {
14091439
GRPC_TRACE_LOG(rls_lb, INFO)
14101440
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes";
14111441
size_limit_ = bytes;
1412-
MaybeShrinkSize(size_limit_);
1442+
MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete);
14131443
}
14141444

14151445
void RlsLb::Cache::ResetAllBackoff() {
@@ -1419,7 +1449,12 @@ void RlsLb::Cache::ResetAllBackoff() {
14191449
lb_policy_->UpdatePickerAsync();
14201450
}
14211451

1422-
void RlsLb::Cache::Shutdown() {
1452+
std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() {
1453+
std::vector<RefCountedPtr<ChildPolicyWrapper>>
1454+
child_policy_wrappers_to_delete;
1455+
for (auto& entry : map_) {
1456+
entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
1457+
}
14231458
map_.clear();
14241459
lru_list_.clear();
14251460
if (cleanup_timer_handle_.has_value() &&
@@ -1429,6 +1464,7 @@ void RlsLb::Cache::Shutdown() {
14291464
<< "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
14301465
}
14311466
cleanup_timer_handle_.reset();
1467+
return child_policy_wrappers_to_delete;
14321468
}
14331469

14341470
void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
@@ -1464,12 +1500,15 @@ void RlsLb::Cache::StartCleanupTimer() {
14641500
void RlsLb::Cache::OnCleanupTimer() {
14651501
GRPC_TRACE_LOG(rls_lb, INFO)
14661502
<< "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
1503+
std::vector<RefCountedPtr<ChildPolicyWrapper>>
1504+
child_policy_wrappers_to_delete;
14671505
MutexLock lock(&lb_policy_->mu_);
14681506
if (!cleanup_timer_handle_.has_value()) return;
14691507
if (lb_policy_->is_shutdown_) return;
14701508
for (auto it = map_.begin(); it != map_.end();) {
14711509
if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
14721510
size_ -= it->second->Size();
1511+
it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
14731512
it = map_.erase(it);
14741513
} else {
14751514
++it;
@@ -1483,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
14831522
return (key.Size() * 2) + sizeof(Entry);
14841523
}
14851524

1486-
void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
1525+
void RlsLb::Cache::MaybeShrinkSize(
1526+
size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
1527+
child_policy_wrappers_to_delete) {
14871528
while (size_ > bytes) {
14881529
auto lru_it = lru_list_.begin();
14891530
if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
@@ -1494,6 +1535,7 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
14941535
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
14951536
<< map_it->second.get() << " " << lru_it->ToString();
14961537
size_ -= map_it->second->Size();
1538+
map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
14971539
map_.erase(map_it);
14981540
}
14991541
GRPC_TRACE_LOG(rls_lb, INFO)
@@ -1814,13 +1856,18 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
18141856
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
18151857
<< key_.ToString() << ": response info: " << response.ToString();
18161858
std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
1859+
std::vector<RefCountedPtr<ChildPolicyWrapper>>
1860+
child_policy_wrappers_to_delete;
1861+
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
18171862
{
18181863
MutexLock lock(&lb_policy_->mu_);
18191864
if (lb_policy_->is_shutdown_) return;
18201865
rls_channel_->ReportResponseLocked(response.status.ok());
1821-
Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
1866+
Cache::Entry* cache_entry =
1867+
lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete);
18221868
child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
1823-
std::move(response), std::move(backoff_state_));
1869+
std::move(response), std::move(backoff_state_),
1870+
&child_policy_to_delete);
18241871
lb_policy_->request_map_.erase(key_);
18251872
}
18261873
// Now that we've released the lock, finish the update on any newly
@@ -1999,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
19992046
}
20002047
}
20012048
// Now grab the lock to swap out the state it guards.
2049+
std::vector<RefCountedPtr<ChildPolicyWrapper>>
2050+
child_policy_wrappers_to_delete;
2051+
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
20022052
{
20032053
MutexLock lock(&mu_);
20042054
// Swap out RLS channel if needed.
@@ -2010,19 +2060,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
20102060
// Resize cache if needed.
20112061
if (old_config == nullptr ||
20122062
config_->cache_size_bytes() != old_config->cache_size_bytes()) {
2013-
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
2063+
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()),
2064+
&child_policy_wrappers_to_delete);
20142065
}
20152066
// Start update of child policies if needed.
20162067
if (update_child_policies) {
20172068
GRPC_TRACE_LOG(rls_lb, INFO)
20182069
<< "[rlslb " << this << "] starting child policy updates";
20192070
for (auto& p : child_policy_map_) {
2020-
p.second->StartUpdate();
2071+
p.second->StartUpdate(&child_policy_to_delete);
20212072
}
20222073
} else if (created_default_child) {
20232074
GRPC_TRACE_LOG(rls_lb, INFO)
20242075
<< "[rlslb " << this << "] starting default child policy update";
2025-
default_child_policy_->StartUpdate();
2076+
default_child_policy_->StartUpdate(&child_policy_to_delete);
20262077
}
20272078
}
20282079
// Now that we've released the lock, finish update of child policies.
@@ -2097,14 +2148,20 @@ void RlsLb::ResetBackoffLocked() {
20972148
void RlsLb::ShutdownLocked() {
20982149
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
20992150
registered_metric_callback_.reset();
2100-
MutexLock lock(&mu_);
2101-
is_shutdown_ = true;
2102-
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
2151+
RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete;
2152+
std::vector<RefCountedPtr<ChildPolicyWrapper>>
2153+
child_policy_wrappers_to_delete;
2154+
OrphanablePtr<RlsChannel> rls_channel_to_delete;
2155+
{
2156+
MutexLock lock(&mu_);
2157+
is_shutdown_ = true;
2158+
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
2159+
child_policy_wrappers_to_delete = cache_.Shutdown();
2160+
request_map_.clear();
2161+
rls_channel_to_delete = std::move(rls_channel_);
2162+
child_policy_to_delete = std::move(default_child_policy_);
2163+
}
21032164
channel_args_ = ChannelArgs();
2104-
cache_.Shutdown();
2105-
request_map_.clear();
2106-
rls_channel_.reset();
2107-
default_child_policy_.reset();
21082165
}
21092166

21102167
void RlsLb::UpdatePickerAsync() {

0 commit comments

Comments
 (0)