1919import static io .streamnative .pulsar .handlers .kop .KafkaProtocolHandler .ListenerType .SSL ;
2020import static io .streamnative .pulsar .handlers .kop .KafkaProtocolHandler .getKopBrokerUrl ;
2121import static io .streamnative .pulsar .handlers .kop .KafkaProtocolHandler .getListenerPort ;
22- import static io .streamnative .pulsar .handlers .kop .MessagePublishContext .publishMessages ;
22+ import static io .streamnative .pulsar .handlers .kop .MessagePublishContext .MESSAGE_BATCHED ;
23+ import static io .streamnative .pulsar .handlers .kop .utils .MessageRecordUtils .recordsToByteBuf ;
2324import static io .streamnative .pulsar .handlers .kop .utils .TopicNameUtils .getKafkaTopicNameFromPulsarTopicname ;
2425import static io .streamnative .pulsar .handlers .kop .utils .TopicNameUtils .pulsarTopicName ;
2526import static java .nio .charset .StandardCharsets .UTF_8 ;
2930import com .google .common .collect .Lists ;
3031import com .google .common .collect .Maps ;
3132import com .google .common .collect .Queues ;
33+ import io .netty .buffer .ByteBuf ;
3234import io .netty .channel .ChannelHandlerContext ;
3335import io .streamnative .pulsar .handlers .kop .coordinator .group .GroupCoordinator ;
3436import io .streamnative .pulsar .handlers .kop .coordinator .group .GroupMetadata .GroupOverview ;
3840import io .streamnative .pulsar .handlers .kop .utils .MessageIdUtils ;
3941import io .streamnative .pulsar .handlers .kop .utils .OffsetFinder ;
4042import io .streamnative .pulsar .handlers .kop .utils .SaslUtils ;
43+
4144import java .io .IOException ;
4245import java .net .InetSocketAddress ;
4346import java .net .URI ;
6063import java .util .stream .Collectors ;
6164import java .util .stream .IntStream ;
6265import javax .naming .AuthenticationException ;
66+
6367import lombok .Getter ;
6468import lombok .extern .slf4j .Slf4j ;
6569import org .apache .bookkeeper .mledger .AsyncCallbacks ;
@@ -496,92 +500,81 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
496500 // whether the head of queue is running.
497501 private AtomicBoolean isHeadRequestRun = new AtomicBoolean (false );
498502
499- private void handleProducerRequestInternal () {
500- // the first request that success set to running, get running.
501- if (produceRequestsQueue .isEmpty () || !isHeadRequestRun .compareAndSet (false , true )) {
502- // the head of queue is already running, when head complete, it will peek the following request to run.
503- if (log .isDebugEnabled ()) {
504- log .debug (" Produce messages not entered. queue.size: {}, head isHeadRequestRun: {}" ,
505- produceRequestsQueue .size (), isHeadRequestRun .get ());
506- }
507- return ;
508- }
509-
510- Pair <KafkaHeaderAndRequest , CompletableFuture <AbstractResponse >> head = produceRequestsQueue .peek ();
511- KafkaHeaderAndRequest produceHar = head .getKey ();
512- CompletableFuture <AbstractResponse > resultFuture = head .getValue ();
513- ProduceRequest produceRequest = (ProduceRequest ) produceHar .getRequest ();
514-
515- // Ignore request.acks() and request.timeout(), which related to kafka replication in this broker.
516- Map <TopicPartition , CompletableFuture <PartitionResponse >> responsesFutures = new HashMap <>();
503+ private ConcurrentHashMap <TopicName , Queue <Pair <CompletableFuture <ByteBuf >, CompletableFuture <PartitionResponse >>>>
504+ transQueue = new ConcurrentHashMap <>();
517505
518- final int responsesSize = produceRequest .partitionRecordsOrFail ().size ();
519-
520- // TODO: handle un-exist topic:
521- // nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
522- for (Map .Entry <TopicPartition , ? extends Records > entry : produceRequest .partitionRecordsOrFail ().entrySet ()) {
523- TopicPartition topicPartition = entry .getKey ();
524-
525- CompletableFuture <PartitionResponse > partitionResponse = new CompletableFuture <>();
526- responsesFutures .put (topicPartition , partitionResponse );
506+ private void publishMessages (MemoryRecords records ,
507+ TopicName topic ,
508+ CompletableFuture <PartitionResponse > future ,
509+ CompletableFuture <ByteBuf > transFuture ) {
510+ // get records size.
511+ AtomicInteger size = new AtomicInteger (0 );
512+ records .records ().forEach (record -> size .incrementAndGet ());
513+ int rec = size .get ();
527514
528- if (log .isDebugEnabled ()) {
529- log .debug ("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} " ,
530- ctx .channel (), produceHar .getHeader (),
531- topicPartition .topic (), topicPartition .partition (), responsesSize );
532- }
515+ if (log .isDebugEnabled ()) {
516+ log .debug ("publishMessages for topic partition: {} , records size is {} " , topic .toString (), size .get ());
517+ }
533518
534- TopicName topicName = pulsarTopicName (topicPartition , namespace );
519+ if (MESSAGE_BATCHED ) {
520+ pulsarService .getExecutor ().submit (() -> {
521+ ByteBuf buf = recordsToByteBuf (records , rec );
522+ transFuture .complete (buf );
523+ });
535524
536- topicManager .getTopic (topicName .toString ()).whenComplete ((persistentTopic , exception ) -> {
537- if (exception != null || persistentTopic == null ) {
538- log .warn ("[{}] Request {}: Failed to getOrCreateTopic {}. "
539- + "Topic is in loading status, return LEADER_NOT_AVAILABLE. exception:" ,
540- ctx .channel (), produceHar .getHeader (), topicName , exception );
541- partitionResponse .complete (new PartitionResponse (Errors .LEADER_NOT_AVAILABLE ));
525+ transFuture .whenComplete ((headerAndPayload , ex ) -> {
526+ if (ex != null ) {
527+ log .error ("record to bytebuf error: " , ex );
528+ future .complete (new PartitionResponse (Errors .KAFKA_STORAGE_ERROR ));
542529 } else {
543- CompletableFuture <PersistentTopic > topicFuture = new CompletableFuture <>();
544- topicFuture .complete (persistentTopic );
545- publishMessages ((MemoryRecords ) entry .getValue (), persistentTopic , partitionResponse );
530+ doPublishMessages (topic );
546531 }
547532 });
548533 }
534+ }
549535
550- CompletableFuture .allOf (responsesFutures .values ().toArray (new CompletableFuture <?>[responsesSize ]))
551- .whenComplete ((ignore , ex ) -> {
552- // all ex has translated to PartitionResponse with Errors.KAFKA_STORAGE_ERROR
553- Map <TopicPartition , PartitionResponse > responses = new ConcurrentHashMap <>();
554- for (Map .Entry <TopicPartition , CompletableFuture <PartitionResponse >> entry :
555- responsesFutures .entrySet ()) {
556- responses .put (entry .getKey (), entry .getValue ().join ());
557- }
536+ private void doPublishMessages (TopicName topic ) {
537+ Queue <Pair <CompletableFuture <ByteBuf >, CompletableFuture <PartitionResponse >>> topicQueue =
538+ transQueue .get (topic );
558539
540+ // loop from first responseFuture.
541+ while (topicQueue != null && topicQueue .peek () != null
542+ && topicQueue .peek ().getLeft ().isDone () && isActive .get ()) {
543+ CompletableFuture <Long > offsetFuture = new CompletableFuture <>();
544+ Pair <CompletableFuture <ByteBuf >, CompletableFuture <PartitionResponse >> result = topicQueue .remove ();
545+ try {
559546 if (log .isDebugEnabled ()) {
560- log .debug ("[{}] Request {}: Complete handle produce." ,
561- ctx .channel (), produceHar .toString ());
547+ log .debug ("[{}] topic" , topic .toString ());
562548 }
563- resultFuture .complete (new ProduceResponse (responses ));
564- });
565-
566- // trigger following request to run.
567- resultFuture .whenComplete ((response , throwable ) -> {
568- if (throwable != null ) {
569- log .warn ("Error produce message for {}." , produceHar .getHeader (), throwable );
570- }
549+ ByteBuf headerAndPayload = result .getLeft ().get ();
550+ topicManager .getTopic (topic .toString ()).whenComplete ((persistentTopic , throwable ) -> {
551+ if (throwable != null || persistentTopic == null ) {
552+ log .warn ("[{}] Request {}: Failed to getOrCreateTopic {}. "
553+ + "Topic is in loading status, return LEADER_NOT_AVAILABLE. exception:" ,
554+ ctx .channel (), topic .toString (), throwable );
555+ result .getRight ().complete (new PartitionResponse (Errors .LEADER_NOT_AVAILABLE ));
556+ } else {
557+ persistentTopic .publishMessage (
558+ headerAndPayload ,
559+ MessagePublishContext .get (
560+ offsetFuture , persistentTopic , System .nanoTime ()));
561+ }
562+ });
571563
572- if (log .isDebugEnabled ()) {
573- log .debug ("Produce messages complete. trigger next. queue.size: {}, head isHeadRequestRun: {}" ,
574- produceRequestsQueue .size (), isHeadRequestRun .get ());
564+ offsetFuture .whenComplete ((offset , ex ) -> {
565+ if (ex != null ) {
566+ log .error ("publishMessages for topic partition: {} failed when write." ,
567+ topic .toString (), ex );
568+ result .getRight ().complete (new PartitionResponse (Errors .KAFKA_STORAGE_ERROR ));
569+ } else {
570+ result .getRight ().complete (new PartitionResponse (Errors .NONE ));
571+ }
572+ });
573+ } catch (Exception e ) {
574+ // should not comes here.
575+ log .error ("error to get Response ByteBuf:" , e );
575576 }
576-
577- boolean compare = isHeadRequestRun .compareAndSet (true , false );
578- checkState (compare , "Head should be running when completed head." );
579- // remove completed request.
580- produceRequestsQueue .remove ();
581-
582- // trigger another run.
583- handleProducerRequestInternal ();
584- });
577+ }
585578 }
586579
587580 protected void handleProduceRequest (KafkaHeaderAndRequest produceHar ,
@@ -596,13 +589,59 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
596589 return ;
597590 }
598591
599- if (log .isDebugEnabled ()) {
600- log .debug (" new produce request comes: {}, isHeadRequestRun: {}" ,
601- produceRequestsQueue .size (), isHeadRequestRun .get ());
592+ Map <TopicPartition , CompletableFuture <PartitionResponse >> responsesFutures = new HashMap <>();
593+
594+ final int responsesSize = produceRequest .partitionRecordsOrFail ().size ();
595+
596+ // TODO: handle un-exist topic:
597+ // nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
598+ for (Map .Entry <TopicPartition , ? extends Records > entry : produceRequest .partitionRecordsOrFail ().entrySet ()) {
599+ TopicPartition topicPartition = entry .getKey ();
600+
601+ CompletableFuture <PartitionResponse > partitionResponse = new CompletableFuture <>();
602+ responsesFutures .put (topicPartition , partitionResponse );
603+
604+ if (log .isDebugEnabled ()) {
605+ log .debug ("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} " ,
606+ ctx .channel (), produceHar .getHeader (),
607+ topicPartition .topic (), topicPartition .partition (), responsesSize );
608+ }
609+
610+ TopicName topicName = pulsarTopicName (topicPartition , namespace );
611+
612+ CompletableFuture <ByteBuf > transFuture = new CompletableFuture <>();
613+ //put queue
614+ transQueue .compute (topicName , (key , queue ) -> {
615+ if (queue == null ) {
616+ Queue <Pair <CompletableFuture <ByteBuf >, CompletableFuture <PartitionResponse >>> newQueue =
617+ Queues .newConcurrentLinkedQueue ();
618+ newQueue .add (Pair .of (transFuture , partitionResponse ));
619+ return newQueue ;
620+ } else {
621+ queue .add (Pair .of (transFuture , partitionResponse ));
622+ return queue ;
623+ }
624+ });
625+
626+ topicManager .getTopic (topicName .toString ());
627+ publishMessages ((MemoryRecords ) entry .getValue (), topicName , partitionResponse , transFuture );
602628 }
603- produceRequestsQueue .add (Pair .of (produceHar , resultFuture ));
604629
605- handleProducerRequestInternal ();
630+ CompletableFuture .allOf (responsesFutures .values ().toArray (new CompletableFuture <?>[responsesSize ]))
631+ .whenComplete ((ignore , ex ) -> {
632+ // all ex has translated to PartitionResponse with Errors.KAFKA_STORAGE_ERROR
633+ Map <TopicPartition , PartitionResponse > responses = new ConcurrentHashMap <>();
634+ for (Map .Entry <TopicPartition , CompletableFuture <PartitionResponse >> entry :
635+ responsesFutures .entrySet ()) {
636+ responses .put (entry .getKey (), entry .getValue ().join ());
637+ }
638+
639+ if (log .isDebugEnabled ()) {
640+ log .debug ("[{}] Request {}: Complete handle produce." ,
641+ ctx .channel (), produceHar .toString ());
642+ }
643+ resultFuture .complete (new ProduceResponse (responses ));
644+ });
606645 }
607646
608647 protected void handleFindCoordinatorRequest (KafkaHeaderAndRequest findCoordinator ,
0 commit comments