feat(consumer-group): add reset-offset command#876

Merged
rkpattnaik780 merged 17 commits into main from reset_offset
Sep 3, 2021
Conversation

@rkpattnaik780
Contributor

rkpattnaik780 commented Aug 6, 2021

Implement rhoas kafka consumer-group reset-offset command.

Closes #763

Verification Steps against mock

  1. Start the mock server:
make mock-api/start
  2. Select a Kafka instance:
rhoas kafka use --id <id-of-instance>
  3. Run the following command to reset the offset for a consumer group:
go run ./cmd/rhoas kafka consumer-group reset-offset --id my-app --offset earliest --topic topic-1
  4. The output should list the updated consumers in tabular form.

Verification Steps against stage (WIP)

  1. Create a Kafka instance, a Kafka topic, and a Kafka consumer group.
  2. After consuming a few messages, disconnect the consumer group.
  3. Run the following command to reset offset for a consumer group:
./rhoas kafka consumer-group reset-offset --id my-group --offset earliest -v
  4. Run the following command to reset offset for specific consumers in a consumer group:
./rhoas kafka consumer-group reset-offset --id my-group --topic rama --offset earliest --partitions "1 2" -v

Type of change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation change
  • Other (please specify)

Checklist

  • Documentation added for the feature
  • CI and all relevant tests are passing
  • Code Review completed
  • Verified independently by reviewer

one = 'Partitions on which reset-offset is to be carried upon (space separated integers)'

[kafka.consumerGroup.resetOffset.flag.output]
one = 'Output in which to display reset offset result'
Contributor

Suggested change
one = 'Output in which to display reset offset result'
one = 'Format in which to display reset offset result (choose from: "json", "yml", "yaml")'

one = 'Custom offset value (required when offset is absolute or timestamp)'

[kafka.consumerGroup.resetOffset.flag.partitions]
one = 'Partitions on which reset-offset is to be carried upon (space separated integers)'
Contributor

Suggested change
one = 'Partitions on which reset-offset is to be carried upon (space separated integers)'
one = 'Partitions on which to reset the consumer group offset (space separated integers)'

one = 'Skip confirmation to forcibly reset-offset of a consumer group'

[kafka.consumerGroup.resetOffset.flag.offset]
one = 'Offset type (earliest, latest, absolute, timestamp)'
Contributor

Suggested change
one = 'Offset type (earliest, latest, absolute, timestamp)'
one = 'Offset type (choose from: "earliest", "latest", "absolute", "timestamp")'

one = 'Select topic for which consumer group offsets are to be reset'

[kafka.consumerGroup.resetOffset.flag.yes]
one = 'Skip confirmation to forcibly reset-offset of a consumer group'
Contributor

Suggested change
one = 'Skip confirmation to forcibly reset-offset of a consumer group'
one = 'Skip confirmation to forcibly reset the offset for the consumer group'

@@ -0,0 +1,43 @@
[kafka.consumerGroup.resetOffset.cmd.shortDescription]
one = 'Reset offset of a consumer group'
Contributor

Suggested change
one = 'Reset offset of a consumer group'
one = 'Reset offset for a consumer group'


[kafka.consumerGroup.resetOffset.cmd.longDescription]
one = '''
Reset offset for a consumer group from the Kafka instance.
Contributor

Suggested change
Reset offset for a consumer group from the Kafka instance.
Reset the offset for a consumer group.

I think the fact that this applies to a Kafka instance should be clear without stating it.

one = 'Offset type (earliest, latest, absolute, timestamp)'

[kafka.consumerGroup.resetOffset.flag.value]
one = 'Custom offset value (required when offset is absolute or timestamp)'
Contributor

Suggested change
one = 'Custom offset value (required when offset is absolute or timestamp)'
one = 'Custom offset value (required when offset is "absolute" or "timestamp")'

one = 'Are you sure you want to reset the offset for consumer group "{{.ID}}"?'

[kafka.consumerGroup.resetOffset.log.debug.cancelledReset]
one = 'You have chosen to not reset the consumer group offset.'
Contributor

Suggested change
one = 'You have chosen to not reset the consumer group offset.'
one = 'You have chosen not to reset the consumer group offset.'

Contributor

@bhardesty left a comment

Just made a few minor help text suggestions.


defer httpRes.Body.Close()

logger.Info(opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.log.info.successful", localize.NewEntry("ConsumerGroupID", opts.id), localize.NewEntry("InstanceName", kafkaInstance.GetName())))
Contributor

I wonder whether we should start breaking long log statements onto multiple lines (the code, not the content).

localizer localize.Localizer
}

type UpdatedConsumerRow struct {
Contributor

Any reason this was made public?

@craicoverflow
Contributor

I think it would be better if there was earlier validation of the flag values. For example, if I set offset=absolute but don't pass any value for --value I should be informed of this before a server request is made.
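
A minimal sketch of the kind of early check described above, in Go. The function name `validateResetOffsetFlags` and the error wording are hypothetical, not code from this PR, and treating an empty `--offset` as an error is also an assumption. The only point is that an invalid combination is rejected locally, before any server request is made.

```go
package main

import (
	"errors"
	"fmt"
)

// Offset types accepted by --offset, as listed in the flag help text.
const (
	offsetEarliest  = "earliest"
	offsetLatest    = "latest"
	offsetAbsolute  = "absolute"
	offsetTimestamp = "timestamp"
)

// validateResetOffsetFlags is a hypothetical helper: it rejects bad flag
// combinations locally, before any request is sent to the server.
func validateResetOffsetFlags(offset, value string) error {
	switch offset {
	case offsetEarliest, offsetLatest:
		return nil
	case offsetAbsolute, offsetTimestamp:
		// These two offset types need an accompanying --value.
		if value == "" {
			return fmt.Errorf("--value is required when --offset is %q", offset)
		}
		return nil
	case "":
		// Assumption: no default offset type, so the flag must be set.
		return errors.New("--offset is required")
	default:
		return fmt.Errorf("invalid value %q for --offset", offset)
	}
}

func main() {
	// offset=absolute without --value fails before any network call.
	if err := validateResetOffsetFlags("absolute", ""); err != nil {
		fmt.Println("Error:", err)
	}
}
```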

@craicoverflow
Contributor

Does this require the mocks to test?

@rkpattnaik780
Contributor Author

Does this require the mocks to test?

Yes, I will be updating verification steps.

Contributor

@craicoverflow left a comment

  1. I entered a random value for "id" and "topic" but was allowed to go through the steps as if they existed:
❯ ./rhoas kafka consumer-group reset-offset --partitions 1 --partitions 2 --offset latest --id 1 --topic hello
? Are you sure you want to reset the offset for consumer group "1"? Yes
Error: 400 Bad Request. Run the command in verbose mode using the -v flag to see more information
  2. I think we ought to start extracting the error messages from the body:

The error displays "400 Bad Request" but this won't help the user know what to do.

{"code":400,"error_message":"Topic topic1, partition 1 is not valid","class":"IllegalArgumentException"}
Error: 400 Bad Request. Run the command in verbose mode using the -v flag to see more **information**
  3. When I enter an invalid topic, the "Are you sure" dialog hangs. This seems to be a backend issue, though: the request is sent but the server does not respond. I still think we should first check on the client side whether the topic exists.
❯ ./rhoas kafka consumer-group reset-offset --offset earliest --id consumer-group-0 --topic mytopic
? Are you sure you want to reset the offset for consumer group "consumer-group-0"? Yes
  4. Do you think maybe it is a good idea to print the same columns/data that you see in consumer-group describe? There are columns like "log end offset", "current offset" which users may expect to see. (Up for discussion).

  5. Passing an invalid value for --value is not handled by the API. We might need to log some bugs/work with the team to get this resolved.

./rhoas kafka consumer-group reset-offset \
--id consumer-group-0 \
--topic topic1 -y \
--offset absolute --value "hello" \
-yv
  6. It would be cool to validate the "value" value when the "offset" is "timestamp". The API reports "Timestamp must be in format 'yyyy-MM-dd'T'HH:mm:ssz", but the user cannot tell this from anywhere in the CLI, so it needs to be documented.

Overall, good PR but this needs to be tested more extensively as it is an important command with many option combinations possible.
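
One way the error extraction from point 2 might look, as a sketch rather than the CLI's actual error handling. The `apiError` fields are taken from the response body quoted above; the name `errorFromBody` and the fallback to the bare status line are assumptions.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// apiError mirrors the fields seen in the response body quoted above.
type apiError struct {
	Code         int    `json:"code"`
	ErrorMessage string `json:"error_message"`
	Class        string `json:"class"`
}

// errorFromBody (hypothetical name) prefers the server's error_message and
// falls back to the HTTP status line when the body cannot be decoded or
// carries no message.
func errorFromBody(status string, body []byte) error {
	var apiErr apiError
	if err := json.Unmarshal(body, &apiErr); err != nil || apiErr.ErrorMessage == "" {
		return errors.New(status)
	}
	return fmt.Errorf("%s: %s", status, apiErr.ErrorMessage)
}

func main() {
	body := []byte(`{"code":400,"error_message":"Topic topic1, partition 1 is not valid","class":"IllegalArgumentException"}`)
	fmt.Println("Error:", errorFromBody("400 Bad Request", body))
}
```

With this shape the user sees the server's explanation next to the status, instead of only "400 Bad Request".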

Comment on lines +19 to +27
func ValidateOffset(v string) error {
	isValid := flagutil.IsValidInput(v, flagutil.ValidOffsets...)

	if isValid {
		return nil
	}

	return InvalidValueError("offset", v, flagutil.ValidOffsets...)
}
Contributor

I see pkg/cmd/flag/validation.go as a place for generic, reusable flag helpers. Offset is specific to consumer groups, and so it should not be here.

Contributor Author

Can we have another method with the signature ValidateInput(v string, validValues []string, flagName string)?

Contributor

Yeah, if you think that would be an improvement just go ahead and implement it 😄
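
A possible shape for that helper, following the signature proposed in this thread (written with Go's `[]string`). The error wording is an assumption; the repository's existing `InvalidValueError` helper may format it differently.

```go
package main

import (
	"fmt"
	"strings"
)

// ValidateInput reports an error when v is not one of validValues.
// flagName is only used to build the error message.
func ValidateInput(v string, validValues []string, flagName string) error {
	for _, valid := range validValues {
		if v == valid {
			return nil
		}
	}
	return fmt.Errorf("invalid value %q for --%s, valid options are: %s",
		v, flagName, strings.Join(validValues, ", "))
}

func main() {
	offsets := []string{"earliest", "latest", "absolute", "timestamp"}
	fmt.Println(ValidateInput("latest", offsets, "offset"))
	fmt.Println(ValidateInput("newest", offsets, "offset"))
}
```

Because the list of valid values and the flag name are parameters, the consumer-group command can keep its offset list next to its own code and still reuse the generic check.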

}
}

if opts.value == "" && (opts.offset == "absolute" || opts.offset == "timestamp") {
Contributor

Let's create some constants for the different possible values for "offset".

cmd.Flags().BoolVarP(&opts.skipConfirm, "yes", "y", false, opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.yes"))
cmd.Flags().StringVar(&opts.id, "id", "", opts.localizer.MustLocalize("kafka.consumerGroup.common.flag.id.description", localize.NewEntry("Action", "reset-offset")))
cmd.Flags().StringVar(&opts.value, "value", "", opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.value"))
cmd.Flags().StringVar(&opts.offset, "offset", "", opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.offset"))
Contributor

I wonder if we should be using a default? Does the UI default to anything? From some research I see that it can be configured from Kafka itself, and the default value is "latest". I am not sure whether the backend produces some default.

Contributor Author

The UI doesn't have a default; it initially renders "select".

switch opts.output {
case "json":
data, _ := json.Marshal(updatedConsumers)
_ = dump.JSON(opts.IO.Out, data)
Contributor

Use the PrintDataInFormat helper defined in the dump package.

)

switch opts.output {
case "json":
Contributor

Use the constants defined in dump.

var (
ValidOutputFormats = []string{dump.JSONFormat, dump.YAMLFormat, dump.YMLFormat}
CredentialsOutputFormats = []string{"env", "json", "properties"}
ValidOffsets = []string{"timestamp", "absolute", "latest", "earliest"}
Contributor

Use the constants which will be created :)


[kafka.consumerGroup.resetOffset.cmd.longDescription]
one = '''
Reset the offset for a consumer group.
Contributor

This should probably have some more detail?

cmd.Flags().StringVar(&opts.value, "value", "", opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.value"))
cmd.Flags().StringVar(&opts.offset, "offset", "", opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.offset"))
cmd.Flags().StringVar(&opts.topic, "topic", "", opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.topic"))
cmd.Flags().Int32SliceVar(&opts.partitions, "partitions", []int32{}, opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.partitions"))
Contributor

Suggested change
cmd.Flags().Int32SliceVar(&opts.partitions, "partitions", []int32{}, opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.partitions"))
cmd.Flags().Int32SliceVar(&opts.partitions, "partition", []int32{}, opts.localizer.MustLocalize("kafka.consumerGroup.resetOffset.flag.partitions"))

Contributor Author

I think --partitions 1,2,3 looks better than --partition 1 --partition 2 --partition 3, which becomes a drag as the number of partitions to pass increases.

Contributor

Ah neat, I did not know that Int32SliceVar allowed for both input types to be valid. That is fine then.
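
To illustrate the behavior being discussed, here is a stdlib-only approximation: a custom `flag.Value` that accepts both a comma-separated list and a repeated flag. It is not pflag's implementation of `Int32SliceVar`; the type `int32List` and the function `parsePartitions` are hypothetical names used only for this sketch.

```go
package main

import (
	"flag"
	"fmt"
	"strconv"
	"strings"
)

// int32List is a flag.Value that accumulates integers from repeated flags
// and from comma-separated lists.
type int32List []int32

func (l *int32List) String() string { return fmt.Sprint([]int32(*l)) }

// Set is called once per occurrence of the flag; each occurrence may itself
// carry several comma-separated values.
func (l *int32List) Set(s string) error {
	for _, part := range strings.Split(s, ",") {
		n, err := strconv.ParseInt(strings.TrimSpace(part), 10, 32)
		if err != nil {
			return err
		}
		*l = append(*l, int32(n))
	}
	return nil
}

// parsePartitions parses args that contain a --partitions flag.
func parsePartitions(args []string) ([]int32, error) {
	var partitions int32List
	fs := flag.NewFlagSet("reset-offset", flag.ContinueOnError)
	fs.Var(&partitions, "partitions", "partitions to reset (comma-separated)")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return partitions, nil
}

func main() {
	a, _ := parsePartitions([]string{"--partitions", "1,2,3"})
	b, _ := parsePartitions([]string{"--partitions", "1", "--partitions", "2", "--partitions", "3"})
	fmt.Println(a, b) // both forms yield the same slice
}
```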

one = 'Custom offset value (required when offset is "absolute" or "timestamp")'

[kafka.consumerGroup.resetOffset.flag.partitions]
one = 'Partitions on which to reset the consumer group offset (space separated integers)'
Contributor

"(space separated integers)" is that right?

Contributor

If it is right, it should be "space-separated" :-)

Contributor Author

It should be comma-separated now; it used to be space-separated.

Contributor

@craicoverflow left a comment

A couple of nits, but happy for those to be addressed in a follow up.

EarliestOffset = "earliest"
TimestampOffset = "timestamp"
LatestOffset = "latest"
OffsetAbssolute = "absolute"
Contributor

Suggested change
OffsetAbssolute = "absolute"
OffsetAbsolute

typo.


var ValidOffsets = []string{OffsetAbssolute, OffsetEarliest, OffsetTimestamp, OffsetLatest}

var timestampOffsetRegExp = regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}-\d{2}:\d{2})$`)
Contributor

It could be better to use an iso8601 validator lib for this:

https://github.com/relvacode/iso8601

Contributor

Move to validators package.
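
As a stdlib alternative to both the regular expression and the suggested library, the timestamp could be checked with `time.Parse` and the `time.RFC3339` layout. This is a sketch under the assumption that the backend accepts RFC 3339 timestamps; the function name is hypothetical. Note that the regular expression above only matches a negative `-hh:mm` zone offset and does not reject out-of-range fields such as month 13, whereas `time.Parse` also accepts `Z` and positive offsets and range-checks each field.

```go
package main

import (
	"fmt"
	"time"
)

// validateTimestampOffset (hypothetical name) accepts RFC 3339 timestamps
// such as 2021-09-03T11:58:00+05:30 or 2021-09-03T11:58:00Z.
func validateTimestampOffset(v string) error {
	if _, err := time.Parse(time.RFC3339, v); err != nil {
		return fmt.Errorf("invalid timestamp %q: expected a value like 2006-01-02T15:04:05-07:00", v)
	}
	return nil
}

func main() {
	inputs := []string{
		"2021-09-03T11:58:00-04:00", // negative offset: matched by the regexp too
		"2021-09-03T11:58:00+05:30", // positive offset: not matched by the regexp
		"2021-13-03T11:58:00-04:00", // month 13: matched by the regexp, rejected here
		"hello",
	}
	for _, v := range inputs {
		fmt.Printf("%-28s %v\n", v, validateTimestampOffset(v))
	}
}
```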

}

// ValidateOffset checks if value v is a valid value for --offset
func ValidateOffset(v string) error {
Contributor

nit: Ideally this should go into the validators package

@craicoverflow
Contributor

Next steps once this is merged @rkpattnaik780 - add an example to the guides repo.

@rkpattnaik780
Contributor Author

Next steps once this is merged @rkpattnaik780 - add an example to the guides repo.

Will be doing these in a follow up along with tests for consumer group validator.

@wtrocki
Collaborator

wtrocki commented Sep 3, 2021

Is this feature deployed to prod?

@rkpattnaik780
Contributor Author

Is this feature deployed to prod?

I will check in prod and get back to you; it is there in stage, though.

@wtrocki
Collaborator

wtrocki commented Sep 3, 2021

Please do not bother. I will try this out on stage and play with it for a while.

@craicoverflow
Contributor

Is this feature deployed to prod?

I think that it is, yes.

[discrete]
== Synopsis

Reset the offset for consumers in a consumer group reading from a given topic.
Collaborator

We need more information.

List offset types.
We need to list 2 modes (partition mode is only for advanced use cases)

....

[discrete]
== Examples
Collaborator

Examples should reflect all the modes and types.

@rkpattnaik780 rkpattnaik780 merged commit 1b0876f into main Sep 3, 2021
@rkpattnaik780 rkpattnaik780 deleted the reset_offset branch September 3, 2021 11:58
@rkpattnaik780
Contributor Author

Next steps once this is merged @rkpattnaik780 - add an example to the guides repo.

Should we create a new release (0.29.0) before updating the guides?

@craicoverflow
Contributor

Next steps once this is merged @rkpattnaik780 - add an example to the guides repo.

Should we create a new release (0.29.0) before updating the guides?

You should have the pull request ready to go as a draft, and merge it once 0.29.0 is released.

Development

Successfully merging this pull request may close these issues.

Add the ability to reset the offset of an existing Kafka consumer group

4 participants