Skip to content

Commit b4b4df3

Browse files
authored
feat: fix push consumer pause data race (#1217)
1 parent 33fe267 commit b4b4df3

File tree

3 files changed

+24
-23
lines changed

3 files changed

+24
-23
lines changed

consumer/consumer.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ import (
2525
"strconv"
2626
"strings"
2727
"sync"
28-
"sync/atomic"
2928
"time"
3029

3130
jsoniter "github.com/json-iterator/go"
3231
"github.com/tidwall/gjson"
32+
"go.uber.org/atomic"
3333

3434
"github.com/apache/rocketmq-client-go/v2/errors"
3535
"github.com/apache/rocketmq-client-go/v2/hooks"
@@ -250,8 +250,8 @@ type defaultConsumer struct {
250250
cType ConsumeType
251251
client internal.RMQClient
252252
mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
253-
state int32
254-
pause bool
253+
state *atomic.Int32
254+
pause *atomic.Bool
255255
once sync.Once
256256
option consumerOptions
257257
// key: primitive.MessageQueue
@@ -289,14 +289,14 @@ func (dc *defaultConsumer) start() error {
289289
}
290290

291291
dc.client.Start()
292-
atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
292+
dc.state.Store(int32(internal.StateRunning))
293293
dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
294294
dc.stat = NewStatsManager()
295295
return nil
296296
}
297297

298298
func (dc *defaultConsumer) shutdown() error {
299-
atomic.StoreInt32(&dc.state, int32(internal.StateShutdown))
299+
dc.state.Store(int32(internal.StateShutdown))
300300

301301
mqs := make([]*primitive.MessageQueue, 0)
302302
dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -317,11 +317,11 @@ func (dc *defaultConsumer) shutdown() error {
317317
}
318318

319319
func (dc *defaultConsumer) isRunning() bool {
320-
return atomic.LoadInt32(&dc.state) == int32(internal.StateRunning)
320+
return dc.state.Load() == int32(internal.StateRunning)
321321
}
322322

323323
func (dc *defaultConsumer) isStopped() bool {
324-
return atomic.LoadInt32(&dc.state) == int32(internal.StateShutdown)
324+
return dc.state.Load() == int32(internal.StateShutdown)
325325
}
326326

327327
func (dc *defaultConsumer) persistConsumerOffset() error {
@@ -371,7 +371,7 @@ func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool {
371371
}
372372

373373
func (dc *defaultConsumer) doBalanceIfNotPaused() {
374-
if dc.pause {
374+
if dc.pause.Load() {
375375
rlog.Info("[BALANCE-SKIP] since consumer paused", map[string]interface{}{
376376
rlog.LogKeyConsumerGroup: dc.consumerGroup,
377377
})
@@ -483,8 +483,8 @@ func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
483483
}
484484

485485
func (dc *defaultConsumer) makeSureStateOK() error {
486-
if atomic.LoadInt32(&dc.state) != int32(internal.StateRunning) {
487-
return fmt.Errorf("state not running, actually: %v", dc.state)
486+
if dc.state.Load() != int32(internal.StateRunning) {
487+
return fmt.Errorf("state not running, actually: %v", dc.state.Load())
488488
}
489489
return nil
490490
}

consumer/pull_consumer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
3232
"github.com/apache/rocketmq-client-go/v2/internal/remote"
3333
"github.com/apache/rocketmq-client-go/v2/internal/utils"
34+
atomic2 "go.uber.org/atomic"
3435

3536
"github.com/pkg/errors"
3637

@@ -111,7 +112,7 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
111112
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
112113
consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, defaultOpts.GroupName),
113114
cType: _PullConsume,
114-
state: int32(internal.StateCreateJust),
115+
state: atomic2.NewInt32(int32(internal.StateCreateJust)),
115116
prCh: make(chan PullRequest, 4),
116117
model: defaultOpts.ConsumerModel,
117118
option: defaultOpts,
@@ -149,8 +150,8 @@ func (pc *defaultPullConsumer) GetTopicRouteInfo(topic string) ([]*primitive.Mes
149150
}
150151

151152
func (pc *defaultPullConsumer) Subscribe(topic string, selector MessageSelector) error {
152-
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
153-
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
153+
if pc.state.Load() == int32(internal.StateStartFailed) ||
154+
pc.state.Load() == int32(internal.StateShutdown) {
154155
return errors2.ErrStartTopic
155156
}
156157
if pc.SubType == Assign {
@@ -247,7 +248,7 @@ func (pc *defaultPullConsumer) Start() error {
247248
if err != nil {
248249
return
249250
}
250-
atomic.StoreInt32(&pc.state, int32(internal.StateRunning))
251+
pc.state.Store(int32(internal.StateRunning))
251252
go func() {
252253
for {
253254
select {
@@ -837,7 +838,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
837838
goto NEXT
838839
}
839840

840-
if pc.pause {
841+
if pc.pause.Load() {
841842
rlog.Debug(fmt.Sprintf("defaultPullConsumer [%s] of [%s] was paused, execute pull request [%s] later",
842843
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
843844
sleepTime = _PullDelayTimeWhenSuspend

consumer/push_consumer.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ import (
2525
"strconv"
2626
"strings"
2727
"sync"
28-
"sync/atomic"
2928
"time"
3029

3130
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
31+
"go.uber.org/atomic"
3232

3333
"github.com/pkg/errors"
3434

@@ -97,7 +97,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
9797
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
9898
consumerGroup: defaultOpts.GroupName,
9999
cType: _PushConsume,
100-
state: int32(internal.StateCreateJust),
100+
state: atomic.NewInt32(int32(internal.StateCreateJust)),
101101
prCh: make(chan PullRequest, 4),
102102
model: defaultOpts.ConsumerModel,
103103
consumeOrderly: defaultOpts.ConsumeOrderly,
@@ -138,7 +138,7 @@ func (pc *pushConsumer) Start() error {
138138
"messageModel": pc.model,
139139
"unitMode": pc.unitMode,
140140
})
141-
atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
141+
pc.state.Store(int32(internal.StateStartFailed))
142142
err = pc.validate()
143143
if err != nil {
144144
rlog.Error("the consumer group option validate fail", map[string]interface{}{
@@ -289,8 +289,8 @@ func (pc *pushConsumer) Shutdown() error {
289289

290290
func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
291291
f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)) error {
292-
if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
293-
atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
292+
if pc.state.Load() == int32(internal.StateStartFailed) ||
293+
pc.state.Load() == int32(internal.StateShutdown) {
294294
return errors2.ErrStartTopic
295295
}
296296

@@ -685,7 +685,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
685685
goto NEXT
686686
}
687687

688-
if pc.pause {
688+
if pc.pause.Load() {
689689
rlog.Debug(fmt.Sprintf("consumer [%s] of [%s] was paused, execute pull request [%s] later",
690690
pc.option.InstanceName, pc.consumerGroup, request.String()), nil)
691691
sleepTime = _PullDelayTimeWhenSuspend
@@ -945,12 +945,12 @@ func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLev
945945
}
946946

947947
func (pc *pushConsumer) suspend() {
948-
pc.pause = true
948+
pc.pause.Store(true)
949949
rlog.Info(fmt.Sprintf("suspend consumer: %s", pc.consumerGroup), nil)
950950
}
951951

952952
func (pc *pushConsumer) resume() {
953-
pc.pause = false
953+
pc.pause.Store(false)
954954
pc.doBalance()
955955
rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
956956
}

0 commit comments

Comments
 (0)