Skip to content

Commit 30eb221

Browse files
feat(topic): add retention size flag for topic create (#563)
1 parent a9e9c8d commit 30eb221

11 files changed

Lines changed: 165 additions & 35 deletions

File tree

cmd/rhoas/pkged.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/commands/rhoas_kafka_topic_create.adoc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ $ rhoas kafka topic create topic-1
2727
=== Options
2828

2929
....
30-
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
31-
--partitions int32 The number of partitions in the topic (default 1)
32-
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
30+
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
31+
--partitions int32 The number of partitions in the topic (default 1)
32+
--retention-bytes int The maximum total size of a partition log segments before old log segments are deleted to free up space (default -1)
33+
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
3334
....
3435

3536
=== Options inherited from parent commands

docs/commands/rhoas_kafka_topic_update.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ $ rhoas kafka topic update topic-1 --replication-factor
2424
=== Options
2525

2626
....
27-
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
28-
--retention-ms string The period of time in milliseconds the broker will retain a partition log before deleting it
27+
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
28+
--retention-bytes string The maximum total size of a partition log segments before old log segments are deleted to free up space
29+
--retention-ms string The period of time in milliseconds the broker will retain a partition log before deleting it
2930
....
3031

3132
=== Options inherited from parent commands

locales/cmd/kafka/topic/common/active.en.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@ one = 'The number of partitions in the topic'
1111
one = 'The replication factor for the topic'
1212

1313
[kafka.topic.common.input.retentionMs.description]
14-
description = 'Description for the Retention input'
14+
description = 'Description for the Retention period input'
1515
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it'
1616

17+
[kafka.topic.common.input.retentionBytes.description]
18+
description = 'Description for the Retention size input'
19+
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space'
20+
1721
[kafka.topic.common.error.noKafkaSelected]
1822
one = 'no Kafka instance is currently selected, run "rhoas kafka use" to set the current instance'
1923

@@ -55,6 +59,9 @@ one = 'invalid replication factor {{.ReplicationFactor}}, minimum value is {{.Mi
5559
[kafka.topic.common.validation.retentionPeriod.error.invalid]
5660
one = 'invalid retention period {{.RetentionPeriod}}, minimum value is -1'
5761

62+
[kafka.topic.common.validation.retentionSize.error.invalid]
63+
one = 'invalid retention size {{.RetentionSize}}, minimum value is -1'
64+
5865
[kafka.topic.common.input.name.message]
5966
description = 'title for the Name input'
6067
one = 'Name:'
@@ -70,3 +77,7 @@ one = 'invalid value for partitions: {{.Partition}}'
7077
[kafka.topic.common.input.retentionMs.error.invalid]
7178
description = 'Error message when an invalid retention period is entered'
7279
one = 'invalid value for retention period: {{.RetentionMs}}'
80+
81+
[kafka.topic.common.input.retentionBytes.error.invalid]
82+
description = 'Error message when an invalid retention size is entered'
83+
one = 'invalid value for retention size: {{.RetentionBytes}}'

locales/cmd/kafka/topic/create/active.en.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ one = 'topic "{{.TopicName}}" already exists in Kafka instance "{{.InstanceName}
2828
one = 'Topic "{{.TopicName}}" created in Kafka instance "{{.InstanceName}}":'
2929

3030
[kafka.topic.create.input.retentionMs.message]
31-
description = 'Message for the Retention input'
31+
description = 'Message for the Retention period input'
3232
one = 'Retention Period (ms):'
3333

34+
[kafka.topic.create.input.retentionBytes.message]
35+
description = 'Message for the Retention size input'
36+
one = 'Retention Size (bytes):'
37+
3438
[kafka.topic.create.input.partitions.message]
3539
description = 'Message for the Partitions input'
3640
one = 'Number of Partitions:'

locales/cmd/kafka/topic/update/active.en.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,17 @@ one = 'Nothing to update'
2828
one = 'Topic "{{.TopicName}}" in Kafka instance "{{.InstanceName}}" has been updated'
2929

3030
[kafka.topic.update.input.retentionMs.message]
31-
description = 'Message for the Retention input'
31+
description = 'Message for the Retention period input'
3232
one = 'Retention Period (ms) [optional]:'
3333

3434
[kafka.topic.update.input.retentionMs.help]
35-
description = 'Help for the Retention input'
36-
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it. Leave blank to skip updating this value.'
35+
description = 'Help for the Retention period input'
36+
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it. Leave blank to skip updating this value.'
37+
38+
[kafka.topic.update.input.retentionBytes.message]
39+
description = 'Message for the Retention size input'
40+
one = 'Retention Size (bytes) [optional]:'
41+
42+
[kafka.topic.update.input.retentionBytes.help]
43+
description = 'Help for the Retention size input'
44+
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space. Leave blank to skip updating this value.'

pkg/cmd/kafka/topic/create/create.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@ import (
2929

3030
const (
3131
defaultRetentionPeriodMS = 604800000
32+
defaultRetentionSize = -1
3233
)
3334

3435
type Options struct {
35-
topicName string
36-
partitions int32
37-
retentionMs int
38-
kafkaID string
39-
outputFormat string
40-
interactive bool
36+
topicName string
37+
partitions int32
38+
retentionMs int
39+
retentionBytes int
40+
kafkaID string
41+
outputFormat string
42+
interactive bool
4143

4244
IO *iostreams.IOStreams
4345
Config config.IConfig
@@ -90,6 +92,10 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
9092
if err = topicutil.ValidateMessageRetentionPeriod(opts.retentionMs); err != nil {
9193
return err
9294
}
95+
96+
if err = topicutil.ValidateMessageRetentionSize(opts.retentionBytes); err != nil {
97+
return err
98+
}
9399
}
94100

95101
if opts.kafkaID != "" {
@@ -116,6 +122,7 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
116122
}))
117123
cmd.Flags().Int32Var(&opts.partitions, "partitions", 1, localizer.MustLocalizeFromID("kafka.topic.common.input.partitions.description"))
118124
cmd.Flags().IntVar(&opts.retentionMs, "retention-ms", defaultRetentionPeriodMS, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"))
125+
cmd.Flags().IntVar(&opts.retentionBytes, "retention-bytes", defaultRetentionSize, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"))
119126

120127
return cmd
121128
}
@@ -268,13 +275,24 @@ func runInteractivePrompt(opts *Options) (err error) {
268275
return err
269276
}
270277

271-
retentionPrompt := &survey.Input{
278+
retentionMsPrompt := &survey.Input{
272279
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionMs.message"),
273280
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"),
274-
Default: fmt.Sprintf("%v", defaultRetentionPeriodMS),
281+
Default: strconv.Itoa(defaultRetentionPeriodMS),
282+
}
283+
284+
err = survey.AskOne(retentionMsPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
285+
if err != nil {
286+
return err
287+
}
288+
289+
retentionBytesPrompt := &survey.Input{
290+
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionBytes.message"),
291+
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"),
292+
Default: strconv.Itoa(defaultRetentionSize),
275293
}
276294

277-
err = survey.AskOne(retentionPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
295+
err = survey.AskOne(retentionBytesPrompt, &opts.retentionBytes, survey.WithValidator(topicutil.ValidateMessageRetentionSize))
278296
if err != nil {
279297
return err
280298
}
@@ -284,8 +302,10 @@ func runInteractivePrompt(opts *Options) (err error) {
284302

285303
func createConfigEntries(opts *Options) *[]strimziadminclient.ConfigEntry {
286304
retentionMsStr := strconv.Itoa(opts.retentionMs)
305+
retentionBytesStr := strconv.Itoa(opts.retentionBytes)
287306
configEntryMap := map[string]*string{
288-
topicutil.RetentionMsKey: &retentionMsStr,
307+
topicutil.RetentionMsKey: &retentionMsStr,
308+
topicutil.RetentionSizeKey: &retentionBytesStr,
289309
}
290310
return topicutil.CreateConfigEntries(configEntryMap)
291311
}

pkg/cmd/kafka/topic/list/list.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
89
topicutil "github.com/redhat-developer/app-services-cli/pkg/kafka/topic"
910

1011
"github.com/redhat-developer/app-services-cli/internal/localizer"
@@ -37,7 +38,7 @@ type topicRow struct {
3738
Name string `json:"name,omitempty" header:"Name"`
3839
PartitionsCount int `json:"partitions_count,omitempty" header:"Partitions"`
3940
RetentionTime string `json:"retention.ms,omitempty" header:"Retention time (ms)"`
40-
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size"`
41+
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size (bytes)"`
4142
}
4243

4344
// NewListTopicCommand gets a new command for getting kafkas.

pkg/cmd/kafka/topic/update/update.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@ import (
2929
)
3030

3131
var (
32-
partitionCount int32
33-
retentionPeriodMs int
32+
partitionCount int32
33+
retentionPeriodMs int
34+
retentionSizeBytes int
3435
)
3536

3637
type Options struct {
37-
topicName string
38-
partitionsStr string
39-
retentionMsStr string
40-
kafkaID string
41-
outputFormat string
42-
interactive bool
38+
topicName string
39+
partitionsStr string
40+
retentionMsStr string
41+
retentionBytesStr string
42+
kafkaID string
43+
outputFormat string
44+
interactive bool
4345

4446
IO *iostreams.IOStreams
4547
Config config.IConfig
@@ -81,14 +83,14 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
8183
return cmdutil.FilterValidTopicNameArgs(f, cfg.Services.Kafka.ClusterID, searchName)
8284
},
8385
RunE: func(cmd *cobra.Command, args []string) (err error) {
84-
if !opts.IO.CanPrompt() && opts.retentionMsStr == "" && opts.partitionsStr == "" {
86+
if !opts.IO.CanPrompt() && opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
8587
return fmt.Errorf(localizer.MustLocalize(&localizer.Config{
8688
MessageID: "argument.error.requiredWhenNonInteractive",
8789
TemplateData: map[string]interface{}{
8890
"Argument": "Name",
8991
},
9092
}))
91-
} else if opts.retentionMsStr == "" && opts.partitionsStr == "" {
93+
} else if opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
9294
opts.interactive = true
9395
}
9496

@@ -102,7 +104,7 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
102104
return err
103105
}
104106

105-
if opts.retentionMsStr == "" && opts.partitionsStr == "" {
107+
if opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
106108
logger.Info(localizer.MustLocalizeFromID("kafka.topic.update.log.info.nothingToUpdate"))
107109
return nil
108110
}
@@ -140,6 +142,17 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
140142
}
141143
}
142144

145+
if opts.retentionBytesStr != "" {
146+
retentionSizeBytes, err = topicutil.ConvertRetentionBytesToInt(opts.retentionBytesStr)
147+
if err != nil {
148+
return err
149+
}
150+
151+
if err = topicutil.ValidateMessageRetentionSize(retentionSizeBytes); err != nil {
152+
return err
153+
}
154+
}
155+
143156
cfg, err := opts.Config.Load()
144157
if err != nil {
145158
return err
@@ -159,6 +172,7 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
159172
MessageID: "kafka.topic.common.flag.output.description",
160173
}))
161174
cmd.Flags().StringVar(&opts.retentionMsStr, "retention-ms", "", localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"))
175+
cmd.Flags().StringVar(&opts.retentionBytesStr, "retention-bytes", "", localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"))
162176

163177
return cmd
164178
}
@@ -179,6 +193,13 @@ func runCmd(opts *Options) error {
179193
return err
180194
}
181195
}
196+
197+
if opts.retentionBytesStr != "" {
198+
retentionSizeBytes, err = topicutil.ConvertRetentionBytesToInt(opts.retentionBytesStr)
199+
if err != nil {
200+
return err
201+
}
202+
}
182203
}
183204

184205
conn, err := opts.Connection(connection.DefaultConfigRequireMasAuth)
@@ -227,6 +248,11 @@ func runCmd(opts *Options) error {
227248
configEntryMap[topicutil.RetentionMsKey] = &opts.retentionMsStr
228249
}
229250

251+
if opts.retentionBytesStr != "" {
252+
needsUpdate = true
253+
configEntryMap[topicutil.RetentionSizeKey] = &opts.retentionBytesStr
254+
}
255+
230256
if !needsUpdate {
231257
logger.Info(localizer.MustLocalizeFromID("kafka.topic.update.log.info.nothingToUpdate"))
232258
return nil
@@ -319,12 +345,22 @@ func runInteractivePrompt(opts *Options) (err error) {
319345

320346
logger.Debug(localizer.MustLocalizeFromID("common.log.debug.startingInteractivePrompt"))
321347

322-
retentionPrompt := &survey.Input{
348+
retentionMsPrompt := &survey.Input{
323349
Message: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionMs.message"),
324350
Help: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionMs.help"),
325351
}
326352

327-
err = survey.AskOne(retentionPrompt, &opts.retentionMsStr, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
353+
err = survey.AskOne(retentionMsPrompt, &opts.retentionMsStr, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
354+
if err != nil {
355+
return err
356+
}
357+
358+
retentionBytesPrompt := &survey.Input{
359+
Message: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionBytes.message"),
360+
Help: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionBytes.help"),
361+
}
362+
363+
err = survey.AskOne(retentionBytesPrompt, &opts.retentionBytesStr, survey.WithValidator(topicutil.ValidateMessageRetentionSize))
328364
if err != nil {
329365
return err
330366
}

pkg/kafka/topic/util.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,19 @@ func ConvertRetentionMsToInt(retentionMsStr string) (int, error) {
5858

5959
return retentionMsInt, nil
6060
}
61+
62+
// ConvertRetentionBytesToInt converts the value from "retention-bytes" to int
63+
func ConvertRetentionBytesToInt(retentionBytesStr string) (int, error) {
64+
retentionMsInt, err := strconv.Atoi(retentionBytesStr)
65+
66+
if err != nil {
67+
return 0, errors.New(localizer.MustLocalize(&localizer.Config{
68+
MessageID: "kafka.topic.common.input.retentionSize.error.invalid",
69+
TemplateData: map[string]interface{}{
70+
"RetentionBytes": retentionBytesStr,
71+
},
72+
}))
73+
}
74+
75+
return retentionMsInt, nil
76+
}

0 commit comments

Comments
 (0)