Skip to content

Commit 6af7dc9

Browse files
committed
feat: unix timestamp support for consume
1 parent 562364a commit 6af7dc9

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

pkg/cmd/kafka/topic/consume/consume.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type options struct {
3636
kafkaID string
3737
partition int32
3838
from string
39+
unix bool
3940
limit int32
4041
offset string
4142
wait bool
@@ -85,6 +86,7 @@ func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command {
8586
flags.StringVar(&opts.topicName, "name", "", f.Localizer.MustLocalize("kafka.topic.common.flag.name.description"))
8687
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
8788
flags.StringVar(&opts.from, "from", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.from.description"))
89+
flags.BoolVar(&opts.unix, "unix-time", false, "Use unix timestamp")
8890
flags.BoolVar(&opts.wait, "wait", false, f.Localizer.MustLocalize("kafka.topic.consume.flag.wait.description"))
8991
flags.StringVar(&opts.offset, "offset", DefaultOffset, f.Localizer.MustLocalize("kafka.topic.consume.flag.offset.description"))
9092
flags.Int32Var(&opts.limit, "limit", DefaultLimit, f.Localizer.MustLocalize("kafka.topic.consume.flag.limit.description"))
@@ -200,9 +202,19 @@ func consume(opts *options, api *kafkainstanceclient.APIClient, kafkaInstance *k
200202

201203
if opts.from != DefaultTimestamp {
202204

203-
_, err := time.Parse(time.RFC3339, opts.from)
204-
if err != nil {
205-
return nil, opts.f.Localizer.MustLocalizeError("kafka.topic.comman.error.timeFormat", localize.NewEntry("Time", opts.from))
205+
if opts.unix {
206+
digits, err := strconv.ParseInt(opts.from, 10, 64)
207+
if err != nil {
208+
return nil, err
209+
}
210+
211+
opts.from = time.Unix(digits, 0).Format(time.RFC3339)
212+
opts.f.Logger.Info(opts.from)
213+
} else {
214+
_, err := time.Parse(time.RFC3339, opts.from)
215+
if err != nil {
216+
return nil, opts.f.Localizer.MustLocalizeError("kafka.topic.comman.error.timeFormat", localize.NewEntry("Time", opts.from))
217+
}
206218
}
207219

208220
request = request.Timestamp(opts.from)
@@ -259,7 +271,7 @@ func outputRecords(opts *options, records *kafkainstanceclient.RecordList) {
259271
opts.f.Logger.Info(fmt.Sprintf("Key: %v\nMessage: %v", row.Key, row.Value))
260272
}
261273
} else {
262-
_ = dump.Formatted(opts.f.IOStreams.Out, format, records.Items[i])
274+
_ = dump.Formatted(opts.f.IOStreams.Out, format, row)
263275
}
264276
}
265277
}

0 commit comments

Comments
 (0)