Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/rhoas/pkged.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions docs/commands/rhoas_kafka_topic_create.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ $ rhoas kafka topic create topic-1
=== Options

....
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--partitions int32 The number of partitions in the topic (default 1)
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
-o, --output string Format in which to display the Kafka topic. Choose from: "json", "yml", "yaml" (default "json")
--partitions int32 The number of partitions in the topic (default 1)
--retention-bytes int The maximum total size of a partition log segments before old log segments are deleted to free up space (default -1)
--retention-ms int The period of time in milliseconds the broker will retain a partition log before deleting it (default 604800000)
....

=== Options inherited from parent commands
Expand Down
9 changes: 8 additions & 1 deletion locales/cmd/kafka/topic/common/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ one = 'The number of partitions in the topic'
one = 'The replication factor for the topic'

[kafka.topic.common.input.retentionMs.description]
description = 'Description for the Retention input'
description = 'Description for the Retention period input'
one = 'The period of time in milliseconds the broker will retain a partition log before deleting it'

[kafka.topic.common.input.retentionBytes.description]
description = 'Description for the Retention size input'
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space'

[kafka.topic.common.error.noKafkaSelected]
one = 'no Kafka instance is currently selected, run "rhoas kafka use" to set the current instance'

Expand Down Expand Up @@ -55,6 +59,9 @@ one = 'invalid replication factor {{.ReplicationFactor}}, minimum value is {{.Mi
[kafka.topic.common.validation.retentionPeriod.error.invalid]
one = 'invalid retention period {{.RetentionPeriod}}, minimum value is -1'

[kafka.topic.common.validation.retentionSize.error.invalid]
one = 'invalid retention size {{.RetentionSize}}, minimum value is -1'

[kafka.topic.common.input.name.message]
description = 'title for the Name input'
one = 'Name:'
Expand Down
6 changes: 5 additions & 1 deletion locales/cmd/kafka/topic/create/active.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ one = 'topic "{{.TopicName}}" already exists in Kafka instance "{{.InstanceName}
one = 'Topic "{{.TopicName}}" created in Kafka instance "{{.InstanceName}}":'

[kafka.topic.create.input.retentionMs.message]
description = 'Message for the Retention input'
description = 'Message for the Retention period input'
one = 'Retention Period (ms):'

[kafka.topic.create.input.retentionBytes.message]
description = 'Message for the Retention size input'
one = 'Retention Size (bytes):'

[kafka.topic.create.input.partitions.message]
description = 'Message for the Partitions input'
one = 'Number of Partitions:'
34 changes: 25 additions & 9 deletions pkg/cmd/kafka/topic/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (

const (
defaultRetentionPeriodMS = 604800000
defaultRetentionSize = -1
)

type Options struct {
topicName string
partitions int32
retentionMs int
kafkaID string
outputFormat string
interactive bool
topicName string
partitions int32
retentionMs int
retentionBytes int
kafkaID string
outputFormat string
interactive bool

IO *iostreams.IOStreams
Config config.IConfig
Expand Down Expand Up @@ -116,6 +118,7 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
}))
cmd.Flags().Int32Var(&opts.partitions, "partitions", 1, localizer.MustLocalizeFromID("kafka.topic.common.input.partitions.description"))
cmd.Flags().IntVar(&opts.retentionMs, "retention-ms", defaultRetentionPeriodMS, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"))
cmd.Flags().IntVar(&opts.retentionBytes, "retention-bytes", defaultRetentionSize, localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag value is not being validated anywhere, I can submit invalid values


return cmd
}
Expand Down Expand Up @@ -268,13 +271,24 @@ func runInteractivePrompt(opts *Options) (err error) {
return err
}

retentionPrompt := &survey.Input{
retentionMsPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionMs.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionMs.description"),
Default: fmt.Sprintf("%v", defaultRetentionPeriodMS),
}

err = survey.AskOne(retentionPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
err = survey.AskOne(retentionMsPrompt, &opts.retentionMs, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
if err != nil {
return err
}

retentionBytesPrompt := &survey.Input{
Message: localizer.MustLocalizeFromID("kafka.topic.create.input.retentionBytes.message"),
Help: localizer.MustLocalizeFromID("kafka.topic.common.input.retentionBytes.description"),
Default: fmt.Sprintf("%v", defaultRetentionSize),
}

err = survey.AskOne(retentionBytesPrompt, &opts.retentionBytes, survey.WithValidator(topicutil.ValidateMessageRetentionPeriod))
if err != nil {
return err
}
Expand All @@ -284,8 +298,10 @@ func runInteractivePrompt(opts *Options) (err error) {

func createConfigEntries(opts *Options) *[]strimziadminclient.ConfigEntry {
retentionMsStr := strconv.Itoa(opts.retentionMs)
retentionBytesStr := strconv.Itoa(opts.retentionBytes)
configEntryMap := map[string]*string{
topicutil.RetentionMsKey: &retentionMsStr,
topicutil.RetentionMsKey: &retentionMsStr,
topicutil.RetentionSizeKey: &retentionBytesStr,
}
return topicutil.CreateConfigEntries(configEntryMap)
}
3 changes: 2 additions & 1 deletion pkg/cmd/kafka/topic/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"

topicutil "github.com/redhat-developer/app-services-cli/pkg/kafka/topic"

"github.com/redhat-developer/app-services-cli/internal/localizer"
Expand Down Expand Up @@ -37,7 +38,7 @@ type topicRow struct {
Name string `json:"name,omitempty" header:"Name"`
PartitionsCount int `json:"partitions_count,omitempty" header:"Partitions"`
RetentionTime string `json:"retention.ms,omitempty" header:"Retention time (ms)"`
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size"`
RetentionSize string `json:"retention.bytes,omitempty" header:"Retention size (bytes)"`
}

// NewListTopicCommand gets a new command for getting kafkas.
Expand Down
32 changes: 32 additions & 0 deletions pkg/kafka/topic/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,38 @@ func ValidateMessageRetentionPeriod(v interface{}) error {
return nil
}

// ValidateMessageRetentionPeriod validates the value (bytes) of the retention size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// ValidateMessageRetentionPeriod validates the value (bytes) of the retention size
// ValidateMessageRetentionSize validates the value (bytes) of the retention size

// the valid values can range from [-1,...]
func ValidateMessageRetentionSize(v interface{}) error {
retentionSizeStr := fmt.Sprintf("%v", v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fmt.Sprintf is slow in comparison to other string conversions methods.

https://gist.github.com/evalphobia/caee1602969a640a4530


if retentionSizeStr == "" {
return nil
}

retentionPeriodBytes, err := strconv.Atoi(retentionSizeStr)
if err != nil {
return errors.New(localizer.MustLocalize(&localizer.Config{
MessageID: "common.error.castError",
TemplateData: map[string]interface{}{
"Value": v,
"Type": "int",
},
}))
}

if retentionPeriodBytes < -1 {
return errors.New(localizer.MustLocalize(&localizer.Config{
MessageID: "kafka.topic.common.validation.retentionSize.error.invalid",
TemplateData: map[string]interface{}{
"RetentionSize": retentionPeriodBytes,
},
}))
}

return nil
}

// ValidateNameIsAvailable checks if a topic with the given name already exists
func ValidateNameIsAvailable(api strimziadminclient.DefaultApi, instance string) func(v interface{}) error {
return func(v interface{}) error {
Expand Down