Skip to content

Commit bd79dfa

Browse files
committed
fix: update constructors to use getMaxAPIVersionForKafkaVersion
Signed-off-by: Giorgio Pellero <[email protected]>
1 parent b7a9db2 commit bd79dfa

File tree

4 files changed

+14
-57
lines changed

4 files changed

+14
-57
lines changed

create_topics_request.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,7 @@ func NewCreateTopicsRequest(version KafkaVersion, topicDetails map[string]*Topic
2020
r := &CreateTopicsRequest{
2121
TopicDetails: topicDetails,
2222
Timeout: timeout,
23-
}
24-
if version.IsAtLeast(V2_0_0_0) {
25-
r.Version = 3
26-
} else if version.IsAtLeast(V0_11_0_0) {
27-
r.Version = 2
28-
} else if version.IsAtLeast(V0_10_2_0) {
29-
r.Version = 1
23+
Version: getMaxAPIVersionForKafkaVersion(APIKeyCreateTopics, version),
3024
}
3125
return r
3226
}

delete_topics_request.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,7 @@ func NewDeleteTopicsRequest(version KafkaVersion, topics []string, timeout time.
1212
d := &DeleteTopicsRequest{
1313
Topics: topics,
1414
Timeout: timeout,
15-
}
16-
if version.IsAtLeast(V2_1_0_0) {
17-
d.Version = 3
18-
} else if version.IsAtLeast(V2_0_0_0) {
19-
d.Version = 2
20-
} else if version.IsAtLeast(V0_11_0_0) {
21-
d.Version = 1
15+
Version: getMaxAPIVersionForKafkaVersion(APIKeyDeleteTopics, version),
2216
}
2317
return d
2418
}

metadata_request.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,9 @@ type MetadataRequest struct {
2222
}
2323

2424
func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest {
25-
m := &MetadataRequest{Topics: topics}
26-
if version.IsAtLeast(V2_8_0_0) {
27-
m.Version = 10
28-
} else if version.IsAtLeast(V2_4_0_0) {
29-
m.Version = 9
30-
} else if version.IsAtLeast(V2_4_0_0) {
31-
m.Version = 8
32-
} else if version.IsAtLeast(V2_1_0_0) {
33-
m.Version = 7
34-
} else if version.IsAtLeast(V2_0_0_0) {
35-
m.Version = 6
36-
} else if version.IsAtLeast(V1_0_0_0) {
37-
m.Version = 5
38-
} else if version.IsAtLeast(V0_11_0_0) {
39-
m.Version = 4
40-
} else if version.IsAtLeast(V0_10_1_0) {
41-
m.Version = 2
42-
} else if version.IsAtLeast(V0_10_0_0) {
43-
m.Version = 1
25+
m := &MetadataRequest{
26+
Topics: topics,
27+
Version: getMaxAPIVersionForKafkaVersion(APIKeyMetadata, version),
4428
}
4529
return m
4630
}

offset_fetch_request.go

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,18 @@ func NewOffsetFetchRequest(
1212
group string,
1313
partitions map[string][]int32,
1414
) *OffsetFetchRequest {
15+
// Version history:
16+
// - Version 7: Adding the require stable flag (V2_5_0_0+)
17+
// - Version 6: First flexible version (V2_4_0_0+)
18+
// - Versions 3, 4, and 5: Same as version 2 (V2_1_0_0+)
19+
// - Version 2: Request can contain null topics array to indicate offsets for all topics
20+
// should be fetched. Also returns top level error code for group or coordinator level errors (V0_10_2_0+)
21+
// - Version 1: Broker supports fetching offsets from internal __consumer_offsets topic (V0_8_2_0+)
22+
// - Version 0: Request read offsets from ZK
1523
request := &OffsetFetchRequest{
1624
ConsumerGroup: group,
1725
partitions: partitions,
18-
}
19-
if version.IsAtLeast(V2_5_0_0) {
20-
// Version 7 is adding the require stable flag.
21-
request.Version = 7
22-
} else if version.IsAtLeast(V2_4_0_0) {
23-
// Version 6 is the first flexible version.
24-
request.Version = 6
25-
} else if version.IsAtLeast(V2_1_0_0) {
26-
// Version 3, 4, and 5 are the same as version 2.
27-
request.Version = 5
28-
} else if version.IsAtLeast(V2_0_0_0) {
29-
request.Version = 4
30-
} else if version.IsAtLeast(V0_11_0_0) {
31-
request.Version = 3
32-
} else if version.IsAtLeast(V0_10_2_0) {
33-
// Starting in version 2, the request can contain a null topics array to indicate that offsets
34-
// for all topics should be fetched. It also returns a top level error code
35-
// for group or coordinator level errors.
36-
request.Version = 2
37-
} else if version.IsAtLeast(V0_8_2_0) {
38-
// In version 0, the request read offsets from ZK.
39-
//
40-
// Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
41-
request.Version = 1
26+
Version: getMaxAPIVersionForKafkaVersion(APIKeyOffsetFetch, version),
4227
}
4328

4429
return request

0 commit comments

Comments
 (0)