Skip to content

Commit d55bc00

Browse files
authored
[Issue 4803][client] return null if the message value/data is not set by producer (#6379)
Fixes #4803 ### Motivation Allow the typed consumer receive messages with `null` value if the producer sends message without payload. ### Modifications - add a flag in `MessageMetadata` to indicate if the payload is set when the message is created - check and return `null` if the flag is not set when reading data from a message
1 parent 1d5c418 commit d55bc00

File tree

15 files changed

+370
-31
lines changed

15 files changed

+370
-31
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,9 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
19301930
if (metadata.hasNumMessagesInBatch()) {
19311931
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
19321932
}
1933+
if (metadata.hasNullValue()) {
1934+
responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
1935+
}
19331936

19341937
// Decode if needed
19351938
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,17 @@ public void persistentTopics(String topicName) throws Exception {
793793
topicStats = admin.topics().getStats(persistentTopicName);
794794
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
795795

796+
publishNullValueMessageOnPersistentTopic(persistentTopicName, 10);
797+
topicStats = admin.topics().getStats(persistentTopicName);
798+
assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
799+
messages = admin.topics().peekMessages(persistentTopicName, subName, 10);
800+
assertEquals(messages.size(), 10);
801+
for (int i = 0; i < 10; i++) {
802+
assertNull(messages.get(i).getData());
803+
assertNull(messages.get(i).getValue());
804+
}
805+
admin.topics().skipAllMessages(persistentTopicName, subName);
806+
796807
consumer.close();
797808
client.close();
798809

@@ -1559,19 +1570,28 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
15591570
long secondTimestamp = System.currentTimeMillis();
15601571

15611572
private void publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
1562-
publishMessagesOnPersistentTopic(topicName, messages, 0);
1573+
publishMessagesOnPersistentTopic(topicName, messages, 0, false);
15631574
}
15641575

1565-
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
1576+
private void publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
1577+
publishMessagesOnPersistentTopic(topicName, messages, 0, true);
1578+
}
1579+
1580+
private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
1581+
boolean nullValue) throws Exception {
15661582
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
15671583
.topic(topicName)
15681584
.enableBatching(false)
15691585
.messageRoutingMode(MessageRoutingMode.SinglePartition)
15701586
.create();
15711587

15721588
for (int i = startIdx; i < (messages + startIdx); i++) {
1573-
String message = "message-" + i;
1574-
producer.send(message.getBytes());
1589+
if (nullValue) {
1590+
producer.send(null);
1591+
} else {
1592+
String message = "message-" + i;
1593+
producer.send(message.getBytes());
1594+
}
15751595
}
15761596

15771597
producer.close();
@@ -1704,13 +1724,13 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {
17041724

17051725
assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
17061726

1707-
publishMessagesOnPersistentTopic(topicName, 5, 0);
1727+
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
17081728

17091729
// Allow at least 1ms for messages to have different timestamps
17101730
Thread.sleep(1);
17111731
long messageTimestamp = System.currentTimeMillis();
17121732

1713-
publishMessagesOnPersistentTopic(topicName, 5, 5);
1733+
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
17141734

17151735
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
17161736
assertEquals(messages.size(), 10);
@@ -1757,17 +1777,17 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep
17571777

17581778
assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
17591779

1760-
publishMessagesOnPersistentTopic(topicName, 5, 0);
1780+
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
17611781

17621782
// Allow at least 1ms for messages to have different timestamps
17631783
Thread.sleep(1);
17641784
long firstTimestamp = System.currentTimeMillis();
1765-
publishMessagesOnPersistentTopic(topicName, 3, 5);
1785+
publishMessagesOnPersistentTopic(topicName, 3, 5, false);
17661786

17671787
Thread.sleep(1);
17681788
long secondTimestamp = System.currentTimeMillis();
17691789

1770-
publishMessagesOnPersistentTopic(topicName, 2, 8);
1790+
publishMessagesOnPersistentTopic(topicName, 2, 8, false);
17711791

17721792
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
17731793
assertEquals(messages.size(), 10);
@@ -1829,13 +1849,13 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
18291849
.consumerName("consumerA").subscriptionType(SubscriptionType.Failover)
18301850
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
18311851

1832-
publishMessagesOnPersistentTopic(topicName, 5, 0);
1852+
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
18331853

18341854
// Allow at least 1ms for messages to have different timestamps
18351855
Thread.sleep(1);
18361856
long messageTimestamp = System.currentTimeMillis();
18371857

1838-
publishMessagesOnPersistentTopic(topicName, 5, 5);
1858+
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
18391859

18401860
// Currently the active consumer is consumerA
18411861
for (int i = 0; i < 10; i++) {
@@ -1866,7 +1886,7 @@ public void persistentTopicsCursorResetAndFailover() throws Exception {
18661886
// Closing consumerA activates consumerB
18671887
consumerA.close();
18681888

1869-
publishMessagesOnPersistentTopic(topicName, 5, 10);
1889+
publishMessagesOnPersistentTopic(topicName, 5, 10, false);
18701890

18711891
int receivedAfterFailover = 0;
18721892
for (int i = 10; i < 15; i++) {
@@ -1901,11 +1921,11 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {
19011921

19021922
assertEquals(admin.topics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));
19031923

1904-
publishMessagesOnPersistentTopic(topicName, 5, 0);
1924+
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
19051925
Thread.sleep(1);
19061926

19071927
long timestamp = System.currentTimeMillis();
1908-
publishMessagesOnPersistentTopic(topicName, 5, 5);
1928+
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
19091929

19101930
for (int i = 0; i < 10; i++) {
19111931
Message<byte[]> message = consumer.receive();
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import lombok.Cleanup;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.pulsar.client.api.Consumer;
25+
import org.apache.pulsar.client.api.Message;
26+
import org.apache.pulsar.client.api.Producer;
27+
import org.apache.pulsar.client.api.PulsarClientException;
28+
import org.apache.pulsar.client.api.Schema;
29+
import org.testng.Assert;
30+
import org.testng.annotations.AfterMethod;
31+
import org.testng.annotations.BeforeMethod;
32+
import org.testng.annotations.Test;
33+
34+
/**
35+
* Null value message produce and consume test.
36+
*/
37+
@Slf4j
38+
public class NullValueTest extends BrokerTestBase {
39+
40+
@BeforeMethod
41+
@Override
42+
protected void setup() throws Exception {
43+
super.baseSetup();
44+
}
45+
46+
@AfterMethod
47+
@Override
48+
protected void cleanup() throws Exception {
49+
super.internalCleanup();
50+
}
51+
52+
@Test
53+
public void nullValueBytesSchemaTest() throws PulsarClientException {
54+
String topic = "persistent://prop/ns-abc/null-value-bytes-test";
55+
56+
@Cleanup
57+
Producer producer = pulsarClient.newProducer()
58+
.topic(topic)
59+
.create();
60+
61+
@Cleanup
62+
Consumer consumer = pulsarClient.newConsumer()
63+
.topic(topic)
64+
.subscriptionName("test")
65+
.subscribe();
66+
67+
int numMessage = 10;
68+
for (int i = 0; i < numMessage; i++) {
69+
if (i % 2 == 0) {
70+
producer.newMessage().value("not null".getBytes()).send();
71+
} else {
72+
producer.newMessage().value(null).send();
73+
}
74+
}
75+
76+
for (int i = 0; i < numMessage; i++) {
77+
Message message = consumer.receive();
78+
if (i % 2 == 0) {
79+
Assert.assertNotNull(message.getData());
80+
Assert.assertNotNull(message.getValue());
81+
Assert.assertEquals(new String(message.getData()), "not null");
82+
} else {
83+
Assert.assertNull(message.getData());
84+
Assert.assertNull(message.getValue());
85+
}
86+
consumer.acknowledge(message);
87+
}
88+
89+
for (int i = 0; i < numMessage; i++) {
90+
if (i % 2 == 0) {
91+
producer.newMessage().value("not null".getBytes()).sendAsync();
92+
} else {
93+
producer.newMessage().value(null).sendAsync();
94+
}
95+
}
96+
97+
for (int i = 0; i < numMessage; i++) {
98+
CompletableFuture<Message> completableFuture = consumer.receiveAsync();
99+
final int index = i;
100+
completableFuture.whenComplete((message, throwable) -> {
101+
Assert.assertNull(throwable);
102+
if (index % 2 == 0) {
103+
Assert.assertNotNull(message.getData());
104+
Assert.assertNotNull(message.getValue());
105+
Assert.assertEquals(new String(message.getData()), "not null");
106+
} else {
107+
Assert.assertNull(message.getData());
108+
Assert.assertNull(message.getValue());
109+
}
110+
try {
111+
consumer.acknowledge(message);
112+
} catch (PulsarClientException e) {
113+
Assert.assertNull(e);
114+
}
115+
});
116+
}
117+
118+
}
119+
120+
@Test
121+
public void nullValueBooleanSchemaTest() throws PulsarClientException {
122+
String topic = "persistent://prop/ns-abc/null-value-bool-test";
123+
124+
@Cleanup
125+
Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL)
126+
.topic(topic)
127+
.create();
128+
129+
@Cleanup
130+
Consumer<Boolean> consumer = pulsarClient.newConsumer(Schema.BOOL)
131+
.topic(topic)
132+
.subscriptionName("test")
133+
.subscribe();
134+
135+
int numMessage = 10;
136+
for (int i = 0; i < numMessage; i++) {
137+
producer.newMessage().value(null).sendAsync();
138+
}
139+
140+
for (int i = 0; i < numMessage; i++) {
141+
Message<Boolean> message = consumer.receive();
142+
Assert.assertNull(message.getValue());
143+
Assert.assertNull(message.getData());
144+
}
145+
146+
}
147+
148+
}

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1288,6 +1288,7 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
12881288
}
12891289

12901290
String msgId = response.getHeaderString(MESSAGE_ID);
1291+
PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
12911292
try (InputStream stream = (InputStream) response.getEntity()) {
12921293
byte[] data = new byte[stream.available()];
12931294
stream.read(data);
@@ -1298,10 +1299,16 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
12981299
if (tmp != null) {
12991300
properties.put("publish-time", (String) tmp);
13001301
}
1302+
1303+
tmp = headers.getFirst("X-Pulsar-null-value");
1304+
if (tmp != null) {
1305+
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
1306+
}
1307+
13011308
tmp = headers.getFirst(BATCH_HEADER);
13021309
if (response.getHeaderString(BATCH_HEADER) != null) {
13031310
properties.put(BATCH_HEADER, (String) tmp);
1304-
return getIndividualMsgsFromBatch(topic, msgId, data, properties);
1311+
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
13051312
}
13061313
for (Entry<String, List<Object>> entry : headers.entrySet()) {
13071314
String header = entry.getKey();
@@ -1312,12 +1319,12 @@ private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response
13121319
}
13131320

13141321
return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
1315-
Unpooled.wrappedBuffer(data), Schema.BYTES));
1322+
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
13161323
}
13171324
}
13181325

13191326
private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
1320-
Map<String, String> properties) {
1327+
Map<String, String> properties, PulsarApi.MessageMetadata.Builder msgMetadataBuilder) {
13211328
List<Message<byte[]>> ret = new ArrayList<>();
13221329
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
13231330
ByteBuf buf = Unpooled.wrappedBuffer(data);
@@ -1334,7 +1341,8 @@ private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String ms
13341341
properties.put(entry.getKey(), entry.getValue());
13351342
}
13361343
}
1337-
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, Schema.BYTES));
1344+
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
1345+
Schema.BYTES, msgMetadataBuilder));
13381346
} catch (Exception ex) {
13391347
log.error("Exception occured while trying to get BatchMsgId: {}", batchMsgId, ex);
13401348
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,8 @@ protected boolean canEnqueueMessage(Message<T> message) {
557557
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
558558
if (canEnqueueMessage(message)) {
559559
incomingMessages.add(message);
560-
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.getData().length);
560+
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
561+
this, message.getData() == null ? 0 : message.getData().length);
561562
}
562563
return hasEnoughMessagesForBatchReceive();
563564
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
12891289
stats.updateNumMsgsReceived(msg);
12901290

12911291
trackMessage(msg);
1292-
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length);
1292+
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 0 : -msg.getData().length);
12931293
}
12941294

12951295
protected void trackMessage(Message<?> msg) {

0 commit comments

Comments
 (0)