fix: allow sharing RocketMQ MQClientInstance to avoid thread explosion#4311
Open
daguimu wants to merge 2 commits into
Open
fix: allow sharing RocketMQ MQClientInstance to avoid thread explosion#4311daguimu wants to merge 2 commits into
daguimu wants to merge 2 commits into
Conversation
Introduce an opt-in shareClientInstance flag on RocketMQ common properties. When enabled, RocketMQUtils.getInstanceName omits the per-call nanoTime suffix so consumers and producers connecting to the same name server can reuse one MQClientInstance and its worker threads instead of each spawning ~20 threads. The flag defaults to false to preserve the historical per-client isolation, and propagates from binder-level configuration down to individual consumer/producer properties. Fixes alibaba#4291
…antics - Switch shareClientInstance from a primitive to @nullable Boolean so an explicit consumer/producer-level false can override a binder-level true (important for ordered/transactional bindings that need isolated MQClientInstance). - Rename isShareClientInstance()/setShareClientInstance(boolean) to getShareClientInstance()/setShareClientInstance(Boolean) to match the getXxx() convention already used by the other boolean fields in RocketMQCommonProperties. - Change mergeRocketMQProperties to the default-only merge pattern already used for string fields: propagate the binder value only when the extension has not set one explicitly. - Update consumer/producer factories to unbox via Boolean.TRUE.equals. - Extend RocketMQUtilsTest with explicit-false-wins, producer merge, explicit-true preservation, and both-unset cases.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
RocketMQUtils.getInstanceNameappendsSystem.nanoTime()to every generated client name, so every consumer and producer ends up with a uniqueinstanceNameand therefore a uniqueclientId(ip@instanceName). RocketMQ'sMQClientManager.getOrCreateMQClientInstancecreates a separateMQClientInstanceper uniqueclientId, and each instance spawns ~20-24 threads (Netty EventLoop, PullMessageService, RebalanceService, ScheduledExecutor, etc.). With many stream bindings (e.g. 70+ consumers in the reported case) this multiplies into ~1,700 RocketMQ threads where one shared instance would be sufficient.Root Cause
Consumers with different groups connecting to the same name server can safely share a single
MQClientInstance— theconsumerTableinsideMQClientInstanceis keyed by group. ThenanoTimesuffix defeats that sharing unconditionally, even when the binding topology would tolerate it.Fix
shareClientInstanceboolean toRocketMQCommonProperties(defaultfalse, so behavior is unchanged unless the user sets it).RocketMQUtils.getInstanceName(rpcHook, identify, shareClientInstance). WhenshareClientInstanceistrue, thenanoTimesegment is omitted and the generated name becomes deterministic ([ak|]identify|pid), which lets RocketMQ reuse a singleMQClientInstanceacross multiple consumers/producers in the same JVM.getInstanceNameas a thin delegate so any external callers and the existing default behavior are preserved.shareClientInstancefromRocketMQBinderConfigurationPropertiesto the consumer/producer properties insidemergeRocketMQPropertiesusing OR-logic, so users can toggle the flag once at the binder level.RocketMQConsumerFactory#initPushConsumer,RocketMQConsumerFactory#initPullConsumer, andRocketMQProduceFactoryso every produced/consumed client honors it.Users who hit the thread explosion can now set:
or scope it per-binding on consumer/producer properties.
Tests Added
New unit tests in
RocketMQUtilsTest:getInstanceNameWithoutSharingProducesDistinctNames,getInstanceNameWithoutSharingIncludesNanoTimeSuffixshareClientInstance=truemode yields deterministic namesgetInstanceNameWithSharingProducesDeterministicNameForSameIdentify,getInstanceNameWithSharingOmitsNanoTimeSuffixidentifygetInstanceNameWithSharingStillDifferentiatesByIdentifyAclClientRPCHookis suppliedgetInstanceNameWithRpcHookPrependsAccessKeydefaultOverloadDelegatesToUnsharedBehaviormergeRocketMQPropertiesOR-logic for the new flagmergeRocketMQPropertiesPropagatesShareClientInstanceFromBinder,mergeRocketMQPropertiesPreservesConsumerShareClientInstanceWhenBinderIsFalse,mergeRocketMQPropertiesDefaultLeavesShareClientInstanceFalseImpact
shareClientInstance=false), so existing applications pick up no behavioral drift.MQClientInstancecopies into one, recovering the ~20 threads each duplicate would spawn.Fixes #4291