Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/commands/rhoas_kafka_topic_consume.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 27 additions & 9 deletions pkg/cmd/kafka/topic/consume/consume.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package consume

import (
"errors"
"fmt"
"net/http"
"strconv"
"time"

kafkaflagutil "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/flagutil"
Expand All @@ -23,7 +23,7 @@ import (
)

const (
DefaultOffset = 0
DefaultOffset = ""
DefaultLimit = 20
DefaultTimestamp = ""
FormatKeyValue = "key-value"
Expand All @@ -37,7 +37,7 @@ type options struct {
partition int32
from string
limit int32
offset int64
offset string
wait bool
outputFormat string

Expand Down Expand Up @@ -86,7 +86,7 @@ func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command {
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
flags.StringVar(&opts.from, "from", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.from.description"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we need 2 flags for Unix Epoch and Actual Date.
I would prefer 2 flags for simplicity of documentation and validation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how much epoch is used so I was thinking of some kind of falg to say that is what yuo are using.

For example, use --from as normal either ISO or unix time and just have another bool flag like --unix-time which tells the cli how to parse time default would be iso.

Copy link
Collaborator

@wtrocki wtrocki Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think flag modifying another flag can be hard to work on and confusing.

I would suggest:

--from-time (ISO 8601 date - short format)
--from-timestamp (epoch/unix timestamp)

I would love to save this discussion somewhere so we can recall this when building CLI guidelines.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw. That is my opinion and this is opinionated part so do not feel that you need to do it this way. It is more about my personal preference to keep docs clean.

flags.BoolVar(&opts.wait, "wait", false, f.Localizer.MustLocalize("kafka.topic.consume.flag.wait.description"))
flags.Int64Var(&opts.offset, "offset", DefaultOffset, f.Localizer.MustLocalize("kafka.topic.consume.flag.offset.description"))
flags.StringVar(&opts.offset, "offset", DefaultOffset, f.Localizer.MustLocalize("kafka.topic.consume.flag.offset.description"))
flags.Int32Var(&opts.limit, "limit", DefaultLimit, f.Localizer.MustLocalize("kafka.topic.consume.flag.limit.description"))
flags.StringVar(&opts.outputFormat, "format", FormatKeyValue, f.Localizer.MustLocalize("kafka.topic.produce.flag.format.description"))

Expand Down Expand Up @@ -114,6 +114,11 @@ func runCmd(opts *options) error {
return err
}

// check for flags that are exclusive to eachother
if opts.offset != DefaultOffset && opts.from != DefaultTimestamp {
return opts.f.Localizer.MustLocalizeError("kafka.topic.consume.error.offsetAndFromConflict")
}

if opts.wait {
err := consumeAndWait(opts, api, kafkaInstance)
if err != nil {
Expand Down Expand Up @@ -147,7 +152,7 @@ func consumeAndWait(opts *options, api *kafkainstanceclient.APIClient, kafkaInst
opts.from = time.Now().Format(time.RFC3339)
}

max_offset := opts.offset
var max_offset int64
first_consume := true
for true {

Expand All @@ -167,10 +172,9 @@ func consumeAndWait(opts *options, api *kafkainstanceclient.APIClient, kafkaInst
opts.from = DefaultTimestamp
first_consume = false
}
opts.offset = fmt.Sprint(max_offset)
}

opts.offset = max_offset

time.Sleep(1 * time.Second)
}

Expand All @@ -179,12 +183,26 @@ func consumeAndWait(opts *options, api *kafkainstanceclient.APIClient, kafkaInst

func consume(opts *options, api *kafkainstanceclient.APIClient, kafkaInstance *kafkamgmtclient.KafkaRequest) (*kafkainstanceclient.RecordList, error) {

request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit).Partition(opts.partition).Offset(int32(opts.offset))
request := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit).Partition(opts.partition)

if opts.offset != DefaultOffset {
intOffset, err := strconv.ParseInt(opts.offset, 10, 64)
if err != nil {
return nil, opts.f.Localizer.MustLocalizeError("kafka.topic.comman.error.offsetInvalid", localize.NewEntry("Offset", opts.offset))
}

if intOffset < 0 {
return nil, opts.f.Localizer.MustLocalizeError("kafka.topic.comman.error.offsetNegative")
}

request = request.Offset(int32(intOffset))
}

if opts.from != DefaultTimestamp {

_, err := time.Parse(time.RFC3339, opts.from)
if err != nil {
return nil, errors.New(opts.f.Localizer.MustLocalize("kafka.topic.comman.error.timeFormat", localize.NewEntry("Time", opts.from)))
return nil, opts.f.Localizer.MustLocalizeError("kafka.topic.comman.error.timeFormat", localize.NewEntry("Time", opts.from))
}

request = request.Timestamp(opts.from)
Expand Down
12 changes: 12 additions & 0 deletions pkg/core/localize/locales/en/cmd/kafka.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,18 @@ one = 'Value given for offset will be ignored, using default value {{.Offset}}'
[kafka.topic.comman.error.timeFormat]
one = '"{{.Time}}" cannot be parsed as a valid time, must use YYYY-MM-DDThh:mm:ss.ssssZ'

[kafka.topic.comman.error.offsetNegative]
description = 'Error message when a negative offset is given'
one = 'Invalid offset given, offset must be a positive integer'

[kafka.topic.comman.error.offsetInvalid]
description = 'Error message when the offset given is not a number'
one = 'Invalid offset given, the value "{{.Offset}}" is not a number'

[kafka.topic.consume.error.offsetAndFromConflict]
description = 'Error message when the offset and from flags are both set'
one = 'Cannot use offset and from flags to filter messages'

[kafka.topic.common.input.partitions.description]
description = 'help for the Partitions input'
one = 'The number of partitions in the topic'
Expand Down