Skip to content

Commit fab646d

Browse files
committed
feat: support ApiVersionsRequest V3 protocol
Add support for the ApiVersionsRequest V3 protocol which includes ClientSoftwareName and ClientSoftwareVersion as specified by KIP-511 Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
1 parent 12c9785 commit fab646d

6 files changed

Lines changed: 245 additions & 64 deletions

api_versions_request.go

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,69 @@
11
package sarama
22

3-
// ApiVersionsRequest ...
4-
type ApiVersionsRequest struct{}
3+
const defaultClientSoftwareName = "sarama"
4+
5+
type ApiVersionsRequest struct {
6+
// Version defines the protocol version to use for encode and decode
7+
Version int16
8+
// ClientSoftwareName contains the name of the client.
9+
ClientSoftwareName string
10+
// ClientSoftwareVersion contains the version of the client.
11+
ClientSoftwareVersion string
12+
}
13+
14+
func (r *ApiVersionsRequest) encode(pe packetEncoder) (err error) {
15+
if r.Version >= 3 {
16+
if err := pe.putCompactString(r.ClientSoftwareName); err != nil {
17+
return err
18+
}
19+
if err := pe.putCompactString(r.ClientSoftwareVersion); err != nil {
20+
return err
21+
}
22+
pe.putEmptyTaggedFieldArray()
23+
}
524

6-
func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
725
return nil
826
}
927

10-
func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
28+
func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
29+
r.Version = version
30+
if r.Version >= 3 {
31+
if r.ClientSoftwareName, err = pd.getCompactString(); err != nil {
32+
return err
33+
}
34+
if r.ClientSoftwareVersion, err = pd.getCompactString(); err != nil {
35+
return err
36+
}
37+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
38+
return err
39+
}
40+
}
41+
1142
return nil
1243
}
1344

14-
func (a *ApiVersionsRequest) key() int16 {
45+
func (r *ApiVersionsRequest) key() int16 {
1546
return 18
1647
}
1748

18-
func (a *ApiVersionsRequest) version() int16 {
19-
return 0
49+
func (r *ApiVersionsRequest) version() int16 {
50+
return r.Version
2051
}
2152

22-
func (a *ApiVersionsRequest) headerVersion() int16 {
53+
func (r *ApiVersionsRequest) headerVersion() int16 {
54+
if r.Version >= 3 {
55+
return 2
56+
}
2357
return 1
2458
}
2559

26-
func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
27-
return V0_10_0_0
60+
func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
61+
switch r.Version {
62+
case 0:
63+
return V0_10_0_0
64+
case 3:
65+
return V2_4_0_0
66+
default:
67+
return V0_10_0_0
68+
}
2869
}

api_versions_request_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,25 @@ package sarama
22

33
import "testing"
44

5-
var apiVersionRequest []byte
5+
var (
6+
apiVersionRequest []byte
7+
8+
apiVersionRequestV3 = []byte{
9+
0x07, 's', 'a', 'r', 'a', 'm', 'a',
10+
0x07, '0', '.', '1', '0', '.', '0',
11+
0x00,
12+
}
13+
)
614

715
func TestApiVersionsRequest(t *testing.T) {
816
request := new(ApiVersionsRequest)
917
testRequest(t, "basic", request, apiVersionRequest)
1018
}
19+
20+
func TestApiVersionsRequestV3(t *testing.T) {
21+
request := new(ApiVersionsRequest)
22+
request.Version = 3
23+
request.ClientSoftwareName = "sarama"
24+
request.ClientSoftwareVersion = "0.10.0"
25+
testRequest(t, "v3", request, apiVersionRequestV3)
26+
}

api_versions_response.go

Lines changed: 100 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,130 @@
11
package sarama
22

3-
// ApiVersionsResponseBlock is an api version response block type
4-
type ApiVersionsResponseBlock struct {
5-
ApiKey int16
3+
// ApiVersionsResponseKey contains the APIs supported by the broker.
4+
type ApiVersionsResponseKey struct {
5+
// Version defines the protocol version to use for encode and decode
6+
Version int16
7+
// ApiKey contains the API index.
8+
ApiKey int16
9+
// MinVersion contains the minimum supported version, inclusive.
610
MinVersion int16
11+
// MaxVersion contains the maximum supported version, inclusive.
712
MaxVersion int16
813
}
914

10-
func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
11-
pe.putInt16(b.ApiKey)
12-
pe.putInt16(b.MinVersion)
13-
pe.putInt16(b.MaxVersion)
15+
func (a *ApiVersionsResponseKey) encode(pe packetEncoder, version int16) (err error) {
16+
a.Version = version
17+
pe.putInt16(a.ApiKey)
18+
19+
pe.putInt16(a.MinVersion)
20+
21+
pe.putInt16(a.MaxVersion)
22+
23+
if version >= 3 {
24+
pe.putEmptyTaggedFieldArray()
25+
}
26+
1427
return nil
1528
}
1629

17-
func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
18-
var err error
19-
20-
if b.ApiKey, err = pd.getInt16(); err != nil {
30+
func (a *ApiVersionsResponseKey) decode(pd packetDecoder, version int16) (err error) {
31+
a.Version = version
32+
if a.ApiKey, err = pd.getInt16(); err != nil {
2133
return err
2234
}
2335

24-
if b.MinVersion, err = pd.getInt16(); err != nil {
36+
if a.MinVersion, err = pd.getInt16(); err != nil {
2537
return err
2638
}
2739

28-
if b.MaxVersion, err = pd.getInt16(); err != nil {
40+
if a.MaxVersion, err = pd.getInt16(); err != nil {
2941
return err
3042
}
3143

44+
if version >= 3 {
45+
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
46+
return err
47+
}
48+
}
49+
3250
return nil
3351
}
3452

35-
// ApiVersionsResponse is an api version response type
3653
type ApiVersionsResponse struct {
37-
Err KError
38-
ApiVersions []*ApiVersionsResponseBlock
54+
// Version defines the protocol version to use for encode and decode
55+
Version int16
56+
// ErrorCode contains the top-level error code.
57+
ErrorCode int16
58+
// ApiKeys contains the APIs supported by the broker.
59+
ApiKeys []ApiVersionsResponseKey
60+
// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
61+
ThrottleTimeMs int32
3962
}
4063

41-
func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
42-
pe.putInt16(int16(r.Err))
43-
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
44-
return err
64+
func (r *ApiVersionsResponse) encode(pe packetEncoder) (err error) {
65+
pe.putInt16(r.ErrorCode)
66+
67+
if r.Version >= 3 {
68+
pe.putCompactArrayLength(len(r.ApiKeys))
69+
} else {
70+
if err := pe.putArrayLength(len(r.ApiKeys)); err != nil {
71+
return err
72+
}
4573
}
46-
for _, apiVersion := range r.ApiVersions {
47-
if err := apiVersion.encode(pe); err != nil {
74+
for _, block := range r.ApiKeys {
75+
if err := block.encode(pe, r.Version); err != nil {
4876
return err
4977
}
5078
}
79+
80+
if r.Version >= 1 {
81+
pe.putInt32(r.ThrottleTimeMs)
82+
}
83+
84+
if r.Version >= 3 {
85+
pe.putEmptyTaggedFieldArray()
86+
}
87+
5188
return nil
5289
}
5390

54-
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
55-
kerr, err := pd.getInt16()
56-
if err != nil {
91+
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) (err error) {
92+
r.Version = version
93+
if r.ErrorCode, err = pd.getInt16(); err != nil {
5794
return err
5895
}
5996

60-
r.Err = KError(kerr)
97+
var numApiKeys int
98+
if r.Version >= 3 {
99+
numApiKeys, err = pd.getCompactArrayLength()
100+
if err != nil {
101+
return err
102+
}
103+
} else {
104+
numApiKeys, err = pd.getArrayLength()
105+
if err != nil {
106+
return err
107+
}
108+
}
109+
r.ApiKeys = make([]ApiVersionsResponseKey, numApiKeys)
110+
for i := 0; i < numApiKeys; i++ {
111+
var block ApiVersionsResponseKey
112+
if err = block.decode(pd, r.Version); err != nil {
113+
return err
114+
}
115+
r.ApiKeys[i] = block
116+
}
61117

62-
numBlocks, err := pd.getArrayLength()
63-
if err != nil {
64-
return err
118+
if r.Version >= 1 {
119+
if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
120+
return err
121+
}
65122
}
66123

67-
r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
68-
for i := 0; i < numBlocks; i++ {
69-
block := new(ApiVersionsResponseBlock)
70-
if err := block.decode(pd); err != nil {
124+
if r.Version >= 3 {
125+
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
71126
return err
72127
}
73-
r.ApiVersions[i] = block
74128
}
75129

76130
return nil
@@ -81,13 +135,22 @@ func (r *ApiVersionsResponse) key() int16 {
81135
}
82136

83137
func (r *ApiVersionsResponse) version() int16 {
84-
return 0
138+
return r.Version
85139
}
86140

87-
func (a *ApiVersionsResponse) headerVersion() int16 {
141+
func (r *ApiVersionsResponse) headerVersion() int16 {
142+
// ApiVersionsResponse always includes a v0 header.
143+
// See KIP-511 for details
88144
return 0
89145
}
90146

91147
func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
92-
return V0_10_0_0
148+
switch r.Version {
149+
case 0:
150+
return V0_10_0_0
151+
case 3:
152+
return V2_4_0_0
153+
default:
154+
return V0_10_0_0
155+
}
93156
}

api_versions_response_test.go

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,58 @@ package sarama
22

33
import "testing"
44

5-
var apiVersionResponse = []byte{
6-
0x00, 0x00,
7-
0x00, 0x00, 0x00, 0x01,
8-
0x00, 0x03,
9-
0x00, 0x02,
10-
0x00, 0x01,
11-
}
5+
var (
6+
apiVersionResponse = []byte{
7+
0x00, 0x00,
8+
0x00, 0x00, 0x00, 0x01,
9+
0x00, 0x03,
10+
0x00, 0x02,
11+
0x00, 0x01,
12+
}
13+
14+
apiVersionResponseV3 = []byte{
15+
0x00, 0x00, // no error
16+
0x02, // compact array length 1
17+
0x00, 0x03,
18+
0x00, 0x02,
19+
0x00, 0x01,
20+
0x00, // tagged fields
21+
0x00, 0x00, 0x00, 0x00, // throttle time
22+
0x00, // tagged fields
23+
}
24+
)
1225

1326
func TestApiVersionsResponse(t *testing.T) {
1427
response := new(ApiVersionsResponse)
1528
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
16-
if response.Err != ErrNoError {
17-
t.Error("Decoding error failed: no error expected but found", response.Err)
29+
if response.ErrorCode != int16(ErrNoError) {
30+
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
31+
}
32+
if response.ApiKeys[0].ApiKey != 0x03 {
33+
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
34+
}
35+
if response.ApiKeys[0].MinVersion != 0x02 {
36+
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
37+
}
38+
if response.ApiKeys[0].MaxVersion != 0x01 {
39+
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
40+
}
41+
}
42+
43+
func TestApiVersionsResponseV3(t *testing.T) {
44+
response := new(ApiVersionsResponse)
45+
response.Version = 3
46+
testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3)
47+
if response.ErrorCode != int16(ErrNoError) {
48+
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
1849
}
19-
if response.ApiVersions[0].ApiKey != 0x03 {
20-
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
50+
if response.ApiKeys[0].ApiKey != 0x03 {
51+
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
2152
}
22-
if response.ApiVersions[0].MinVersion != 0x02 {
23-
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
53+
if response.ApiKeys[0].MinVersion != 0x02 {
54+
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
2455
}
25-
if response.ApiVersions[0].MaxVersion != 0x01 {
26-
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
56+
if response.ApiKeys[0].MaxVersion != 0x01 {
57+
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
2758
}
2859
}

0 commit comments

Comments
 (0)