We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent bfe4374 commit fd4ed0bCopy full SHA for fd4ed0b
consumer.go
@@ -126,16 +126,6 @@ func (s *Consumer) Shutdown() error {
126
return nil
127
}
128
129
-// Capacity for channel
130
-func (s *Consumer) Capacity() int {
131
- return cap(s.taskQueue)
132
-}
133
-
134
-// Usage for count of channel usage
135
-func (s *Consumer) Usage() int {
136
- return len(s.taskQueue)
137
138
139
// Queue send notification to queue
140
func (s *Consumer) Queue(task QueuedMessage) error {
141
if atomic.LoadInt32(&s.stopFlag) == 1 {
@@ -155,7 +145,7 @@ func (s *Consumer) Request() (QueuedMessage, error) {
155
145
case task := <-s.taskQueue:
156
146
return task, nil
157
147
default:
158
- return nil, errors.New("no message in queue")
148
+ return nil, errors.New("no task in queue")
159
149
160
150
161
151
consumer_test.go
@@ -12,26 +12,12 @@ import (
12
"github.com/stretchr/testify/assert"
13
)
14
15
-func TestQueueUsage(t *testing.T) {
16
- w := NewConsumer()
17
- assert.Equal(t, defaultQueueSize, w.Capacity())
18
- assert.Equal(t, 0, w.Usage())
19
20
- assert.NoError(t, w.Queue(&mockMessage{}))
21
- assert.Equal(t, 1, w.Usage())
22
23
24
func TestMaxCapacity(t *testing.T) {
25
w := NewConsumer(WithQueueSize(2))
26
- assert.Equal(t, 2, w.Capacity())
27
28
29
assert.NoError(t, w.Queue(&mockMessage{}))
30
31
32
- assert.Equal(t, 2, w.Usage())
33
assert.Error(t, w.Queue(&mockMessage{}))
34
35
36
err := w.Queue(&mockMessage{})
37
assert.Equal(t, errMaxCapacity, err)
queue.go
@@ -272,30 +272,32 @@ func (q *Queue) start() {
272
case task = <-tasks:
273
// queue task before shutdown the service
274
if err := q.worker.Queue(task); err != nil {
275
- q.logger.Errorf("can't re-queue message: %v", err)
+ q.logger.Errorf("can't re-queue task: %v", err)
276
277
278
279
return
280
281
282
// check worker number
283
+ q.Lock()
284
if q.BusyWorkers() < q.workerCount {
285
q.schedule()
286
287
+ q.Unlock()
288
289
// get worker to execute new task
290
select {
291
case <-q.quit:
292
293
294
295
296
case <-q.ready:
297
298
299
300
301
302
303
0 commit comments