Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/rhoas/pkged.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions docs/commands/rhoas_kafka_topic_create.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ $ rhoas kafka topic create topic-1
=== Options

....
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--partitions int32 The number of partitions in the topic (default 1)
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--partitions int32 The number of partitions in the topic (default 1)
--retention-bytes int The maximum total size of a partition log segments before old log segments are deleted to free up space (default -1)
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
....

=== Options inherited from parent commands
Expand Down
5 changes: 3 additions & 2 deletions docs/commands/rhoas_kafka_topic_update.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ $ rhoas kafka topic update topic-1 --replication-factor
=== Options

....
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--retention-ms string The period of time in milliseconds the broker will retain a partition log before deleting it
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--retention-bytes string The maximum total size of a partition log segments before old log segments are deleted to free up space
--retention-ms string The period of time in milliseconds the broker will retain a partition log before deleting it
....

=== Options inherited from parent commands
Expand Down
13 changes: 12 additions & 1 deletion locales/cmd/kafka/topic/common/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ one = 'The number of partitions in the topic'
one = 'The replication factor for the topic'

[kafka.topic.common.input.retentionMs.description]
description = 'Description for the Retention input'
description = 'Description for the Retention period input'
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it'

[kafka.topic.common.input.retentionBytes.description]
description = 'Description for the Retention size input'
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space'

[kafka.topic.common.error.noKafkaSelected]
one = 'no Kafka instance is currently selected, run "rhoas kafka use" to set the current instance'

Expand Down Expand Up @@ -55,6 +59,9 @@ one = 'invalid replication factor {{.ReplicationFactor}}, minimum value is {{.Mi
[kafka.topic.common.validation.retentionPeriod.error.invalid]
one = 'invalid retention period {{.RetentionPeriod}}, minimum value is -1'

[kafka.topic.common.validation.retentionSize.error.invalid]
one = 'invalid retention size {{.RetentionSize}}, minimum value is -1'

[kafka.topic.common.input.name.message]
description = 'title for the Name input'
one = 'Name:'
Expand All @@ -70,3 +77,7 @@ one = 'invalid value for partitions: {{.Partition}}'
[kafka.topic.common.input.retentionMs.error.invalid]
description = 'Error message when an invalid retention period is entered'
one = 'invalid value for retention period: {{.RetentionMs}}'

[kafka.topic.common.input.retentionBytes.error.invalid]
description = 'Error message when an invalid retention size is entered'
one = 'invalid value for retention size: {{.RetentionBytes}}'
6 changes: 5 additions & 1 deletion locales/cmd/kafka/topic/create/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ one = 'topic "{{.TopicName}}" already exists in Kafka instance "{{.InstanceName}
one = 'Topic "{{.TopicName}}" created in Kafka instance "{{.InstanceName}}":'

[kafka.topic.create.input.retentionMs.message]
description = 'Message for the Retention input'
description = 'Message for the Retention period input'
one = 'Retention Period (ms):'

[kafka.topic.create.input.retentionBytes.message]
description = 'Message for the Retention size input'
one = 'Retention Size (bytes):'

[kafka.topic.create.input.partitions.message]
description = 'Message for the Partitions input'
one = 'Number of Partitions:'
14 changes: 11 additions & 3 deletions locales/cmd/kafka/topic/update/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@ one = 'Nothing to update'
one = 'Topic "{{.TopicName}}" in Kafka instance "{{.InstanceName}}" has been updated'

[kafka.topic.update.input.retentionMs.message]
description = 'Message for the Retention input'
description = 'Message for the Retention period input'
one = 'Retention Period (ms) [optional]:'

[kafka.topic.update.input.retentionMs.help]
description = 'Help for the Retention input'
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it. Leave blank to skip updating this value.'
description = 'Help for the Retention period input'
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it. Leave blank to skip updating this value.'

[kafka.topic.update.input.retentionBytes.message]
description = 'Message for the Retention size input'
one = 'Retention Size (bytes) [optional]:'

[kafka.topic.update.input.retentionBytes.help]
description = 'Help for the Retention size input'
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space. Leave blank to skip updating this value.'
40 changes: 30 additions & 10 deletions pkg/cmd/kafka/topic/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (

const (
defaultRetentionPeriodMS = 604800000
defaultRetentionSize = -1
)

type Options struct {
topicName string
partitions int32
retentionMs int
kafkaID string
outputFormat string
interactive bool
topicName string
partitions int32
retentionMs int
retentionBytes int
kafkaID string
outputFormat string
interactive bool

IO *iostreams.IOStreams
Config config.IConfig
Expand Down Expand Up @@ -90,6 +92,10 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
if err = topicutil.ValidateMessageRetentionPeriod(opts.retentionMs); err != nil {
return err
}

if err = topicutil.ValidateMessageRetentionSize(opts.retentionBytes); err != nil {
return err
}
}

if opts.kafkaID != "" {
Expand All @@ -116,6 +122,7 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
}))
cmd.Flags().Int32Var(&opts.partitions, "partitions", 1, localizer.MustLocalizeFromID("kafka.topic.common.input.partitions.description"))
cmd.Flags().IntVar(&opts.retentionMs, "retention-ms", defaultRetentionPeriodMS, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"))
cmd.Flags().IntVar(&opts.retentionBytes, "retention-bytes", defaultRetentionSize, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag value is not being validated anywhere, I can submit invalid values


return cmd
}
Expand Down Expand Up @@ -268,13 +275,24 @@ func runInteractivePrompt(opts *Options) (err error) {
return err
}

retentionPrompt := &survey.Input{
retentionMsPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionMs.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"),
Default: fmt.Sprintf("%v", defaultRetentionPeriodMS),
Default: strconv.Itoa(defaultRetentionPeriodMS),
}

err = survey.AskOne(retentionMsPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
if err != nil {
return err
}

retentionBytesPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionBytes.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"),
Default: strconv.Itoa(defaultRetentionSize),
}

err = survey.AskOne(retentionPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
err = survey.AskOne(retentionBytesPrompt, &opts.retentionBytes, survey.WithValidator(topicutil.ValidateMessageRetentionSize))
if err != nil {
return err
}
Expand All @@ -284,8 +302,10 @@ func runInteractivePrompt(opts *Options) (err error) {

func createConfigEntries(opts *Options) *[]strimziadminclient.ConfigEntry {
retentionMsStr := strconv.Itoa(opts.retentionMs)
retentionBytesStr := strconv.Itoa(opts.retentionBytes)
configEntryMap := map[string]*string{
topicutil.RetentionMsKey: &retentionMsStr,
topicutil.RetentionMsKey: &retentionMsStr,
topicutil.RetentionSizeKey: &retentionBytesStr,
}
return topicutil.CreateConfigEntries(configEntryMap)
}
3 changes: 2 additions & 1 deletion pkg/cmd/kafka/topic/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"

topicutil "github.com/redhat-developer/app-services-cli/pkg/kafka/topic"

"github.com/redhat-developer/app-services-cli/internal/localizer"
Expand Down Expand Up @@ -37,7 +38,7 @@ type topicRow struct {
Name string `json:"name,omitempty" header:"Name"`
PartitionsCount int `json:"partitions_count,omitempty" header:"Partitions"`
RetentionTime string `json:"retention.ms,omitempty" header:"Retention time (ms)"`
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size"`
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size (bytes)"`
}

// NewListTopicCommand gets a new command for getting kafkas.
Expand Down
62 changes: 49 additions & 13 deletions pkg/cmd/kafka/topic/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ import (
)

var (
partitionCount int32
retentionPeriodMs int
partitionCount int32
retentionPeriodMs int
retentionSizeBytes int
)

type Options struct {
topicName string
partitionsStr string
retentionMsStr string
kafkaID string
outputFormat string
interactive bool
topicName string
partitionsStr string
retentionMsStr string
retentionBytesStr string
kafkaID string
outputFormat string
interactive bool

IO *iostreams.IOStreams
Config config.IConfig
Expand Down Expand Up @@ -81,14 +83,14 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
return cmdutil.FilterValidTopicNameArgs(f, cfg.Services.Kafka.ClusterID, searchName)
},
RunE: func(cmd *cobra.Command, args []string) (err error) {
if !opts.IO.CanPrompt() && opts.retentionMsStr == "" && opts.partitionsStr == "" {
if !opts.IO.CanPrompt() && opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
return fmt.Errorf(localizer.MustLocalize(&localizer.Config{
MessageID: "argument.error.requiredWhenNonInteractive",
TemplateData: map[string]interface{}{
"Argument": "Name",
},
}))
} else if opts.retentionMsStr == "" && opts.partitionsStr == "" {
} else if opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
opts.interactive = true
}

Expand All @@ -102,7 +104,7 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
return err
}

if opts.retentionMsStr == "" && opts.partitionsStr == "" {
if opts.retentionMsStr == "" && opts.partitionsStr == "" && opts.retentionBytesStr == "" {
logger.Info(localizer.MustLocalizeFromID("kafka.topic.update.log.info.nothingToUpdate"))
return nil
}
Expand Down Expand Up @@ -140,6 +142,17 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
}
}

if opts.retentionBytesStr != "" {
retentionSizeBytes, err = topicutil.ConvertRetentionBytesToInt(opts.retentionBytesStr)
if err != nil {
return err
}

if err = topicutil.ValidateMessageRetentionSize(retentionSizeBytes); err != nil {
return err
}
}

cfg, err := opts.Config.Load()
if err != nil {
return err
Expand All @@ -159,6 +172,7 @@ func NewUpdateTopicCommand(f *factory.Factory) *cobra.Command {
MessageID: "kafka.topic.common.flag.output.description",
}))
cmd.Flags().StringVar(&opts.retentionMsStr, "retention-ms", "", localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"))
cmd.Flags().StringVar(&opts.retentionBytesStr, "retention-bytes", "", localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"))

return cmd
}
Expand All @@ -179,6 +193,13 @@ func runCmd(opts *Options) error {
return err
}
}

if opts.retentionBytesStr != "" {
retentionSizeBytes, err = topicutil.ConvertRetentionBytesToInt(opts.retentionBytesStr)
if err != nil {
return err
}
}
}

conn, err := opts.Connection(connection.DefaultConfigRequireMasAuth)
Expand Down Expand Up @@ -227,6 +248,11 @@ func runCmd(opts *Options) error {
configEntryMap[topicutil.RetentionMsKey] = &opts.retentionMsStr
}

if opts.retentionBytesStr != "" {
needsUpdate = true
configEntryMap[topicutil.RetentionSizeKey] = &opts.retentionBytesStr
}

if !needsUpdate {
logger.Info(localizer.MustLocalizeFromID("kafka.topic.update.log.info.nothingToUpdate"))
return nil
Expand Down Expand Up @@ -319,12 +345,22 @@ func runInteractivePrompt(opts *Options) (err error) {

logger.Debug(localizer.MustLocalizeFromID("common.log.debug.startingInteractivePrompt"))

retentionPrompt := &survey.Input{
retentionMsPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionMs.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionMs.help"),
}

err = survey.AskOne(retentionPrompt, &opts.retentionMsStr, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
err = survey.AskOne(retentionMsPrompt, &opts.retentionMsStr, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
if err != nil {
return err
}

retentionBytesPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionBytes.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.update.input.retentionBytes.help"),
}

err = survey.AskOne(retentionBytesPrompt, &opts.retentionBytesStr, survey.WithValidator(topicutil.ValidateMessageRetentionSize))
if err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/kafka/topic/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,19 @@ func ConvertRetentionMsToInt(retentionMsStr string) (int, error) {

return retentionMsInt, nil
}

// ConvertRetentionBytesToInt converts the value from "retention-bytes" to int
func ConvertRetentionBytesToInt(retentionBytesStr string) (int, error) {
retentionMsInt, err := strconv.Atoi(retentionBytesStr)

if err != nil {
return 0, errors.New(localizer.MustLocalize(&localizer.Config{
MessageID: "kafka.topic.common.input.retentionSize.error.invalid",
TemplateData: map[string]interface{}{
"RetentionBytes": retentionBytesStr,
},
}))
}

return retentionMsInt, nil
}
32 changes: 32 additions & 0 deletions pkg/kafka/topic/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,38 @@ func ValidateMessageRetentionPeriod(v interface{}) error {
return nil
}

// ValidateMessageRetentionSize validates the value (bytes) of the retention size
// the valid values can range from [-1,...]
func ValidateMessageRetentionSize(v interface{}) error {
retentionSizeStr := fmt.Sprintf("%v", v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fmt.Sprintf is slow in comparison to other string conversions methods.

https://gist.github.com/evalphobia/caee1602969a640a4530


if retentionSizeStr == "" {
return nil
}

retentionPeriodBytes, err := strconv.Atoi(retentionSizeStr)
if err != nil {
return errors.New(localizer.MustLocalize(&localizer.Config{
MessageID: "common.error.castError",
TemplateData: map[string]interface{}{
"Value": v,
"Type": "int",
},
}))
}

if retentionPeriodBytes < -1 {
return errors.New(localizer.MustLocalize(&localizer.Config{
MessageID: "kafka.topic.common.validation.retentionSize.error.invalid",
TemplateData: map[string]interface{}{
"RetentionSize": retentionPeriodBytes,
},
}))
}

return nil
}

// ValidateNameIsAvailable checks if a topic with the given name already exists
func ValidateNameIsAvailable(api strimziadminclient.DefaultApi, instance string) func(v interface{}) error {
return func(v interface{}) error {
Expand Down