diff --git a/src/rdkafka_msgset_reader.c b/src/rdkafka_msgset_reader.c index 2bebf49c6..c880a583f 100644 --- a/src/rdkafka_msgset_reader.c +++ b/src/rdkafka_msgset_reader.c @@ -653,7 +653,8 @@ rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) { * we cant perform this offset check here * in that case. */ if (!relative_offsets && - hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) + hdr.Offset < rktp->rktp_offsets.fetch_pos.offset && + !RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ /* Handle compressed MessageSet */ @@ -751,8 +752,11 @@ rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) { * epoch is different the fetch will fail (KIP-320) and if offset leader * epoch is different it'll return an empty fetch (KIP-595). If we * checked it, it's possible to have a loop when moving from a broker - * that supports leader epoch to one that doesn't. */ - if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) { + * that supports leader epoch to one that doesn't. + * Share consumers may receive re-delivered records at earlier offsets + * after RELEASE, so skip this check for them. */ + if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset && + !RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) { rd_rkb_dbg( msetr->msetr_rkb, MSG, "MSG", "%s [%" PRId32 @@ -1125,8 +1129,11 @@ rd_kafka_msgset_reader_v2(rd_kafka_msgset_reader_t *msetr) { rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, hdr.BaseOffset, payload_size); - /* If entire MessageSet contains old outdated offsets, skip it. */ - if (LastOffset < rktp->rktp_offsets.fetch_pos.offset) { + /* If entire MessageSet contains old outdated offsets, skip it. + * Share consumers may receive re-delivered records at earlier offsets + * after RELEASE, so skip this check for them. */ + if (LastOffset < rktp->rktp_offsets.fetch_pos.offset && + !RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) { rd_kafka_buf_skip(rkbuf, payload_size); goto done; } @@ -1237,7 +1244,8 @@ rd_kafka_msgset_reader_peek_msg_version(rd_kafka_msgset_reader_t *msetr, rd_slice_size(&rkbuf->rkbuf_reader)); if (Offset >= - msetr->msetr_rktp->rktp_offsets.fetch_pos.offset) { + msetr->msetr_rktp->rktp_offsets.fetch_pos.offset && + !RD_KAFKA_IS_SHARE_CONSUMER(msetr->msetr_rkb->rkb_rk)) { rd_kafka_consumer_err( &msetr->msetr_rkq, msetr->msetr_broker_id, RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, @@ -1327,6 +1335,14 @@ static void rd_kafka_msgset_reader_postproc(rd_kafka_msgset_reader_t *msetr, if (rko) { *last_offsetp = rko->rko_u.fetch.rkm.rkm_offset; + /* + * TODO KIP-932: Though we shouldn't reach here for share + * consumers as msetr_relative_offsets is set to true only + * for MsgVersion v1 but still check if MsgVersion v1 is + * possible with share consumer. If not, this handling can + * be skipped safely. If yes, then we should use negative + * offset in place of min_offset instead of fetch_pos.offset. + */ if (*last_offsetp != -1 && msetr->msetr_relative_offsets) { /* Update messages to absolute offsets * and purge any messages older than the current @@ -1394,6 +1410,10 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) { err = RD_KAFKA_RESP_ERR_NO_ERROR; } else if (!err && msetr->msetr_aborted_cnt == 0) { + /* + * TODO KIP-932: Check how to handle fetch_pos.offset + * for share consumer. + */ rd_kafka_consumer_err( &msetr->msetr_rkq, msetr->msetr_broker_id, RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, @@ -1445,12 +1465,20 @@ rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) { if (rd_kafka_q_concat(msetr->msetr_par_rkq, &msetr->msetr_rkq) != -1) { /* Update partition's fetch offset based on * last message's offest. */ + /* + * TODO KIP-932: Might need to remove this handling of + * fetch_pos.offset for share consumer. + */ if (likely(last_offset != -1)) rktp->rktp_offsets.fetch_pos.offset = last_offset + 1; } /* Adjust next fetch offset if outlier code has indicated * an even later next offset. */ + /* + * TODO KIP-932: Might need to remove this handling of fetch_pos.offset + * for share consumer. + */ if (msetr->msetr_next_offset > rktp->rktp_offsets.fetch_pos.offset) rktp->rktp_offsets.fetch_pos.offset = msetr->msetr_next_offset;