@@ -13,10 +13,10 @@
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp
 
@@ -304,7 +304,7 @@ def fetched_records(self, max_records=None):
 
         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes
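
For context, a sketch of how a caller might surface these documented errors through the public consumer loop. This is an illustration against kafka-python's KafkaConsumer API (check_crcs defaults to True), not part of the patch; the topic name and handler are placeholders:

    from kafka import KafkaConsumer
    from kafka.errors import (
        CorruptRecordException, OffsetOutOfRangeError, RecordTooLargeError
    )

    consumer = KafkaConsumer('my-topic')  # placeholder topic
    try:
        for message in consumer:
            handle(message)  # hypothetical application callback
    except CorruptRecordException:
        # crc validation failed on a fetched record (check_crcs=True)
        raise
    except OffsetOutOfRangeError:
        # no offset reset strategy configured and the fetch position is invalid
        raise
    except RecordTooLargeError:
        # a record exceeds max_partition_fetch_bytes and can never be returned
        raise
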
@@ -449,57 +449,25 @@ def _message_generator(self):
 
         self._next_partition_records = None
 
-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-
-                if not msg.is_compressed():
-                    yield self._parse_record(tp, offset, msg.timestamp, msg)
-
-                else:
-                    # If relative offset is used, we need to decompress the entire message first
-                    # to compute the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # timestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
-
-                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
+                    key = self._deserialize(
+                        self.config['key_deserializer'],
+                        tp.topic, record.key)
+                    value = self._deserialize(
+                        self.config['value_deserializer'],
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()
 
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
@@ -508,21 +476,6 @@ def _unpack_message_set(self, tp, messages):
             log.exception('StopIteration raised unpacking messageset: %s', e)
             raise Exception('StopIteration raised unpacking messageset')
 
-        # If unpacking raises AssertionError, it means decompression unsupported
-        # See Issue 1033
-        except AssertionError as e:
-            log.exception('AssertionError raised unpacking messageset: %s', e)
-            raise
-
-    def _parse_record(self, tp, offset, timestamp, msg):
-        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
-        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
-        return ConsumerRecord(tp.topic, tp.partition, offset,
-                              timestamp, msg.timestamp_type,
-                              key, value, msg.crc,
-                              len(msg.key) if msg.key is not None else -1,
-                              len(msg.value) if msg.value is not None else -1)
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
 
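
The rewritten _unpack_message_set above leans on the flat iteration that MemoryRecords provides: next_batch() walks complete batches in the buffer, and every record already carries an absolute offset and a resolved timestamp, so none of the old wrapper-message decompression bookkeeping survives. A minimal sketch of the pattern, assuming buf holds the raw bytes of a fetched partition payload:

    from kafka.record import MemoryRecords

    def iter_records(buf):
        records = MemoryRecords(buf)
        batch = records.next_batch()
        while batch is not None:  # next_batch() returns None when exhausted
            for record in batch:
                # offsets/timestamps are already absolute; no relative-offset math
                yield record.offset, record.timestamp, record.key, record.value
            batch = records.next_batch()
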
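The StopIteration guard kept by this change addresses a classic generator pitfall: under pre-PEP 479 semantics (Python 2, and Python 3 before 3.7 without `from __future__ import generator_stop`), a StopIteration raised inside a generator body is swallowed and silently ends iteration instead of propagating. A tiny illustration of the failure mode the except clause converts into a plain Exception:

    def unpack():
        yield 1
        raise StopIteration('lost')  # pre-PEP 479: never reaches the caller

    assert list(unpack()) == [1]  # iteration just stops; the error disappears
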
@@ -784,15 +737,13 @@ def _handle_fetch_response(self, request, send_time, response):
 
     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition
-        partition = completed_fetch.partition_data
         fetch_offset = completed_fetch.fetched_offset
         num_bytes = 0
         records_count = 0
         parsed_records = None
 
         error_code, highwater = completed_fetch.partition_data[:2]
         error_type = Errors.for_code(error_code)
-        messages = completed_fetch.partition_data[-1]
 
         try:
             if not self._subscriptions.is_fetchable(tp):
@@ -816,21 +767,18 @@ def _parse_fetched_data(self, completed_fetch):
                           position)
                 return None
 
-            partial = None
-            if messages and isinstance(messages[-1][-1], PartialMessage):
-                partial = messages.pop()
-
-            if messages:
+            records = MemoryRecords(completed_fetch.partition_data[-1])
+            if records.has_next():
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, messages))
+                unpacked = list(self._unpack_message_set(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
-                last_offset, _, _ = messages[-1]
+                last_offset = unpacked[-1].offset
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
-                num_bytes = sum(msg[1] for msg in messages)
-                records_count = len(messages)
-            elif partial:
+                num_bytes = records.valid_bytes()
+                records_count = len(unpacked)
+            elif records.size_in_bytes() > 0:
                 # we did not read a single message from a non-empty
                 # buffer because that message's size is larger than
                 # fetch size, in this case record this exception
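
As a rough illustration of the byte accounting in this hunk: size_in_bytes() is the length of the whole fetched buffer, while valid_bytes() counts only complete batches, so a buffer holding nothing but a truncated batch reports size_in_bytes() > 0 with has_next() false — exactly the record-too-large condition handled above. A sketch using MemoryRecordsBuilder (kafka-python's record-building helper; treat the exact constructor and append arguments as assumptions):

    from kafka.record import MemoryRecords
    from kafka.record.memory_records import MemoryRecordsBuilder

    builder = MemoryRecordsBuilder(magic=1, compression_type=0, batch_size=1024)
    builder.append(timestamp=10000, key=b'key', value=b'value')
    builder.close()
    buf = builder.buffer()

    complete = MemoryRecords(buf)
    assert complete.has_next()            # one complete batch to read

    truncated = MemoryRecords(buf[:-5])   # chop the tail off the only batch
    assert truncated.size_in_bytes() > 0  # non-empty buffer...
    assert not truncated.has_next()       # ...but no complete batch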