Skip to content

Commit 13f46eb

Browse files
authored
pubsub: simplify and improve batch sizing, especially for low message rates
1 parent 8f2c2b9 commit 13f46eb

File tree

2 files changed

+15
-27
lines changed

2 files changed

+15
-27
lines changed

pubsub/benchmark_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,13 @@ func TestReceivePerformance(t *testing.T) {
159159
return d
160160
},
161161
},
162+
{
163+
description: "intermittent",
164+
receiveProfile: func(_ bool, maxMessages int) (int, time.Duration) {
165+
n := rand.Int() % 2
166+
return n, 250 * time.Millisecond
167+
},
168+
},
162169
}
163170

164171
for _, test := range tests {

pubsub/pubsub.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -386,8 +386,7 @@ type Subscription struct {
386386
unreportedAckErr error // permanent error from background SendAcks that hasn't been returned to the user yet
387387
waitc chan struct{} // for goroutines waiting on ReceiveBatch
388388
runningBatchSize float64 // running number of messages to request via ReceiveBatch
389-
throughputStart time.Time // start time for throughput measurement, or the zero Time if queue is empty
390-
throughputEnd time.Time // end time for throughput measurement, or the zero Time if queue is not empty
389+
throughputStart time.Time // start time for throughput measurement
391390
throughputCount int // number of msgs given out via Receive since throughputStart
392391

393392
// Used in tests.
@@ -471,14 +470,11 @@ func (s *Subscription) updateBatchSize() int {
471470
} else {
472471
// Update s.runningBatchSize based on throughput since our last time here,
473472
// as measured by the ratio of the number of messages returned to elapsed
474-
// time when there were messages available in the queue.
475-
if s.throughputEnd.IsZero() {
476-
s.throughputEnd = now
477-
}
478-
elapsed := s.throughputEnd.Sub(s.throughputStart)
479-
if elapsed == 0 {
480-
// Avoid divide-by-zero.
481-
elapsed = 1 * time.Millisecond
473+
// time.
474+
elapsed := now.Sub(s.throughputStart)
475+
if elapsed < 100*time.Millisecond {
476+
// Avoid divide-by-zero and huge numbers.
477+
elapsed = 100 * time.Millisecond
482478
}
483479
msgsPerSec := float64(s.throughputCount) / elapsed.Seconds()
484480

@@ -500,13 +496,7 @@ func (s *Subscription) updateBatchSize() int {
500496
}
501497

502498
// Reset throughput measurement markers.
503-
if len(s.q) > 0 {
504-
s.throughputStart = now
505-
} else {
506-
// Will get set to non-zero value when we receive some messages.
507-
s.throughputStart = time.Time{}
508-
}
509-
s.throughputEnd = time.Time{}
499+
s.throughputStart = now
510500
s.throughputCount = 0
511501

512502
// Using Ceil guarantees at least one message.
@@ -569,6 +559,7 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
569559
// waiting goroutines, by closing s.waitc.
570560
s.waitc = make(chan struct{})
571561
batchSize := s.updateBatchSize()
562+
// log.Printf("BATCH SIZE %d", batchSize)
572563

573564
go func() {
574565
if s.preReceiveBatchHook != nil {
@@ -583,12 +574,6 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
583574
} else if len(msgs) > 0 {
584575
s.q = append(s.q, msgs...)
585576
}
586-
// Set the start time for measuring throughput even if we didn't get
587-
// any messages; this allows batch size to decay over time if there
588-
// aren't any message available.
589-
if s.throughputStart.IsZero() {
590-
s.throughputStart = time.Now()
591-
}
592577
close(s.waitc)
593578
s.waitc = nil
594579
}()
@@ -638,10 +623,6 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
638623
})
639624
return m2, nil
640625
}
641-
// No messages are available. Close the interval for throughput measurement.
642-
if s.throughputEnd.IsZero() && !s.throughputStart.IsZero() && s.throughputCount > 0 {
643-
s.throughputEnd = time.Now()
644-
}
645626
// A call to ReceiveBatch must be in flight. Wait for it.
646627
waitc := s.waitc
647628
s.mu.Unlock()

0 commit comments

Comments
 (0)