Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions src/rdkafka_msgset_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) {
* we can't perform this offset check here
* in that case. */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* in that case. */
* in that case.
* Share consumers may receive re-delivered records at earlier offsets
* after RELEASE, so skip this check for them. */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe add a TODO to remove this handling after making sure we don't receive Messages v0 and v1 after broker v4.0.

if (!relative_offsets &&
hdr.Offset < rktp->rktp_offsets.fetch_pos.offset)
hdr.Offset < rktp->rktp_offsets.fetch_pos.offset &&
!RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk))
return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */

/* Handle compressed MessageSet */
Expand Down Expand Up @@ -751,8 +752,11 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) {
* epoch is different the fetch will fail (KIP-320) and if offset leader
* epoch is different it'll return an empty fetch (KIP-595). If we
* checked it, it's possible to have a loop when moving from a broker
* that supports leader epoch to one that doesn't. */
if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) {
* that supports leader epoch to one that doesn't.
* Share consumers may receive re-delivered records at earlier offsets
* after RELEASE, so skip this check for them. */
if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset &&
!RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) {
rd_rkb_dbg(
msetr->msetr_rkb, MSG, "MSG",
"%s [%" PRId32
Expand Down Expand Up @@ -1125,8 +1129,11 @@ rd_kafka_msgset_reader_v2(rd_kafka_msgset_reader_t *msetr) {
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
hdr.BaseOffset, payload_size);

/* If entire MessageSet contains old outdated offsets, skip it. */
if (LastOffset < rktp->rktp_offsets.fetch_pos.offset) {
/* If entire MessageSet contains old outdated offsets, skip it.
* Share consumers may receive re-delivered records at earlier offsets
* after RELEASE, so skip this check for them. */
if (LastOffset < rktp->rktp_offsets.fetch_pos.offset &&
!RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) {
rd_kafka_buf_skip(rkbuf, payload_size);
goto done;
}
Expand Down Expand Up @@ -1237,7 +1244,8 @@ rd_kafka_msgset_reader_peek_msg_version(rd_kafka_msgset_reader_t *msetr,
rd_slice_size(&rkbuf->rkbuf_reader));

if (Offset >=
msetr->msetr_rktp->rktp_offsets.fetch_pos.offset) {
msetr->msetr_rktp->rktp_offsets.fetch_pos.offset &&
!RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) {
rd_kafka_consumer_err(
&msetr->msetr_rkq, msetr->msetr_broker_id,
RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
Expand Down Expand Up @@ -1327,6 +1335,14 @@ static void rd_kafka_msgset_reader_postproc(rd_kafka_msgset_reader_t *msetr,
if (rko) {
*last_offsetp = rko->rko_u.fetch.rkm.rkm_offset;

/*
* TODO KIP-932: Though we shouldn't reach here for share
* consumers as msetr_relative_offsets is set to true only
* for MsgVersion v1 but still check if MsgVersion v1 is
* possible with share consumer. If not, this handling can
* be skipped safely. If yes, then we should use negative
* offset in place of min_offset instead of fetch_pos.offset.
*/
if (*last_offsetp != -1 && msetr->msetr_relative_offsets) {
/* Update messages to absolute offsets
* and purge any messages older than the current
Expand Down Expand Up @@ -1394,6 +1410,10 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
err = RD_KAFKA_RESP_ERR_NO_ERROR;

} else if (!err && msetr->msetr_aborted_cnt == 0) {
/*
* TODO KIP-932: Check how to handle fetch_pos.offset
* for share consumer.
*/
rd_kafka_consumer_err(
&msetr->msetr_rkq, msetr->msetr_broker_id,
RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE,
Expand Down Expand Up @@ -1445,12 +1465,20 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) {
if (rd_kafka_q_concat(msetr->msetr_par_rkq, &msetr->msetr_rkq) != -1) {
/* Update partition's fetch offset based on
* last message's offset. */
/*
* TODO KIP-932: Might need to remove this handling of
* fetch_pos.offset for share consumer.
*/
if (likely(last_offset != -1))
rktp->rktp_offsets.fetch_pos.offset = last_offset + 1;
}

/* Adjust next fetch offset if outlier code has indicated
* an even later next offset. */
/*
* TODO KIP-932: Might need to remove this handling of fetch_pos.offset
* for share consumer.
*/
if (msetr->msetr_next_offset > rktp->rktp_offsets.fetch_pos.offset)
rktp->rktp_offsets.fetch_pos.offset = msetr->msetr_next_offset;

Expand Down