Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}

func (c *DeleteAclsRequest) headerVersion() int16 {
func (d *DeleteAclsRequest) headerVersion() int16 {
return 1
}

Expand Down
8 changes: 4 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ func isErrNoController(err error) bool {
}

// retryOnError will repeatedly call the given (error-returning) func in the
// case that its response is non-nil and retriable (as determined by the
// provided retriable func) up to the maximum number of tries permitted by
// case that its response is non-nil and retryable (as determined by the
// provided retryable func) up to the maximum number of tries permitted by
// the admin client configuration
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
var err error
for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
err = fn()
if err == nil || !retriable(err) {
if err == nil || !retryable(err) {
return err
}
Logger.Printf(
Expand Down
30 changes: 14 additions & 16 deletions decompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,32 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
case CompressionNone:
return data, nil
case CompressionGZIP:
var (
err error
reader *gzip.Reader
readerIntf = gzipReaderPool.Get()
)
if readerIntf != nil {
reader = readerIntf.(*gzip.Reader)
} else {
var err error
reader, ok := gzipReaderPool.Get().(*gzip.Reader)
if !ok {
reader, err = gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
} else {
err = reader.Reset(bytes.NewReader(data))
}

defer gzipReaderPool.Put(reader)

if err := reader.Reset(bytes.NewReader(data)); err != nil {
if err != nil {
return nil, err
}

defer gzipReaderPool.Put(reader)

return ioutil.ReadAll(reader)
case CompressionSnappy:
return snappy.Decode(data)
case CompressionLZ4:
reader := lz4ReaderPool.Get().(*lz4.Reader)
reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
if !ok {
reader = lz4.NewReader(bytes.NewReader(data))
} else {
reader.Reset(bytes.NewReader(data))
}
defer lz4ReaderPool.Put(reader)

reader.Reset(bytes.NewReader(data))
return ioutil.ReadAll(reader)
case CompressionZSTD:
return zstdDecompress(nil, data)
Expand Down
4 changes: 2 additions & 2 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
return realEnc.raw, nil
}

// Decoder is the interface that wraps the basic Decode method.
// decoder is the interface that wraps the basic Decode method.
// Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
type decoder interface {
decode(pd packetDecoder) error
Expand All @@ -55,7 +55,7 @@ type versionedDecoder interface {
decode(pd packetDecoder, version int16) error
}

// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder) error {
if buf == nil {
Expand Down
10 changes: 2 additions & 8 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,12 @@ func (m *Message) decode(pd packetDecoder) (err error) {
// for future metrics about the compression ratio in fetch requests
m.compressedSize = len(m.Value)

switch m.Codec {
case CompressionNone:
// nothing to do
default:
if m.Value == nil {
break
}

if m.Value != nil && m.Codec != CompressionNone {
m.Value, err = decompress(m.Codec, m.Value)
if err != nil {
return err
}

if err := m.decodeSet(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type RequestNotifierFunc func(bytesRead, bytesWritten int)
// to facilitate testing of higher level or specialized consumers and producers
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
// but rather provides a facility to do that. It takes care of the TCP
// transport, request unmarshaling, response marshaling, and makes it the test
// transport, request unmarshalling, response marshalling, and makes it the test
// writer responsibility to program correct according to the Kafka API protocol
// MockBroker behaviour.
//
Expand Down
2 changes: 1 addition & 1 deletion partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type PartitionerConstructor func(topic string) Partitioner

type manualPartitioner struct{}

// HashPartitionOption lets you modify default values of the partitioner
// HashPartitionerOption lets you modify default values of the partitioner
type HashPartitionerOption func(*hashPartitioner)

// WithAbsFirst means that the partitioner handles absolute values
Expand Down