This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Use Oldest offset as the initial one when creating the consumer group#428

Closed
slinkydeveloper wants to merge 1 commit into knative-extensions:master from slinkydeveloper:issues/420

Conversation

@slinkydeveloper
Contributor

Signed-off-by: Francesco Guardiani [email protected]

Fixes #420

Proposed Changes

  • Use OffsetOldest as the default, but let the user change this value through the config-kafka CM
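As a sketch of the proposed behavior (the `"Newest"` setting name and the parsing helper below are assumptions for illustration; the real override goes through the config-kafka ConfigMap's Sarama settings):

```go
package main

import "fmt"

// Sentinel offsets matching sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// initialOffset is a hypothetical helper mapping a config-kafka value to the
// consumer's initial offset. Per this PR, the default becomes the oldest
// offset; users could still opt back into "Newest" via the ConfigMap.
func initialOffset(setting string) int64 {
	if setting == "Newest" {
		return offsetNewest
	}
	return offsetOldest
}

func main() {
	fmt.Println(initialOffset(""))       // -2: default is OffsetOldest
	fmt.Println(initialOffset("Newest")) // -1: user override
}
```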

Release Note


Docs

@slinkydeveloper slinkydeveloper requested a review from aliok March 3, 2021 11:02
@slinkydeveloper slinkydeveloper requested review from a team as code owners March 3, 2021 11:02
@slinkydeveloper slinkydeveloper requested a review from a team March 3, 2021 11:02
@google-cla google-cla bot added the cla: yes Indicates the PR's author has signed the CLA. label Mar 3, 2021
@knative-prow-robot knative-prow-robot added the size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. label Mar 3, 2021
@aliok
Member

aliok commented Mar 3, 2021

  • Should we also do it for the distributed channel?
  • Sorry, I don't have 100% knowledge on this: why not keep the default as is and document that this behavior can be achieved with the CM change?

@knative-metrics-robot

The following is the coverage report on the affected files.
Say /test pull-knative-sandbox-eventing-kafka-go-coverage to re-run this coverage report

File                                                Old Coverage   New Coverage   Delta
pkg/channel/consolidated/dispatcher/dispatcher.go   67.2%          67.6%          0.4

@codecov

codecov bot commented Mar 3, 2021

Codecov Report

Merging #428 (04b1055) into master (b7499c4) will increase coverage by 0.01%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master     #428      +/-   ##
==========================================
+ Coverage   73.44%   73.45%   +0.01%     
==========================================
  Files         129      129              
  Lines        5008     5011       +3     
==========================================
+ Hits         3678     3681       +3     
  Misses       1094     1094              
  Partials      236      236              
Impacted Files Coverage Δ
pkg/channel/consolidated/dispatcher/dispatcher.go 55.90% <100.00%> (+0.60%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b7499c4...04b1055.

@slinkydeveloper
Contributor Author

Should we also do it for the distributed channel?

I honestly don't know cc @travis-minke-sap

Sorry, I don't have 100% knowledge on this: why not keep the default as is and document that this behavior can be achieved with the CM change?

Because the default behaviour doesn't reflect user expectations, which are: if no offset is committed, the consumer group should start reading from the beginning of the partition. In other words, this should be the default behaviour.
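The expectation discussed here can be modeled as follows. This is an illustrative sketch of consumer-group start-position semantics, not Sarama internals: a committed offset always wins; otherwise the initial-offset setting decides between the start and the end of the partition.

```go
package main

import "fmt"

// Sentinel offsets matching sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2).
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// resolveStartOffset models how a Kafka consumer group picks its starting
// position. A committed offset takes precedence; absent one, OffsetOldest
// resolves to the log start and OffsetNewest to the log end.
func resolveStartOffset(committed *int64, initial, logStart, logEnd int64) int64 {
	if committed != nil {
		return *committed
	}
	if initial == offsetOldest {
		return logStart
	}
	return logEnd
}

func main() {
	// A new consumer group on a partition holding offsets 0..99:
	fmt.Println(resolveStartOffset(nil, offsetOldest, 0, 100)) // 0: replays everything
	fmt.Println(resolveStartOffset(nil, offsetNewest, 0, 100)) // 100: only new events
	c := int64(42)
	fmt.Println(resolveStartOffset(&c, offsetOldest, 0, 100)) // 42: committed offset wins
}
```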

@travis-minke-sap
Contributor

travis-minke-sap commented Mar 3, 2021

Should we also do it for the distributed channel?

I honestly don't know cc @travis-minke-sap

I think it would be good for the distributed and consolidated channels to support the same configuration, and I support the idea of making it a config setting. I prefer that we not change the distributed implementation to default to the oldest offset UNTIL the configuration option has been added so that current users will have the option to maintain the current behavior.

Sorry, I don't have 100% knowledge on this: why not keep the default as is and document that this behavior can be achieved with the CM change?

Because the default behaviour doesn't reflect user expectations, which are: if no offset is committed, the consumer group should start reading from the beginning of the partition. In other words, this should be the default behaviour.

I'm curious as to why the oldest offset should be the default? How do you know that is the user's assumption / expectation? I'm not saying you're wrong - just that both assumptions seem equally valid. Have we heard otherwise from users? I can imagine high-volume use cases where historical data (from days/weeks ago) is not of any use to a subscriber and they just want to start receiving new events.

@travis-minke-sap
Contributor

travis-minke-sap commented Mar 3, 2021

After reading associated Issue #420 ; )

This seems like a startup bootstrap edge case that we're just using the initial offset as an easy fix for? Should the dispatcher maybe commit the previous offset (not sure if that's possible) upon startup to put a marker in place as to where it started in case the first event fails?

  • By "previous offset" I mean... determine what the current/next offset would be and subtract one (or whatever) and commit that? I know in an older iteration (using confluent and not sarama) we were able to determine the offset from a timestamp - maybe we could do something similar to determine the baseline offset to commit upon startup before processing events?

Again (regardless of whatever the default value might be) I still think making it configurable is a win if we have the time ; )

Contributor

@matzew matzew left a comment


/lgtm
/approve

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Mar 3, 2021
@knative-prow-robot
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: matzew

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Mar 3, 2021
@matzew
Contributor

matzew commented Mar 3, 2021

/hold

@knative-prow-robot knative-prow-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Mar 3, 2021

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
	confTemplate := sarama.NewConfig()
	confTemplate.Consumer.Offsets.Initial = sarama.OffsetOldest
Contributor


What happens on upgrades (e.g. from 0.21 to 0.22), when this changes?

Does it have side effects?

I shot too quick w/ the LGTM


@slinkydeveloper
Contributor Author

This seems like a startup bootstrap edge case that we're just using the initial offset as an easy fix for?

TBH I don't think it's an easy fix; IMO it is logically correct: the channel topic is a topic we control, and the dispatcher consumer group always starts from the beginning: when you start consuming there might be messages already pushed in there.

Ok, I'm thinking about this again. Because our consumer groups are per subscription, when a new subscription starts, using OldestOffset will re-send all the messages in the channel from the beginning... I completely neglected this... That's at-least-once anyway, right?

Should the dispatcher maybe commit the previous offset (not sure if that's possible) upon startup to put a marker in place as to where it started in case the first event fails?

I'm pretty sure you can't commit -1 as offset.

Again (regardless of whatever the default value might be) I still think making it configurable is a win if we have the time ; )

It's still configurable this way: you can change it in config-kafka; mine is just setting a default.

we were able to determine the offset from a timestamp - maybe we could do something similar to determine the baseline offset to commit upon startup before processing events?

That sounds like a good idea

@travis-minke-sap
Contributor

Ok, I'm thinking about this again. Because our consumer groups are per subscription, when a new subscription starts, using OldestOffset will re-send all the messages in the channel from the beginning... I completely neglected this... That's at-least-once anyway, right?

right - it would start every new subscription from the beginning of the channel (kafka topic/log)

I'm pretty sure you can't commit -1 as offset.

Yeah, sorry - I didn't mean commit -1 explicitly - I meant determine what the current/next offset would be and then subtract one from that to get the point in the log/topic prior to starting to send events...

It's still configurable this way: you can change it in config-kafka; mine is just setting a default.

Yeah, agreed - I only meant that we need to implement the configurability before we change the "default" (at least for distributed implementation) rather than change the default and maybe later get around to making it configurable ; ). cart/horse vs horse/cart is all - haha

we were able to determine the offset from a timestamp - maybe we could do something similar to determine the baseline offset to commit upon startup before processing events?

That sounds like a good idea

👍

@slinkydeveloper
Contributor Author

Yeah, sorry - I didn't mean commit -1 explicitly - I meant determine what the current/next offset would be and then subtract one from that to get the point in the log/topic prior to starting to send events...

This would be -1 when the topic is new (like in the example of #420)

rather than change the default and maybe later get around to making it configurable

This change doesn't affect distributed impl, right? Are you worried it might cause inconsistencies between the 2 impls?

@travis-minke-sap
Contributor

This would be -1 when the topic is new (like in the example of #420)

Yeah, I assume we'd be able to handle the 0 case to not subtract 1 - just an idea - same issue arises if we determine the current offset from the timestamp - it might be 0 as well.

This change doesn't affect distributed impl, right? Are you worried it might cause inconsistencies between the 2 impls?

Correct - this PR is not affecting the distributed implementation and I (as a user of the distributed channel) am not affected by this change and am fine with it proceeding. The comments are just around @aliok 's question of how to do this in a consistent way for both channels. If there is urgency for making this change in the consolidated channel, and then later we want to add configurability to distributed/both and change the distributed default value, - thats ok with me ; ). The only downside (as you mention) is the difference between the two channels for that interim time period.
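The subtract-one-with-clamping idea discussed in this thread can be sketched as a tiny hypothetical helper (not code from this PR): step back one from the next offset the consumer would receive, clamping at 0 so a brand-new topic doesn't yield -1.

```go
package main

import "fmt"

// baselineOffset returns the offset to commit as a startup marker: one before
// the next offset to be consumed, clamped at 0 for a brand-new (empty) topic.
// Hypothetical illustration of the idea discussed in the review thread.
func baselineOffset(nextOffset int64) int64 {
	if nextOffset <= 0 {
		return 0
	}
	return nextOffset - 1
}

func main() {
	fmt.Println(baselineOffset(0))   // 0: new topic, nothing to step back over
	fmt.Println(baselineOffset(100)) // 99: marker just before the next event
}
```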

Contributor

@devguyio devguyio left a comment


I think this is not correct. @slinkydeveloper, as you've said, this means that a subscription can end up receiving events from days before its creation, and that's not correct.

@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Mar 3, 2021
@slinkydeveloper
Contributor Author

Ok, then I'll close it and we can tell the user to use the config-kafka map to fix their corner case.


Labels

approved Indicates a PR has been approved by an approver from all required OWNERS files. cla: yes Indicates the PR's author has signed the CLA. do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. size/XS Denotes a PR that changes 0-9 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

After kafka-ch-dispatcher is restarted, undelivered events in the Kafka topic are not delivered.

7 participants