Skip to content

Commit 40ab0cd

Browse files
authored
Support request pipelining in AsyncProducer (#2094)
* Support request pipelining in AsyncProducer
  - introduce Broker.AsyncProduce with a callback to provide a mostly non-blocking way to produce to a single broker
  - refactor Broker.send and Broker.responseReceiver to support callbacks
  - factor throttleTime metrics into Broker.updateThrottleMetric
  - add Config.Produce.Pipeline parameter (defaults to false for backward compatibility)
  - honor MaxOpenRequests when using AsyncProducer and conf.Produce.Pipeline is enabled
  - add unit and functional tests
* Use Broker.AsyncProduce in AsyncProducer
1 parent fba46c9 commit 40ab0cd

File tree

4 files changed

+335
-75
lines changed

4 files changed

+335
-75
lines changed

async_producer.go

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -670,20 +670,12 @@ func (pp *partitionProducer) updateLeader() error {
670670
})
671671
}
672672

673-
type pendingResponse struct {
674-
set *produceSet
675-
version int16
676-
promise *responsePromise
677-
response *ProduceResponse
678-
}
679-
680673
// one per broker; also constructs an associated flusher
681674
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
682675
var (
683676
input = make(chan *ProducerMessage)
684677
bridge = make(chan *produceSet)
685678
responses = make(chan *brokerProducerResponse)
686-
pendings = make(chan *pendingResponse, p.conf.Net.MaxOpenRequests-1)
687679
)
688680

689681
bp := &brokerProducer{
@@ -700,54 +692,36 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
700692

701693
// minimal bridge to make the network response `select`able
702694
go withRecover(func() {
703-
defer close(pendings)
704695
for set := range bridge {
705-
set := set
706-
var err error
707-
var response *ProduceResponse
708-
var promise *responsePromise
709-
710696
request := set.buildRequest()
711-
if request.RequiredAcks != NoResponse {
712-
response = new(ProduceResponse)
713-
}
714697

715-
responseHeaderVersion := int16(-1)
716-
if response != nil {
717-
responseHeaderVersion = response.headerVersion()
718-
}
719-
720-
promise, err = broker.send(request, response != nil, responseHeaderVersion)
721-
722-
// return quickly if failed or ackMode: NoResponse
723-
if err != nil || promise == nil {
724-
responses <- &brokerProducerResponse{
725-
set: set,
726-
err: err,
727-
res: response,
698+
// Capture the current set to forward in the callback
699+
sendResponse := func(set *produceSet) ProduceCallback {
700+
return func(response *ProduceResponse, err error) {
701+
responses <- &brokerProducerResponse{
702+
set: set,
703+
err: err,
704+
res: response,
705+
}
728706
}
729-
continue
730-
}
731-
pending := &pendingResponse{set: set, version: request.version(), response: response, promise: promise}
732-
pendings <- pending
733-
}
734-
})
707+
}(set)
735708

736-
go withRecover(func() {
737-
defer close(responses)
738-
for pending := range pendings {
739-
var err error
740-
select {
741-
case buf := <-pending.promise.packets:
742-
err = versionedDecode(buf, pending.response, pending.version)
743-
case err = <-pending.promise.errors:
709+
// Use AsyncProduce instead of Produce to avoid blocking while waiting for the response
710+
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
711+
// https://kafka.apache.org/protocol#protocol_network
712+
err := broker.AsyncProduce(request, sendResponse)
713+
if err != nil {
714+
// Request failed to be sent
715+
sendResponse(nil, err)
716+
continue
744717
}
745-
responses <- &brokerProducerResponse{
746-
set: pending.set,
747-
err: err,
748-
res: pending.response,
718+
// Callback is not called when using NoResponse
719+
if p.conf.Producer.RequiredAcks == NoResponse {
720+
// Provide the expected nil response
721+
sendResponse(nil, nil)
749722
}
750723
}
724+
close(responses)
751725
})
752726

753727
if p.conf.Producer.Retry.Max <= 0 {

broker.go

Lines changed: 114 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Broker struct {
2828
connErr error
2929
lock sync.Mutex
3030
opened int32
31-
responses chan responsePromise
31+
responses chan *responsePromise
3232
done chan bool
3333

3434
registeredMetrics []string
@@ -121,10 +121,25 @@ type responsePromise struct {
121121
requestTime time.Time
122122
correlationID int32
123123
headerVersion int16
124+
handler func([]byte, error)
124125
packets chan []byte
125126
errors chan error
126127
}
127128

129+
func (p *responsePromise) handle(packets []byte, err error) {
130+
// Use callback when provided
131+
if p.handler != nil {
132+
p.handler(packets, err)
133+
return
134+
}
135+
// Otherwise fallback to using channels
136+
if err != nil {
137+
p.errors <- err
138+
return
139+
}
140+
p.packets <- packets
141+
}
142+
128143
// NewBroker creates and returns a Broker targeting the given host:port address.
129144
// This does not attempt to actually connect, you have to call Open() for that.
130145
func NewBroker(addr string) *Broker {
@@ -219,7 +234,7 @@ func (b *Broker) Open(conf *Config) error {
219234
}
220235

221236
b.done = make(chan bool)
222-
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
237+
b.responses = make(chan *responsePromise, b.conf.Net.MaxOpenRequests-1)
223238

224239
if b.id >= 0 {
225240
DebugLogger.Printf("Connected to broker at %s (registered as #%d)\n", b.addr, b.id)
@@ -342,7 +357,55 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
342357
return response, nil
343358
}
344359

345-
// Produce returns a produce response or error
360+
// ProduceCallback function is called once the produce response has been parsed
361+
// or could not be read.
362+
type ProduceCallback func(*ProduceResponse, error)
363+
364+
// AsyncProduce sends a produce request and eventually calls the provided callback
365+
// with a produce response or an error.
366+
//
367+
// Unlike Produce, waiting for the response is generally non-blocking.
368+
// If the configured maximum number of in-flight requests is reached then
369+
// the request will be blocked till a previous response is received.
370+
//
371+
// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
372+
// If an error is returned because the request could not be sent then the callback
373+
// will not be invoked either.
374+
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
375+
needAcks := request.RequiredAcks != NoResponse
376+
// Use a nil promise when no acks are required
377+
var promise *responsePromise
378+
379+
if needAcks {
380+
// Create ProduceResponse early to provide the header version
381+
res := new(ProduceResponse)
382+
promise = &responsePromise{
383+
headerVersion: res.headerVersion(),
384+
// Packets will be converted to a ProduceResponse in the responseReceiver goroutine
385+
handler: func(packets []byte, err error) {
386+
if err != nil {
387+
// Failed request
388+
cb(nil, err)
389+
return
390+
}
391+
392+
if err := versionedDecode(packets, res, request.version()); err != nil {
393+
// Malformed response
394+
cb(nil, err)
395+
return
396+
}
397+
398+
// Well-formed response
399+
b.updateThrottleMetric(res.ThrottleTime)
400+
cb(res, nil)
401+
},
402+
}
403+
}
404+
405+
return b.sendWithPromise(request, promise)
406+
}
407+
408+
// Produce returns a produce response or error
346409
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
347410
var (
348411
response *ProduceResponse
@@ -354,15 +417,7 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
354417
} else {
355418
response = new(ProduceResponse)
356419
err = b.sendAndReceive(request, response)
357-
if response.ThrottleTime != time.Duration(0) {
358-
DebugLogger.Printf(
359-
"producer/broker/%d ProduceResponse throttled %v\n",
360-
b.ID(), response.ThrottleTime)
361-
if b.brokerThrottleTime != nil {
362-
throttleTimeInMs := int64(response.ThrottleTime / time.Millisecond)
363-
b.brokerThrottleTime.Update(throttleTimeInMs)
364-
}
365-
}
420+
b.updateThrottleMetric(response.ThrottleTime)
366421
}
367422

368423
if err != nil {
@@ -807,24 +862,43 @@ func (b *Broker) write(buf []byte) (n int, err error) {
807862
}
808863

809864
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
865+
var promise *responsePromise
866+
if promiseResponse {
867+
// Packets or error will be sent to the following channels
868+
// once the response is received
869+
promise = &responsePromise{
870+
headerVersion: responseHeaderVersion,
871+
packets: make(chan []byte),
872+
errors: make(chan error),
873+
}
874+
}
875+
876+
if err := b.sendWithPromise(rb, promise); err != nil {
877+
return nil, err
878+
}
879+
880+
return promise, nil
881+
}
882+
883+
func (b *Broker) sendWithPromise(rb protocolBody, promise *responsePromise) error {
810884
b.lock.Lock()
811885
defer b.lock.Unlock()
812886

813887
if b.conn == nil {
814888
if b.connErr != nil {
815-
return nil, b.connErr
889+
return b.connErr
816890
}
817-
return nil, ErrNotConnected
891+
return ErrNotConnected
818892
}
819893

820894
if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
821-
return nil, ErrUnsupportedVersion
895+
return ErrUnsupportedVersion
822896
}
823897

824898
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
825899
buf, err := encode(req, b.conf.MetricRegistry)
826900
if err != nil {
827-
return nil, err
901+
return err
828902
}
829903

830904
requestTime := time.Now()
@@ -834,20 +908,21 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersi
834908
b.updateOutgoingCommunicationMetrics(bytes)
835909
if err != nil {
836910
b.addRequestInFlightMetrics(-1)
837-
return nil, err
911+
return err
838912
}
839913
b.correlationID++
840914

841-
if !promiseResponse {
915+
if promise == nil {
842916
// Record request latency without the response
843917
b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
844-
return nil, nil
918+
return nil
845919
}
846920

847-
promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
921+
promise.requestTime = requestTime
922+
promise.correlationID = req.correlationID
848923
b.responses <- promise
849924

850-
return &promise, nil
925+
return nil
851926
}
852927

853928
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
@@ -942,7 +1017,7 @@ func (b *Broker) responseReceiver() {
9421017
// This was previously incremented in send() and
9431018
// we are not calling updateIncomingCommunicationMetrics()
9441019
b.addRequestInFlightMetrics(-1)
945-
response.errors <- dead
1020+
response.handle(nil, dead)
9461021
continue
9471022
}
9481023

@@ -954,7 +1029,7 @@ func (b *Broker) responseReceiver() {
9541029
if err != nil {
9551030
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
9561031
dead = err
957-
response.errors <- err
1032+
response.handle(nil, err)
9581033
continue
9591034
}
9601035

@@ -963,15 +1038,15 @@ func (b *Broker) responseReceiver() {
9631038
if err != nil {
9641039
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
9651040
dead = err
966-
response.errors <- err
1041+
response.handle(nil, err)
9671042
continue
9681043
}
9691044
if decodedHeader.correlationID != response.correlationID {
9701045
b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
9711046
// TODO if decoded ID < cur ID, discard until we catch up
9721047
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
9731048
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
974-
response.errors <- dead
1049+
response.handle(nil, dead)
9751050
continue
9761051
}
9771052

@@ -980,11 +1055,11 @@ func (b *Broker) responseReceiver() {
9801055
b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
9811056
if err != nil {
9821057
dead = err
983-
response.errors <- err
1058+
response.handle(nil, err)
9841059
continue
9851060
}
9861061

987-
response.packets <- buf
1062+
response.handle(buf, nil)
9881063
}
9891064
close(b.done)
9901065
}
@@ -1545,6 +1620,18 @@ func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
15451620
}
15461621
}
15471622

1623+
func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
1624+
if throttleTime != time.Duration(0) {
1625+
DebugLogger.Printf(
1626+
"producer/broker/%d ProduceResponse throttled %v\n",
1627+
b.ID(), throttleTime)
1628+
if b.brokerThrottleTime != nil {
1629+
throttleTimeInMs := int64(throttleTime / time.Millisecond)
1630+
b.brokerThrottleTime.Update(throttleTimeInMs)
1631+
}
1632+
}
1633+
}
1634+
15481635
func (b *Broker) registerMetrics() {
15491636
b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
15501637
b.brokerRequestRate = b.registerMeter("request-rate")

0 commit comments

Comments
 (0)