Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions docs/commands/rhoas_kafka_topic_create.adoc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions pkg/cmd/kafka/topic/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
const (
defaultRetentionPeriodMS = 604800000
defaultRetentionSize = -1
defaultCleanupPolicy = "delete"
)

type Options struct {
Expand All @@ -41,6 +42,7 @@ type Options struct {
retentionBytes int
kafkaID string
outputFormat string
cleanupPolicy string
interactive bool

IO *iostreams.IOStreams
Expand Down Expand Up @@ -77,6 +79,12 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
return err
}

// check that a valid --cleanup-policy flag value is used
validPolicy := flagutil.IsValidInput(opts.cleanupPolicy, topicutil.ValidCleanupPolicies...)
if !validPolicy {
return flag.InvalidValueError("cleanup-policy", opts.cleanupPolicy, topicutil.ValidCleanupPolicies...)
}

if !opts.interactive {

validator := &topicutil.Validator{
Expand Down Expand Up @@ -125,9 +133,12 @@ func NewCreateTopicCommand(f *factory.Factory) *cobra.Command {
cmd.Flags().Int32Var(&opts.partitions, "partitions", 1, opts.localizer.MustLocalize("kafka.topic.common.input.partitions.description"))
cmd.Flags().IntVar(&opts.retentionMs, "retention-ms", defaultRetentionPeriodMS, opts.localizer.MustLocalize("kafka.topic.common.input.retentionMs.description"))
cmd.Flags().IntVar(&opts.retentionBytes, "retention-bytes", defaultRetentionSize, opts.localizer.MustLocalize("kafka.topic.common.input.retentionBytes.description"))
cmd.Flags().StringVar(&opts.cleanupPolicy, "cleanup-policy", defaultCleanupPolicy, opts.localizer.MustLocalize("kafka.topic.common.input.cleanupPolicy.description"))

flagutil.EnableOutputFlagCompletion(cmd)

flagutil.EnableStaticFlagCompletion(cmd, "cleanup-policy", topicutil.ValidCleanupPolicies)

return cmd
}

Expand Down Expand Up @@ -271,15 +282,29 @@ func runInteractivePrompt(opts *Options) (err error) {
return err
}

cleanupPolicyPrompt := &survey.Select{
Message: opts.localizer.MustLocalize("kafka.topic.create.input.cleanupPolicy.message"),
Help: opts.localizer.MustLocalize("kafka.topic.common.input.cleanupPolicy.description"),
Options: topicutil.ValidCleanupPolicies,
Default: defaultCleanupPolicy,
}

err = survey.AskOne(cleanupPolicyPrompt, &opts.cleanupPolicy)
if err != nil {
return err
}

return nil
}

func createConfigEntries(opts *Options) *[]kafkainstanceclient.ConfigEntry {
retentionMsStr := strconv.Itoa(opts.retentionMs)
retentionBytesStr := strconv.Itoa(opts.retentionBytes)
cleanupPolicyStr := opts.cleanupPolicy
configEntryMap := map[string]*string{
topicutil.RetentionMsKey: &retentionMsStr,
topicutil.RetentionSizeKey: &retentionBytesStr,
topicutil.CleanupPolicy: &cleanupPolicyStr,
}
return topicutil.CreateConfigEntries(configEntryMap)
}
4 changes: 4 additions & 0 deletions pkg/kafka/topic/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (

var RetentionMsKey string = "retention.ms"
var RetentionSizeKey string = "retention.bytes"
var PartitionsKey string = "partitions"
var CleanupPolicy string = "cleanup.policy"

var ValidCleanupPolicies = []string{"delete", "compact", "compact, delete"}

// CreateConfigEntries converts a key value map of config entries to an array of config entries
func CreateConfigEntries(entryMap map[string]*string) *[]kafkainstanceclient.ConfigEntry {
Expand Down
4 changes: 4 additions & 0 deletions pkg/localize/locales/en/cmd/kafka_topic_common.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ one = 'The period of time in milliseconds the broker will retain a partition log
description = 'Description for the Retention size input'
one = 'The maximum total size of a partition log segments before old log segments are deleted to free up space'

[kafka.topic.common.input.cleanupPolicy.description]
description = 'Description for the Cleanup policy input'
one = 'Determines whether log messages are deleted, compacted, or both'

[kafka.topic.common.error.noKafkaSelected]
one = 'no Kafka instance is currently selected, run "rhoas kafka use" to set the current instance'

Expand Down
6 changes: 5 additions & 1 deletion pkg/localize/locales/en/cmd/kafka_topic_create.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ one = 'Retention Size (bytes):'

[kafka.topic.create.input.partitions.message]
description = 'Message for the Partitions input'
one = 'Number of Partitions:'
one = 'Number of Partitions:'

[kafka.topic.create.input.cleanupPolicy.message]
description = 'Message for the Cleanup Policy input'
one = 'Cleanup Policy:'