Skip to content

Commit 123cf1b

Browse files
committed
feat: new consume command
1 parent 3e8e7a1 commit 123cf1b

3 files changed

Lines changed: 100 additions & 0 deletions

File tree

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package consume
2+
3+
import (
4+
kafkaflagutil "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/flagutil"
5+
6+
"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/kafkacmdutil"
7+
"github.com/redhat-developer/app-services-cli/pkg/shared/connection"
8+
"github.com/redhat-developer/app-services-cli/pkg/shared/contextutil"
9+
"github.com/redhat-developer/app-services-cli/pkg/shared/factory"
10+
"github.com/spf13/cobra"
11+
)
12+
13+
type options struct {
14+
topicName string
15+
kafkaID string
16+
partition int32
17+
timestamp string
18+
limit int32
19+
20+
f *factory.Factory
21+
}
22+
23+
// NewComsumeTopicCommand creates a new command for producing to a kafka topic.
24+
func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command {
25+
opts := &options{
26+
f: f,
27+
}
28+
29+
cmd := &cobra.Command{
30+
Use: "consume",
31+
Short: "consume short",
32+
Long: "consume long",
33+
Example: "consume example",
34+
Args: cobra.NoArgs,
35+
RunE: func(cmd *cobra.Command, args []string) (err error) {
36+
if opts.kafkaID == "" {
37+
38+
kafkaInstance, err := contextutil.GetCurrentKafkaInstance(f)
39+
if err != nil {
40+
return err
41+
}
42+
43+
opts.kafkaID = kafkaInstance.GetId()
44+
}
45+
46+
return runCmd(opts)
47+
},
48+
}
49+
50+
flags := kafkaflagutil.NewFlagSet(cmd, f.Localizer)
51+
52+
flags.StringVar(&opts.topicName, "name", "", f.Localizer.MustLocalize("kafka.topic.common.flag.name.description"))
53+
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
54+
flags.StringVar(&opts.timestamp, "timestamp", "", f.Localizer.MustLocalize("kafka.topic.consume.flag.timestamp.description"))
55+
flags.Int32Var(&opts.limit, "limit", 20, f.Localizer.MustLocalize("kafka.topic.consume.flag.limit.description"))
56+
57+
_ = cmd.MarkFlagRequired("name")
58+
59+
_ = cmd.RegisterFlagCompletionFunc("name", func(cmd *cobra.Command, _ []string, toComplete string) ([]string, cobra.ShellCompDirective) {
60+
return kafkacmdutil.FilterValidTopicNameArgs(f, toComplete)
61+
})
62+
63+
flags.AddInstanceID(&opts.kafkaID)
64+
65+
return cmd
66+
}
67+
68+
func runCmd(opts *options) error {
69+
conn, err := opts.f.Connection(connection.DefaultConfigRequireMasAuth)
70+
if err != nil {
71+
return err
72+
}
73+
74+
api, _, err := conn.API().KafkaAdmin(opts.kafkaID)
75+
if err != nil {
76+
return err
77+
}
78+
79+
list, _, err := api.RecordsApi.ConsumeRecords(opts.f.Context, opts.topicName).Limit(opts.limit).Partition(opts.partition).Execute()
80+
if err != nil {
81+
return err
82+
}
83+
84+
for i := int32(0); i < list.Total; i++ {
85+
opts.f.Logger.Info(list.Items[i].Value)
86+
}
87+
88+
return nil
89+
}

pkg/cmd/kafka/topic/topic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package topic
22

33
import (
4+
"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/topic/consume"
45
"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/topic/create"
56
"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/topic/delete"
67
"github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/topic/describe"
@@ -30,6 +31,7 @@ func NewTopicCommand(f *factory.Factory) *cobra.Command {
3031
delete.NewDeleteTopicCommand(f),
3132
describe.NewDescribeTopicCommand(f),
3233
update.NewUpdateTopicCommand(f),
34+
consume.NewConsumeTopicCommand(f),
3335
)
3436

3537
return cmd

pkg/core/localize/locales/en/cmd/kafka.en.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,15 @@ one = 'Format in which to display the Kafka topic (choose from: "json", "yml", "
551551
[kafka.topic.list.flag.output.description]
552552
one = 'Format in which to display the Kafka topic (choose from: "json", "yml", "yaml")'
553553

554+
[kafka.topic.consume.flag.partition.description]
555+
one = 'The partition to consume from'
556+
557+
[kafka.topic.consume.flag.timestamp.description]
558+
one = 'Timestamp to start consuming from'
559+
560+
[kafka.topic.consume.flag.limit.description]
561+
one = 'Max records to consume from topic'
562+
554563
[kafka.topic.common.input.partitions.description]
555564
description = 'help for the Partitions input'
556565
one = 'The number of partitions in the topic'

0 commit comments

Comments
 (0)