Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 8f1c1b2

Browse files
committed
Merge branch 'branch-3.0' into branch-3.0.0.1-rc1
2 parents 79edf13 + 17fab5d commit 8f1c1b2

File tree

7 files changed

+26
-17
lines changed

7 files changed

+26
-17
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/DelayedFetch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void onComplete() {
9191
replicaManager.readFromLocalLog(
9292
readCommitted, fetchMaxBytes, maxReadEntriesNum, readPartitionInfo, context
9393
).thenAccept(readRecordsResult -> {
94-
this.context.getStatsLogger().getWaitingFetchesTriggered().add(1);
94+
this.context.getStatsLogger().getWaitingFetchesTriggered().addCount(1);
9595
this.callback.complete(readRecordsResult);
9696
}).thenAccept(__ -> {
9797
// Ensure the old decode result are recycled.

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ protected boolean channelReady() {
189189
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
190190
// Get a buffer that contains the full frame
191191
ByteBuf buffer = (ByteBuf) msg;
192-
requestStats.getNetworkTotalBytesIn().add(buffer.readableBytes());
192+
requestStats.getNetworkTotalBytesIn().addCount(buffer.readableBytes());
193193

194194
// Update parse request latency metrics
195195
final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
@@ -448,7 +448,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
448448
if (!future.isSuccess()) {
449449
log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
450450
} else {
451-
requestStats.getNetworkTotalBytesOut().add(resultSize);
451+
requestStats.getNetworkTotalBytesOut().addCount(resultSize);
452452
}
453453
});
454454
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
@@ -474,7 +474,7 @@ private void sendErrorResponse(KafkaHeaderAndRequest request, Channel channel, T
474474
final int resultSize = result.readableBytes();
475475
channel.writeAndFlush(result).addListener(future -> {
476476
if (future.isSuccess()) {
477-
requestStats.getNetworkTotalBytesOut().add(resultSize);
477+
requestStats.getNetworkTotalBytesOut().addCount(resultSize);
478478
}
479479
});
480480
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/DecodeResult.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void updateConsumerStats(final TopicPartition topicPartition,
9999

100100
final StatsLogger statsLoggerForThisPartition = statsLogger.getStatsLoggerForTopicPartition(topicPartition);
101101

102-
statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).add(conversionCount);
102+
statsLoggerForThisPartition.getCounter(CONSUME_MESSAGE_CONVERSIONS).addCount(conversionCount);
103103
statsLoggerForThisPartition.getOpStatsLogger(CONSUME_MESSAGE_CONVERSIONS_TIME_NANOS)
104104
.registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS);
105105
final StatsLogger statsLoggerForThisGroup;
@@ -108,9 +108,9 @@ public void updateConsumerStats(final TopicPartition topicPartition,
108108
} else {
109109
statsLoggerForThisGroup = statsLoggerForThisPartition;
110110
}
111-
statsLoggerForThisGroup.getCounter(BYTES_OUT).add(records.sizeInBytes());
112-
statsLoggerForThisGroup.getCounter(MESSAGE_OUT).add(numMessages);
113-
statsLoggerForThisGroup.getCounter(ENTRIES_OUT).add(entrySize);
111+
statsLoggerForThisGroup.getCounter(BYTES_OUT).addCount(records.sizeInBytes());
112+
statsLoggerForThisGroup.getCounter(MESSAGE_OUT).addCount(numMessages);
113+
statsLoggerForThisGroup.getCounter(ENTRIES_OUT).addCount(entrySize);
114114

115115
}
116116

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/EncodeResult.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ public void updateProducerStats(final TopicPartition topicPartition,
8989

9090
final StatsLogger statsLoggerForThisPartition = requestStats.getStatsLoggerForTopicPartition(topicPartition);
9191

92-
statsLoggerForThisPartition.getCounter(BYTES_IN).add(numBytes);
93-
statsLoggerForThisPartition.getCounter(MESSAGE_IN).add(numMessages);
94-
statsLoggerForThisPartition.getCounter(PRODUCE_MESSAGE_CONVERSIONS).add(conversionCount);
92+
statsLoggerForThisPartition.getCounter(BYTES_IN).addCount(numBytes);
93+
statsLoggerForThisPartition.getCounter(MESSAGE_IN).addCount(numMessages);
94+
statsLoggerForThisPartition.getCounter(PRODUCE_MESSAGE_CONVERSIONS).addCount(conversionCount);
9595
statsLoggerForThisPartition.getOpStatsLogger(PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS)
9696
.registerSuccessfulEvent(conversionTimeNanos, TimeUnit.NANOSECONDS);
9797

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/LongAdderCounter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.streamnative.pulsar.handlers.kop.stats;
1515

1616
import java.util.Map;
17+
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.atomic.LongAdder;
1819
import org.apache.bookkeeper.stats.Counter;
1920

@@ -48,10 +49,15 @@ public void dec() {
4849
}
4950

5051
@Override
51-
public void add(long delta) {
52+
public void addCount(long delta) {
5253
counter.add(delta);
5354
}
5455

56+
@Override
57+
public void addLatency(long l, TimeUnit timeUnit) {
58+
// No-op
59+
}
60+
5561
@Override
5662
public Long get() {
5763
return counter.sum();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/stats/NullStatsLogger.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ public void dec() {
8888
}
8989

9090
@Override
91-
public void add(long delta) {
91+
public void addCount(long delta) {
92+
// nop
93+
}
94+
95+
@Override
96+
public void addLatency(long l, TimeUnit timeUnit) {
9297
// nop
9398
}
9499

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaMessageOrderTestBase.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040
import org.apache.kafka.common.serialization.StringSerializer;
4141
import org.apache.pulsar.client.api.Consumer;
4242
import org.apache.pulsar.client.api.Message;
43-
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
44-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
43+
import org.apache.pulsar.client.api.MessageIdAdv;
4544
import org.apache.pulsar.common.policies.data.RetentionPolicies;
4645
import org.apache.pulsar.common.util.FutureUtil;
4746
import org.testng.annotations.AfterClass;
@@ -158,8 +157,7 @@ public void testKafkaProduceMessageOrder(int batchSize) throws Exception {
158157

159158
consumer.acknowledge(msg);
160159

161-
BatchMessageIdImpl id =
162-
(BatchMessageIdImpl) ((TopicMessageIdImpl) msg.getMessageId()).getInnerMessageId();
160+
MessageIdAdv id = (MessageIdAdv) msg.getMessageId();
163161
if (id.getBatchIndex() == 0) {
164162
numBatches++;
165163
}

0 commit comments

Comments
 (0)