Skip to content

Commit 08eb0e6

Browse files
authored
feature: multi-version seata protocol support (#6226)
1 parent 699891d commit 08eb0e6

20 files changed

+734
-138
lines changed

core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class RpcMessage implements Serializable {
3535
private Map<String, String> headMap = new HashMap<>();
3636
private Object body;
3737

38+
private String otherSideVersion;
39+
3840
/**
3941
* Gets id.
4042
*
@@ -169,6 +171,14 @@ public void setMessageType(byte messageType) {
169171
this.messageType = messageType;
170172
}
171173

174+
public String getOtherSideVersion() {
175+
return otherSideVersion;
176+
}
177+
178+
public void setOtherSideVersion(String otherSideVersion) {
179+
this.otherSideVersion = otherSideVersion;
180+
}
181+
172182
@Override
173183
public String toString() {
174184
return StringUtils.toString(this);

core/src/main/java/org/apache/seata/core/protocol/Version.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,16 @@ public static long convertVersionNotThrowException(String version) {
150150
return -1;
151151
}
152152

153+
public static byte calcProtocolVersion(String sdkVersion) throws IncompatibleVersionException {
154+
long version = convertVersion(sdkVersion);
155+
long v0 = convertVersion(VERSION_0_7_1);
156+
if (version <= v0) {
157+
return ProtocolConstants.VERSION_0;
158+
} else {
159+
return ProtocolConstants.VERSION_1;
160+
}
161+
}
162+
153163
private static long calculatePartValue(String partNumeric, int size, int index) {
154164
return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size - index)).longValue();
155165
}

core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo
6969
if (channel == null) {
7070
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
7171
}
72-
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
72+
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
7373
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
7474
}
7575

@@ -78,7 +78,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti
7878
if (channel == null) {
7979
throw new RuntimeException("client is not connected");
8080
}
81-
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
81+
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
8282
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
8383
}
8484

@@ -87,26 +87,42 @@ public void sendAsyncRequest(Channel channel, Object msg) {
8787
if (channel == null) {
8888
throw new RuntimeException("client is not connected");
8989
}
90-
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
90+
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
9191
super.sendAsync(channel, rpcMessage);
9292
}
9393

9494
@Override
9595
public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
96-
final Channel clientChannel = msg instanceof HeartbeatMessage
97-
? channel
98-
: ChannelManager.getSameClientChannel(channel);
99-
100-
if (clientChannel == null) {
101-
throw new RuntimeException("Not found client channel to response | channel: " + channel);
96+
Channel clientChannel = channel;
97+
if (!(msg instanceof HeartbeatMessage)) {
98+
clientChannel = ChannelManager.getSameClientChannel(channel);
10299
}
103-
104-
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
100+
if (clientChannel != null) {
101+
RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage
105102
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
106103
: ProtocolConstants.MSGTYPE_RESPONSE);
107-
super.sendAsync(clientChannel, rpcMsg);
104+
super.sendAsync(clientChannel, rpcMsg);
105+
} else {
106+
throw new RuntimeException("channel is error.");
107+
}
108108
}
109109

110+
111+
private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) {
112+
RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType);
113+
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
114+
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
115+
return rpcMessage;
116+
}
117+
118+
protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) {
119+
RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
120+
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
121+
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
122+
return rpcMessage;
123+
}
124+
125+
110126
@Override
111127
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
112128
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.rpc.netty;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
23+
import org.apache.seata.core.exception.DecodeException;
24+
import org.apache.seata.core.protocol.ProtocolConstants;
25+
import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
26+
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.Map;
31+
32+
/**
33+
* <pre>
34+
* (> 0.7.0)
35+
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
36+
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
37+
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId |
38+
* | code |colVer| (head+body) | Length |Type |lizer|ess | |
39+
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
40+
*
41+
* (<= 0.7.0)
42+
* 0 1 2 3 4 6 8 10 12 14
43+
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
44+
* | 0xdada | flag | typecode/ | requestid |
45+
* | | | bodylength| |
46+
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+
47+
*
48+
* </pre>
49+
* <p>
50+
* <li>Full Length: include all data </li>
51+
* <li>Head Length: include head data from magic code to head map. </li>
52+
* <li>Body Length: Full Length - Head Length</li>
53+
* </p>
54+
*/
55+
public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
56+
57+
private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
58+
private static Map<Byte, ProtocolDecoder> protocolDecoderMap;
59+
60+
public CompatibleProtocolDecoder() {
61+
// default is 8M
62+
this(ProtocolConstants.MAX_FRAME_LENGTH);
63+
}
64+
65+
public CompatibleProtocolDecoder(int maxFrameLength) {
66+
/*
67+
int maxFrameLength,
68+
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3
69+
int lengthFieldLength, FullLength is int(4B). so values is 4
70+
int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7
71+
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
72+
*/
73+
super(maxFrameLength, 3, 4, -7, 0);
74+
protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
75+
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
76+
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
77+
.build();
78+
}
79+
80+
@Override
81+
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
82+
ByteBuf frame;
83+
Object decoded;
84+
byte version;
85+
try {
86+
if (isV0(in)) {
87+
decoded = in;
88+
version = ProtocolConstants.VERSION_0;
89+
} else {
90+
decoded = super.decode(ctx, in);
91+
version = decideVersion(decoded);
92+
}
93+
94+
if (decoded instanceof ByteBuf) {
95+
frame = (ByteBuf) decoded;
96+
try {
97+
ProtocolDecoder decoder = protocolDecoderMap.get(version);
98+
if (decoder == null) {
99+
throw new UnsupportedOperationException("Unsupported version: " + version);
100+
}
101+
return decoder.decodeFrame(frame);
102+
} finally {
103+
if (version != ProtocolConstants.VERSION_0) {
104+
frame.release();
105+
}
106+
}
107+
}
108+
} catch (Exception exx) {
109+
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
110+
throw new DecodeException(exx);
111+
}
112+
return decoded;
113+
}
114+
115+
protected byte decideVersion(Object in) {
116+
if (in instanceof ByteBuf) {
117+
ByteBuf frame = (ByteBuf) in;
118+
frame.markReaderIndex();
119+
byte b0 = frame.readByte();
120+
byte b1 = frame.readByte();
121+
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
122+
|| ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
123+
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
124+
}
125+
126+
byte version = frame.readByte();
127+
frame.resetReaderIndex();
128+
return version;
129+
}
130+
return -1;
131+
}
132+
133+
134+
protected boolean isV0(ByteBuf in) {
135+
boolean isV0 = false;
136+
in.markReaderIndex();
137+
byte b0 = in.readByte();
138+
byte b1 = in.readByte();
139+
// v1/v2/v3 : b2 = version
140+
// v0 : 1st byte in FLAG(2byte:0x10/0x20/0x40/0x80)
141+
byte b2 = in.readByte();
142+
if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0
143+
&& ProtocolConstants.MAGIC_CODE_BYTES[1] == b1
144+
&& 0 == b2) {
145+
isV0 = true;
146+
}
147+
148+
in.resetReaderIndex();
149+
return isV0;
150+
}
151+
152+
protected boolean isV0(byte version) {
153+
return version == ProtocolConstants.VERSION_0;
154+
}
155+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.rpc.netty;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.handler.codec.MessageToByteEncoder;
23+
import org.apache.seata.common.util.StringUtils;
24+
import org.apache.seata.core.protocol.ProtocolConstants;
25+
import org.apache.seata.core.protocol.RpcMessage;
26+
import org.apache.seata.core.protocol.Version;
27+
import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
28+
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.util.Map;
33+
34+
/**
35+
* Compatible Protocol Encoder
36+
* <p>
37+
* <li>Full Length: include all data </li>
38+
* <li>Head Length: include head data from magic code to head map. </li>
39+
* <li>Body Length: Full Length - Head Length</li>
40+
* </p>
41+
*/
42+
public class CompatibleProtocolEncoder extends MessageToByteEncoder {
43+
44+
private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class);
45+
46+
private static Map<Byte, ProtocolEncoder> protocolEncoderMap;
47+
48+
public CompatibleProtocolEncoder() {
49+
super();
50+
protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder()
51+
.put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
52+
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1())
53+
.build();
54+
}
55+
56+
@Override
57+
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
58+
try {
59+
if (msg instanceof RpcMessage) {
60+
RpcMessage rpcMessage = (RpcMessage) msg;
61+
String sdkVersion = rpcMessage.getOtherSideVersion();
62+
if (StringUtils.isBlank(sdkVersion)) {
63+
sdkVersion = Version.getCurrent();
64+
}
65+
byte protocolVersion = Version.calcProtocolVersion(sdkVersion);
66+
ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion);
67+
if (encoder == null) {
68+
throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion);
69+
}
70+
71+
encoder.encode(rpcMessage, out);
72+
} else {
73+
throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
74+
}
75+
} catch (Throwable e) {
76+
LOGGER.error("Encode request error!", e);
77+
}
78+
}
79+
}

core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.apache.seata.common.exception.FrameworkException;
3636
import org.apache.seata.common.thread.NamedThreadFactory;
3737
import org.apache.seata.core.rpc.RemotingBootstrap;
38-
import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
39-
import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
4038
import org.slf4j.Logger;
4139
import org.slf4j.LoggerFactory;
4240

@@ -134,8 +132,8 @@ public void initChannel(SocketChannel ch) {
134132
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
135133
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
136134
nettyClientConfig.getChannelMaxAllIdleSeconds()))
137-
.addLast(new ProtocolV1Decoder())
138-
.addLast(new ProtocolV1Encoder());
135+
.addLast(new CompatibleProtocolDecoder())
136+
.addLast(new CompatibleProtocolEncoder());
139137
if (channelHandlers != null) {
140138
addChannelPipelineLast(ch, channelHandlers);
141139
}

core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import org.apache.seata.common.thread.NamedThreadFactory;
3838
import org.apache.seata.config.ConfigurationFactory;
3939
import org.apache.seata.core.rpc.RemotingBootstrap;
40-
import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
41-
import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
4240
import org.apache.seata.discovery.registry.MultiRegistryFactory;
4341
import org.apache.seata.discovery.registry.RegistryService;
4442
import org.slf4j.Logger;
@@ -161,8 +159,8 @@ public void start() {
161159
@Override
162160
public void initChannel(SocketChannel ch) {
163161
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
164-
.addLast(new ProtocolV1Decoder())
165-
.addLast(new ProtocolV1Encoder());
162+
.addLast(new CompatibleProtocolDecoder())
163+
.addLast(new CompatibleProtocolEncoder());
166164
if (channelHandlers != null) {
167165
addChannelPipelineLast(ch, channelHandlers);
168166
}

0 commit comments

Comments
 (0)