Produce and consume records #1598

Merged
wtrocki merged 17 commits into main from produce-consume on Jun 27, 2022
Conversation

@jackdelahunt
Contributor

@jackdelahunt jackdelahunt commented Jun 14, 2022

This PR adds produce and consume commands for a Kafka topic. Flags such as partition, key, limit, and timestamp are optional. This is useful for debugging purposes; currently you need a third-party Kafka CLI for this feature.

Verification Steps

  1. Do x
  2. Do y

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation change
  • Other (please specify)

This was referenced Jun 14, 2022
@jackdelahunt
Contributor Author

Just to comment on the format of the timestamps...

Format           Value                             Worked?
UTC              06/14/2022 @ 2:13pm               No
ISO 8601         2022-06-14T14:13:31+00:00         Yes
RFC 822          Tue, 14 Jun 2022 14:13:31 +0000   No
RFC 2822         Tuesday, 14-Jun-22 14:13:31 UTC   No
Unix timestamp   1655216639                        No

If I had to guess, ISO is the default, and Unix time could work if I were able to set the timestamp type (currently not possible through the go-sdk).
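The table's result makes sense if the backend expects RFC 3339, the machine-friendly profile of ISO 8601. A quick standalone Go check (not CLI code) of two representative values from the table, assuming `time.RFC3339` matches the server-side parsing:

```go
package main

import (
	"fmt"
	"time"
)

// parseISOTimestamp reports whether a string is a valid RFC 3339 /
// ISO 8601 timestamp, the only format the API accepted in the table above.
func parseISOTimestamp(s string) (time.Time, error) {
	return time.Parse(time.RFC3339, s)
}

func main() {
	// The ISO 8601 value from the table parses cleanly...
	if t, err := parseISOTimestamp("2022-06-14T14:13:31+00:00"); err == nil {
		fmt.Println("ISO 8601 ok:", t.UTC())
	}
	// ...while the RFC 822-style value does not.
	if _, err := parseISOTimestamp("Tue, 14 Jun 2022 14:13:31 +0000"); err != nil {
		fmt.Println("RFC 822-style input rejected")
	}
}
```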

@wtrocki
Collaborator

wtrocki commented Jun 14, 2022

I guess we can send this as an email for feedback to the admin backend team so we can add better SDK docs for it.

@jackdelahunt
Contributor Author

@wtrocki basic functionality is in for both consuming and producing. I tried to stick to the design guidelines, but I may have missed something.

@wtrocki
Collaborator

wtrocki commented Jun 15, 2022

make lint and make format are required.

Comment on lines +95 to +96
// setting timestamp as "" (not set by user) is not valid
// not setting timestamp is handled by the request
Collaborator

This comment describes SDK behaviour

Suggested change
// setting timestamp as "" (not set by user) is not valid
// not setting timestamp is handled by the request

if opts.timestamp != "" {
// setting timestamp as "" (not set by user) is not valid
// not setting timestamp is handled by the request
request = request.Timestamp(opts.timestamp)
Collaborator

I would typically validate that timestamp and document values that are possible

Contributor Author

So instead of letting the API throw errors when the format is incorrect, we parse it ourselves and then return our own custom error. I guess we could also add the possible format options to the long description of the command.

Collaborator

A good API accepts only a single format. A good client accepts as many formats as possible and converts them for the API. If the input cannot be converted to a format supported by the API, an error should be returned.
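A minimal sketch of that client-side approach, assuming nothing about the actual CLI internals: try a bare integer as a Unix epoch first, then a short list of layouts, and always hand the API one canonical RFC 3339 string. The layout list and function name are illustrative only, not what the CLI shipped.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// normalizeTimestamp accepts several common input formats and converts
// them all to the single RFC 3339 form the API expects.
func normalizeTimestamp(input string) (string, error) {
	// A bare integer is treated as a Unix epoch timestamp in seconds.
	if secs, err := strconv.ParseInt(input, 10, 64); err == nil {
		return time.Unix(secs, 0).UTC().Format(time.RFC3339), nil
	}
	// Otherwise, try a few well-known date layouts in order.
	layouts := []string{time.RFC3339, time.RFC1123Z, time.RFC822Z}
	for _, layout := range layouts {
		if t, err := time.Parse(layout, input); err == nil {
			return t.UTC().Format(time.RFC3339), nil
		}
	}
	return "", fmt.Errorf("unrecognized timestamp format: %q", input)
}

func main() {
	for _, in := range []string{"1655216639", "2022-06-14T14:13:31+00:00", "not-a-date"} {
		out, err := normalizeTimestamp(in)
		fmt.Println(in, "->", out, err)
	}
}
```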

@jackdelahunt
Contributor Author

Okay, some issues with using --wait. Unless I need to use a different endpoint, the current consume API retrieves all records matching the query regardless of whether the current connection has already consumed them. This means that if we consume repeatedly while no new records are produced, we still get all the old ones again and again.

There could be ways to use this and still get a nice result, but I think we would then be working around the issue. @wtrocki thoughts?

@wtrocki
Collaborator

wtrocki commented Jun 15, 2022

@wtrocki thoughts?

That is why we have the offset argument in the consume API. We can control the offset :)
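The offset bookkeeping for a --wait style loop can be sketched without the real SDK; `record` and `fetchRecords` below are hypothetical stand-ins for the kafkainstanceclient types. The point is only that each poll resumes from one past the last offset seen, so records already printed in this session are never re-delivered:

```go
package main

import (
	"fmt"
	"time"
)

type record struct {
	Offset int64
	Value  string
}

// fetchRecords simulates a consume call that returns every record at or
// after the given offset (mirroring the behaviour described above, where
// the API has no notion of what this connection already consumed).
func fetchRecords(all []record, fromOffset int64) []record {
	var out []record
	for _, r := range all {
		if r.Offset >= fromOffset {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	topic := []record{{0, "a"}, {1, "b"}, {2, "c"}}
	nextOffset := int64(0)
	for poll := 0; poll < 2; poll++ {
		batch := fetchRecords(topic, nextOffset)
		for _, r := range batch {
			fmt.Println("record:", r.Value)
			nextOffset = r.Offset + 1 // resume past the last record seen
		}
		time.Sleep(10 * time.Millisecond) // real code would poll at an interval
	}
	// The second poll returns nothing new: offsets 0-2 were already consumed.
}
```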

@jackdelahunt
Contributor Author

jackdelahunt commented Jun 15, 2022

Oh okay, that makes sense. I didn't think of this... I forgot offset existed.

@wtrocki
Collaborator

wtrocki commented Jun 15, 2022

make lint and make format are required.

flags.AddOutput(&opts.outputFormat)
flags.StringVar(&opts.topicName, "name", "", f.Localizer.MustLocalize("kafka.topic.common.flag.name.description"))
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
flags.StringVar(&opts.timestamp, "timestamp", "", f.Localizer.MustLocalize("kafka.topic.consume.flag.timestamp.description"))
Collaborator

Do we need a date flag as well?

Contributor Author

A date flag? What would this do? Would this not be what timestamp already covers?

Collaborator

@wtrocki wtrocki Jun 16, 2022

There are a number of date formats. We need to be specific about what a timestamp is. For example, databases have a specific format associated with timestamps, etc.

More: https://en.wikipedia.org/wiki/ISO_8601

Contributor Author

Oh okay, I understand: just something to specify the format used.

}
}

func mapRecordsToRows(topic string, records *[]kafkainstanceclient.Record) []kafkaRow {
Collaborator

Question: why do we need this method? Any reason why we do not want to use the API structure?
When the API structure changes, we will need to make a manual change here, which is not ideal.

Contributor Author

The Record has members we may not want or need, like timestamp type. This was the standard way in other parts of the CLI to extract just the values we want to see. Personally I don't mind either way.

Collaborator

We have the key-value output for that (the default), and json output should just be the output of the API.
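For context, the projection being discussed looks roughly like this; `apiRecord` is a hypothetical stand-in for `kafkainstanceclient.Record` with illustrative field names. The trade-off is visible in the code: the table view drops fields such as the timestamp type, but every API change now requires touching this mapping by hand.

```go
package main

import "fmt"

// apiRecord stands in for the SDK's Record type; field names are illustrative.
type apiRecord struct {
	Key           string
	Value         string
	Partition     int32
	Offset        int64
	TimestampType string // present in the API, but noise in table output
}

// kafkaRow holds only the fields worth showing in the table view.
type kafkaRow struct {
	Topic     string
	Key       string
	Value     string
	Partition int32
	Offset    int64
}

// mapRecordsToRows projects API records into display rows, dropping
// fields like TimestampType along the way.
func mapRecordsToRows(topic string, records []apiRecord) []kafkaRow {
	rows := make([]kafkaRow, 0, len(records))
	for _, r := range records {
		rows = append(rows, kafkaRow{topic, r.Key, r.Value, r.Partition, r.Offset})
	}
	return rows
}

func main() {
	rows := mapRecordsToRows("test", []apiRecord{{Key: "k", Value: "v", Offset: 7}})
	fmt.Printf("%+v\n", rows[0])
}
```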

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

Left basic comments.

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic produce --name=missing-topic

Missing info that we are waiting for user input. Missing validation for a non-existent topic.

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic produce --name=test --file connector.json
❌ 404 Not Found. Run the command in verbose mode using the -v flag to see more information

No API error handling present.

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic produce --name=test --file connector.json
  TOPIC   KEY   VALUE                                                                    PARTITION   OFFSET  
 ------- ----- ------------------------------------------------------------------------ ----------- -------- 
  test          {                                                                                0        0  
                  "channel": "stable",                                                                       
                  "connector": {                                                                             
                    "kafka_topic": "lb-cos",                                                                 
                    "log_show_all": true                                                                     
                  },                                                                                         
                  "connector_type_id": "log_sink_0.1",                                                       
                  "id": "c97gajsotd37bufhphu0",                                                              
                  "kafka": {                                                                                 
                    "id": "c52ajhf2ek06ri44ife0",                                                            
                    "url": "openbridge-c--ajhf-ek--ri--ifea.bf2.kafka.rhcloud.com:443"                       
                  },                                                                                         
                  "kind": "Connector",                                                                       
                  "modified_at": "2022-04-07T15:37:51.612514Z",                                              
                  "name": "cos-wtrocki-logs",                                                                
                  "namespace_id": "c9f8i3njogcbrlqqlt90",                                                    
                  "owner": "wtrocki_kafka_supporting",                                                       
                  "schema_registry": {                                                                       
                    "id": "",                                                                                
                    "url": ""                                                                                
                  },                                                                                         
                  "service_account": {                                                                       
                    "client_id": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23",                           
                    "client_secret": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23"            
This looks odd to me. I would recommend showing only json/yaml for now. YAML is more human-readable than a table.

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

No validation for the --partition flag (value too large):

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic produce --name=test --file connector.json --key=test --partition 2
❌ 400 Bad Request. Run the command in verbose mode using the -v flag to see more information

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

The JSON format is unusable. The value needs to be escaped and is thus hard to read.
I do not think we can do anything other than using the key, value format as the default instead.

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic consume --name=test
{
"topic": "test",
"key": "",
"value": "{\n "channel": "stable",\n "connector": {\n "kafka_topic": "lb-cos",\n "log_show_all": true\n },\n "connector_type_id": "log_sink_0.1",\n "id": "c97gajsotd37bufhphu0",\n "kafka": {\n "id": "c52ajhf2ek06ri44ife0",\n "url": "openbridge-c--ajhf-ek--ri--ifea.bf2.kafka.rhcloud.com:443"\n },\n "kind": "Connector",\n "modified_at": "2022-04-07T15:37:51.612514Z",\n "name": "cos-wtrocki-logs",\n "namespace_id": "c9f8i3njogcbrlqqlt90",\n "owner": "wtrocki_kafka_supporting",\n "schema_registry": {\n "id": "",\n "url": ""\n },\n "service_account": {\n "client_id": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23",\n "client_secret": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23"\n }\n}",
"partition": 0,
"offset": 0
}

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

The key-value format seems to be misformatted. Some messages with a key get formatted differently than others:

{
  "schema_registry": {
    "id": "",
    "url": ""
  },
  "service_account": {
    "client_id": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23",
    "client_secret": "srvc-acct-3de38d77-90ff-4480-9a0f-8a8230ccff23"
  }
}

Key: test
Value: {
"channel": "stable",
"connector": {
"kafka_topic": "lb-cos",
"log_show_all": true
},
"connector_type_id": "log_sink_0.1",
"id": "c97gajsotd37bufhphu0",
"kafka": {
"id": "c52ajhf2ek06ri44ife0",
"url": "openbridge-c--ajhf-ek--ri--ifea.bf2.kafka.rhcloud.com:443"
},
"kind": "Connector",
"modified_at": "2022-04-07T15:37:51.612514Z",
"name": "cos-wtrocki-logs",
"namespace_id": "c9f8i3njogcbrlqqlt90",
"owner": "wtrocki_kafka_supporting",
"schema_registry": {
"id": "",



I think we would need to always print the Key:, Message: (or just Message:) labels to visibly mark the end of each message.
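A sketch of the suggested fix, with a hypothetical `formatRecord` helper: always print the same labels and a trailing separator so the end of each (possibly multi-line) message is visible, and keyless records render the same shape as keyed ones. The `<empty>` placeholder and `----` separator are assumptions, not what the CLI ships.

```go
package main

import "fmt"

// formatRecord renders one consumed record with fixed labels and a
// trailing separator line, so multi-line JSON values have a visible end.
func formatRecord(key, value string) string {
	if key == "" {
		key = "<empty>" // keyless records still get a Key: line
	}
	return fmt.Sprintf("Key: %s\nValue: %s\n----\n", key, value)
}

func main() {
	fmt.Print(formatRecord("test", "{\n  \"channel\": \"stable\"\n}"))
	fmt.Print(formatRecord("", "plain value"))
}
```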

@wtrocki
Collaborator

wtrocki commented Jun 23, 2022

Some valid cases return 400 for me:

[wtrocki@graphapi app-services-cli (produce-consume)]$ rhoas kafka topic consume --name=test --partition=1 --offset=1
❌ 400 Bad Request. Run the command in verbose mode using the -v flag to see more information

@@ -86,7 +86,7 @@ func NewConsumeTopicCommand(f *factory.Factory) *cobra.Command {
flags.Int32Var(&opts.partition, "partition", 0, f.Localizer.MustLocalize("kafka.topic.consume.flag.partition.description"))
flags.StringVar(&opts.from, "from", DefaultTimestamp, f.Localizer.MustLocalize("kafka.topic.consume.flag.from.description"))
Collaborator

Wondering if we need two flags for Unix epoch and an actual date.
I would prefer two flags for simplicity of documentation and validation.

Contributor Author

Not sure how much epoch time is used, so I was thinking of some kind of flag to indicate that is what you are using.

For example, use --from as normal with either ISO or Unix time, and just have another bool flag like --unix-time which tells the CLI how to parse the time; the default would be ISO.

Collaborator

@wtrocki wtrocki Jun 24, 2022

I personally think a flag modifying another flag can be hard to work with and confusing.

I would suggest:

--from-time (ISO 8601 date - short format)
--from-timestamp (epoch/unix timestamp)

I would love to save this discussion somewhere so we can recall it when building CLI guidelines.

Collaborator

Btw, that is my opinion and this is an opinionated area, so do not feel that you need to do it this way. It is more about my personal preference to keep the docs clean.

@jackdelahunt
Contributor Author

Some valid cases return 400 for me

I was not able to replicate this, so I will wait for feedback from others to find the issue.

@wtrocki
Collaborator

wtrocki commented Jun 24, 2022

Was not able to replicate this so will wait for feedback from others to find the issue

I think you would need messages on specific partitions to replicate that. Checking it again.

@wtrocki
Collaborator

wtrocki commented Jun 24, 2022

It works now (it throws a validation error).

@wtrocki
Collaborator

wtrocki commented Jun 24, 2022

Used https://awesomeopensource.com/project/jdorfman/awesome-json-datasets
Tested the --wait use case. Tested normal produce/consume. This looks very good to share. It is just a matter of figuring out the dates and examples.

Message: Binary or non-UTF-8 encoded data cannot be displayed

Nice!

Collaborator

@wtrocki wtrocki left a comment

We need more examples/docs and then we are good to merge.

cmd := &cobra.Command{
Use: "consume",
Short: f.Localizer.MustLocalize("kafka.topic.consume.cmd.shortDescription"),
Long: f.Localizer.MustLocalize("kafka.topic.consume.cmd.longDescription"),
Collaborator

Let's add Hidden: true to hide the commands for now.

@jackdelahunt
Contributor Author

@wtrocki added more examples and made the commands hidden. Will I squash and merge this now?

@wtrocki
Collaborator

wtrocki commented Jun 27, 2022

Every new change means a new review and verification. Doing it now.

$ rhoas kafka topic consume --name=topic-1 --wait --from=2022-06-17T07:05:34+00:00Z

# Consume from a topic starting from a certain time using unix time format
$ rhoas kafka topic consume --name=topic-1 --wait --unix-time --from=812762
Collaborator

Missing jq example, etc. All examples are for --wait; non-wait examples are missing.

@wtrocki
Collaborator

wtrocki commented Jun 27, 2022

We need more examples for typical use cases:

  • getting a message at an offset
  • jq-ing only part of the message stream
  • getting info about the current offset
  • adding some examples to the topic root
  etc.

@wtrocki wtrocki merged commit e9a98d1 into main Jun 27, 2022
@wtrocki wtrocki deleted the produce-consume branch June 27, 2022 11:04