Skip to content

Commit f8f6fbe

Browse files
authored
[ISSUE #1637]Fix 1637 (#1644)
* fix(broker): add the check logic of the server to the topic * chore(test):add unit test * chore(validator):polish the code * chore(test):add ASF license header
1 parent 3d778ef commit f8f6fbe

File tree

9 files changed

+178
-34
lines changed

9 files changed

+178
-34
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19+
import io.netty.channel.ChannelHandlerContext;
1920
import java.net.InetSocketAddress;
2021
import java.net.SocketAddress;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Random;
24-
25-
import io.netty.channel.ChannelHandlerContext;
2625
import org.apache.rocketmq.broker.BrokerController;
2726
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
2827
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
28+
import org.apache.rocketmq.broker.topic.TopicValidator;
2929
import org.apache.rocketmq.common.MixAll;
3030
import org.apache.rocketmq.common.TopicConfig;
3131
import org.apache.rocketmq.common.TopicFilterType;
@@ -171,11 +171,8 @@ protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
171171
+ "] sending message is forbidden");
172172
return response;
173173
}
174-
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
175-
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
176-
log.warn(errorMsg);
177-
response.setCode(ResponseCode.SYSTEM_ERROR);
178-
response.setRemark(errorMsg);
174+
175+
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
179176
return response;
180177
}
181178

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,18 @@
3838
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
3939
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
4040
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
41+
import org.apache.rocketmq.broker.topic.TopicValidator;
42+
import org.apache.rocketmq.common.AclConfig;
4143
import org.apache.rocketmq.common.MQVersion;
4244
import org.apache.rocketmq.common.MixAll;
4345
import org.apache.rocketmq.common.PlainAccessConfig;
4446
import org.apache.rocketmq.common.TopicConfig;
45-
import org.apache.rocketmq.common.AclConfig;
4647
import org.apache.rocketmq.common.UtilAll;
4748
import org.apache.rocketmq.common.admin.ConsumeStats;
4849
import org.apache.rocketmq.common.admin.OffsetWrapper;
4950
import org.apache.rocketmq.common.admin.TopicOffset;
5051
import org.apache.rocketmq.common.admin.TopicStatsTable;
5152
import org.apache.rocketmq.common.constant.LoggerName;
52-
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
53-
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
54-
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
55-
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
56-
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
57-
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
58-
import org.apache.rocketmq.logging.InternalLogger;
59-
import org.apache.rocketmq.logging.InternalLoggerFactory;
6053
import org.apache.rocketmq.common.message.MessageAccessor;
6154
import org.apache.rocketmq.common.message.MessageConst;
6255
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -84,10 +77,15 @@
8477
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
8578
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
8679
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
80+
import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
8781
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
82+
import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
8883
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
8984
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
9085
import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
86+
import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
87+
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
88+
import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
9189
import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
9290
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
9391
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
@@ -110,6 +108,7 @@
110108
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
111109
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
112110
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
111+
import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
113112
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
114113
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
115114
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
@@ -118,6 +117,8 @@
118117
import org.apache.rocketmq.common.stats.StatsSnapshot;
119118
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
120119
import org.apache.rocketmq.filter.util.BitsArray;
120+
import org.apache.rocketmq.logging.InternalLogger;
121+
import org.apache.rocketmq.logging.InternalLoggerFactory;
121122
import org.apache.rocketmq.remoting.common.RemotingHelper;
122123
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
123124
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -258,6 +259,10 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
258259
return response;
259260
}
260261

262+
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
263+
return response;
264+
}
265+
261266
try {
262267
response.setCode(ResponseCode.SUCCESS);
263268
response.setOpaque(request.getOpaque());
@@ -312,8 +317,8 @@ private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerC
312317
accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress());
313318
accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm());
314319
accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm());
315-
accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),","));
316-
accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),","));
320+
accessConfig.setTopicPerms(UtilAll.string2List(requestHeader.getTopicPerms(), ","));
321+
accessConfig.setGroupPerms(UtilAll.string2List(requestHeader.getGroupPerms(), ","));
317322
accessConfig.setAdmin(requestHeader.isAdmin());
318323
try {
319324

@@ -386,7 +391,7 @@ private synchronized RemotingCommand updateGlobalWhiteAddrsConfig(ChannelHandler
386391

387392
try {
388393
AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
389-
if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) {
394+
if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.string2List(requestHeader.getGlobalWhiteAddrs(), ","))) {
390395
response.setCode(ResponseCode.SUCCESS);
391396
response.setOpaque(request.getOpaque());
392397
response.markResponseType();

broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,6 @@ public Set<String> getSystemTopic() {
152152
return this.systemTopicList;
153153
}
154154

155-
public boolean isTopicCanSendMessage(final String topic) {
156-
return !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
157-
}
158-
159155
public TopicConfig selectTopicConfig(final String topic) {
160156
return this.topicConfigTable.get(topic);
161157
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.topic;
18+
19+
import java.util.regex.Matcher;
20+
import java.util.regex.Pattern;
21+
import org.apache.rocketmq.common.MixAll;
22+
import org.apache.rocketmq.common.UtilAll;
23+
import org.apache.rocketmq.common.protocol.ResponseCode;
24+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
25+
26+
public class TopicValidator {
27+
28+
private static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
29+
private static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
30+
private static final int CHARACTER_MAX_LENGTH = 255;
31+
32+
private static boolean regularExpressionMatcher(String origin, Pattern pattern) {
33+
if (pattern == null) {
34+
return true;
35+
}
36+
Matcher matcher = pattern.matcher(origin);
37+
return matcher.matches();
38+
}
39+
40+
public static boolean validateTopic(String topic, RemotingCommand response) {
41+
42+
if (UtilAll.isBlank(topic)) {
43+
response.setCode(ResponseCode.SYSTEM_ERROR);
44+
response.setRemark("The specified topic is blank.");
45+
return false;
46+
}
47+
48+
if (!regularExpressionMatcher(topic, PATTERN)) {
49+
response.setCode(ResponseCode.SYSTEM_ERROR);
50+
response.setRemark("The specified topic contains illegal characters, allowing only " + VALID_PATTERN_STR);
51+
return false;
52+
}
53+
54+
if (topic.length() > CHARACTER_MAX_LENGTH) {
55+
response.setCode(ResponseCode.SYSTEM_ERROR);
56+
response.setRemark("The specified topic is longer than topic max length 255.");
57+
return false;
58+
}
59+
60+
//whether the same with system reserved keyword
61+
if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
62+
response.setCode(ResponseCode.SYSTEM_ERROR);
63+
response.setRemark("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
64+
return false;
65+
}
66+
67+
return true;
68+
}
69+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.broker.topic;
18+
19+
import org.apache.rocketmq.common.MixAll;
20+
import org.apache.rocketmq.common.protocol.ResponseCode;
21+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
22+
import org.junit.Test;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
public class TopicValidatorTest {
27+
28+
@Test
29+
public void testTopicValidator_NotPass() {
30+
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
31+
32+
Boolean res = TopicValidator.validateTopic("", response);
33+
assertThat(res).isFalse();
34+
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
35+
assertThat(response.getRemark()).contains("The specified topic is blank");
36+
37+
clearResponse(response);
38+
res = TopicValidator.validateTopic("../TopicTest", response);
39+
assertThat(res).isFalse();
40+
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
41+
assertThat(response.getRemark()).contains("The specified topic contains illegal characters");
42+
43+
clearResponse(response);
44+
res = TopicValidator.validateTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, response);
45+
assertThat(res).isFalse();
46+
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
47+
assertThat(response.getRemark()).contains("The specified topic is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.");
48+
49+
clearResponse(response);
50+
res = TopicValidator.validateTopic(generateString(255), response);
51+
assertThat(res).isFalse();
52+
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
53+
assertThat(response.getRemark()).contains("The specified topic is longer than topic max length 255.");
54+
55+
}
56+
57+
@Test
58+
public void testTopicValidator_Pass() {
59+
RemotingCommand response = RemotingCommand.createResponseCommand(-1, "");
60+
61+
Boolean res = TopicValidator.validateTopic("TestTopic", response);
62+
assertThat(res).isTrue();
63+
assertThat(response.getCode()).isEqualTo(-1);
64+
assertThat(response.getRemark()).isEmpty();
65+
}
66+
67+
private static void clearResponse(RemotingCommand response) {
68+
response.setCode(-1);
69+
response.setRemark("");
70+
}
71+
72+
private static String generateString(int length) {
73+
StringBuilder stringBuffer = new StringBuilder();
74+
String tmpStr = "0123456789";
75+
for (int i = 0; i < length; i++) {
76+
stringBuffer.append(tmpStr);
77+
}
78+
return stringBuffer.toString();
79+
}
80+
}

client/src/main/java/org/apache/rocketmq/client/Validators.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
7777
return matcher.matches();
7878
}
7979

80-
/**
81-
* Validate message
82-
*/
8380
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
8481
throws MQClientException {
8582
if (null == msg) {
@@ -103,9 +100,6 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
103100
}
104101
}
105102

106-
/**
107-
* Validate topic
108-
*/
109103
public static void checkTopic(String topic) throws MQClientException {
110104
if (UtilAll.isBlank(topic)) {
111105
throw new MQClientException("The specified topic is blank", null);
@@ -127,4 +121,5 @@ public static void checkTopic(String topic) throws MQClientException {
127121
String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
128122
}
129123
}
124+
130125
}

client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.locks.ReentrantReadWriteLock;
2929

3030
import org.apache.rocketmq.client.QueryResult;
31+
import org.apache.rocketmq.client.Validators;
3132
import org.apache.rocketmq.client.exception.MQBrokerException;
3233
import org.apache.rocketmq.client.exception.MQClientException;
3334
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -80,6 +81,7 @@ public void createTopic(String key, String newTopic, int queueNum) throws MQClie
8081

8182
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
8283
try {
84+
Validators.checkTopic(newTopic);
8385
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
8486
List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
8587
if (brokerDataList != null && !brokerDataList.isEmpty()) {

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@
4343
import org.apache.rocketmq.client.producer.SendCallback;
4444
import org.apache.rocketmq.client.producer.SendResult;
4545
import org.apache.rocketmq.client.producer.SendStatus;
46+
import org.apache.rocketmq.common.AclConfig;
4647
import org.apache.rocketmq.common.DataVersion;
4748
import org.apache.rocketmq.common.MQVersion;
4849
import org.apache.rocketmq.common.MixAll;
4950
import org.apache.rocketmq.common.PlainAccessConfig;
5051
import org.apache.rocketmq.common.TopicConfig;
51-
import org.apache.rocketmq.common.AclConfig;
5252
import org.apache.rocketmq.common.UtilAll;
5353
import org.apache.rocketmq.common.admin.ConsumeStats;
5454
import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -305,8 +305,8 @@ public void createPlainAccessConfig(final String addr, final PlainAccessConfig p
305305
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
306306
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
307307
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
308-
requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
309-
requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
308+
requestHeader.setTopicPerms(UtilAll.list2String(plainAccessConfig.getTopicPerms(), ","));
309+
requestHeader.setGroupPerms(UtilAll.list2String(plainAccessConfig.getGroupPerms(), ","));
310310

311311
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
312312

common/src/main/java/org/apache/rocketmq/common/UtilAll.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ public static void deleteFile(File file) {
580580
}
581581
}
582582

583-
public static String List2String(List<String> list, String splitor) {
583+
public static String list2String(List<String> list, String splitor) {
584584
if (list == null || list.size() == 0) {
585585
return null;
586586
}
@@ -595,7 +595,7 @@ public static String List2String(List<String> list, String splitor) {
595595
return str.toString();
596596
}
597597

598-
public static List<String> String2List(String str, String splitor) {
598+
public static List<String> string2List(String str, String splitor) {
599599
if (StringUtils.isEmpty(str)) {
600600
return null;
601601
}

0 commit comments

Comments
 (0)