Skip to content

Commit b2ff764

Browse files
authored
optimize: select channel handles based on protocol versions (#6634)
1 parent 08eb0e6 commit b2ff764

18 files changed

+213
-198
lines changed

changes/en-us/2.x.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch.
33
<!-- Please add the `changes` to the following location(feature/bugfix/optimize/test) based on the type of PR -->
44

55
### feature:
6-
6+
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support
77

88
### bugfix:
99
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager
@@ -16,8 +16,10 @@ Add changes here for all PR submitted to the 2.x branch.
1616
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
1717
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
1818
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies
19+
- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] select channel handles based on protocol versions
1920
- [[#6523](https://github.com/apache/incubator-seata/pull/6523)] upgrade alibaba/druid version to 1.2.20
2021

22+
2123
### refactor:
2224
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response
2325

@@ -36,6 +38,7 @@ Thanks to these contributors for their code commits. Please report an unintended
3638
- [liuqiufeng](https://github.com/liuqiufeng)
3739
- [God-Gan](https://github.com/God-Gan)
3840
- [Bughue](https://github.com/Bughue)
41+
- [funky-eyes](https://github.com/funky-eyes)
3942
- [tanyaofei](https://github.com/tanyaofei)
4043

4144
Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

changes/zh-cn/2.x.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
<!-- 请根据PR的类型添加 `变更记录` 到以下对应位置(feature/bugfix/optimize/test) 下 -->
44

55
### feature:
6+
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容
67

78
### bugfix:
89
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题
@@ -15,8 +16,10 @@
1516
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
1617
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
1718
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖
19+
- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] 根据协议版本指定channel handle
1820
- [[#6523](https://github.com/apache/incubator-seata/pull/6523)] 升级 alibaba/druid 的版本到1.2.20
1921

22+
2023
### refactor:
2124
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
2225

@@ -34,6 +37,7 @@
3437
- [liuqiufeng](https://github.com/liuqiufeng)
3538
- [God-Gan](https://github.com/God-Gan)
3639
- [Bughue](https://github.com/Bughue)
40+
- [funky-eyes](https://github.com/funky-eyes)
3741
- [tanyaofei](https://github.com/tanyaofei)
3842

3943

core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,8 @@ class ClientHandler extends ChannelDuplexHandler {
409409

410410
@Override
411411
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
412-
RpcMessage rpcMessage = null;
413-
if (msg instanceof ProtocolRpcMessage) {
414-
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
415-
processMessage(ctx, rpcMessage);
412+
if (msg instanceof RpcMessage) {
413+
processMessage(ctx, (RpcMessage)msg);
416414
} else {
417415
LOGGER.error("rpcMessage type error");
418416
}

core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo
6969
if (channel == null) {
7070
throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
7171
}
72-
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
72+
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
7373
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
7474
}
7575

@@ -78,7 +78,7 @@ public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutExcepti
7878
if (channel == null) {
7979
throw new RuntimeException("client is not connected");
8080
}
81-
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
81+
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
8282
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
8383
}
8484

@@ -87,7 +87,7 @@ public void sendAsyncRequest(Channel channel, Object msg) {
8787
if (channel == null) {
8888
throw new RuntimeException("client is not connected");
8989
}
90-
RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
90+
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
9191
super.sendAsync(channel, rpcMessage);
9292
}
9393

@@ -98,7 +98,7 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg
9898
clientChannel = ChannelManager.getSameClientChannel(channel);
9999
}
100100
if (clientChannel != null) {
101-
RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage
101+
RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
102102
? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
103103
: ProtocolConstants.MSGTYPE_RESPONSE);
104104
super.sendAsync(clientChannel, rpcMsg);
@@ -108,21 +108,6 @@ public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg
108108
}
109109

110110

111-
private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) {
112-
RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType);
113-
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
114-
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
115-
return rpcMessage;
116-
}
117-
118-
protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) {
119-
RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
120-
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
121-
rpcMessage.setOtherSideVersion(rpcContext.getVersion());
122-
return rpcMessage;
123-
}
124-
125-
126111
@Override
127112
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
128113
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
@@ -179,10 +164,8 @@ class ServerHandler extends ChannelDuplexHandler {
179164
*/
180165
@Override
181166
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
182-
RpcMessage rpcMessage = null;
183-
if (msg instanceof ProtocolRpcMessage) {
184-
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
185-
processMessage(ctx, rpcMessage);
167+
if (msg instanceof RpcMessage) {
168+
processMessage(ctx, (RpcMessage)msg);
186169
} else {
187170
LOGGER.error("rpcMessage type error");
188171
}

core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java renamed to core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import com.google.common.collect.ImmutableMap;
2020
import io.netty.buffer.ByteBuf;
21+
import io.netty.channel.ChannelHandler;
2122
import io.netty.channel.ChannelHandlerContext;
2223
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
2324
import org.apache.seata.core.exception.DecodeException;
2425
import org.apache.seata.core.protocol.ProtocolConstants;
2526
import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
27+
import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
2628
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
29+
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -52,17 +55,26 @@
5255
* <li>Body Length: Full Length - Head Length</li>
5356
* </p>
5457
*/
55-
public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
58+
public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder {
5659

57-
private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
58-
private static Map<Byte, ProtocolDecoder> protocolDecoderMap;
60+
private static final Logger LOGGER = LoggerFactory.getLogger(MultiProtocolDecoder.class);
61+
private final Map<Byte, ProtocolDecoder> protocolDecoderMap;
5962

60-
public CompatibleProtocolDecoder() {
63+
private final Map<Byte, ProtocolEncoder> protocolEncoderMap;
64+
65+
private final ChannelHandler[] channelHandlers;
66+
67+
public MultiProtocolDecoder(ChannelHandler... channelHandlers) {
6168
// default is 8M
62-
this(ProtocolConstants.MAX_FRAME_LENGTH);
69+
this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers);
6370
}
6471

65-
public CompatibleProtocolDecoder(int maxFrameLength) {
72+
public MultiProtocolDecoder() {
73+
// default is 8M
74+
this(ProtocolConstants.MAX_FRAME_LENGTH, null);
75+
}
76+
77+
public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] channelHandlers) {
6678
/*
6779
int maxFrameLength,
6880
int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3
@@ -71,10 +83,13 @@ int lengthFieldLength, FullLength is int(4B). so values is 4
7183
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
7284
*/
7385
super(maxFrameLength, 3, 4, -7, 0);
74-
protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
75-
.put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
76-
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
77-
.build();
86+
this.protocolDecoderMap =
87+
ImmutableMap.<Byte, ProtocolDecoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
88+
.put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build();
89+
this.protocolEncoderMap =
90+
ImmutableMap.<Byte, ProtocolEncoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
91+
.put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build();
92+
this.channelHandlers = channelHandlers;
7893
}
7994

8095
@Override
@@ -93,16 +108,23 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
93108

94109
if (decoded instanceof ByteBuf) {
95110
frame = (ByteBuf) decoded;
111+
ProtocolDecoder decoder = protocolDecoderMap.get(version);
112+
ProtocolEncoder encoder = protocolEncoderMap.get(version);
96113
try {
97-
ProtocolDecoder decoder = protocolDecoderMap.get(version);
98-
if (decoder == null) {
114+
if (decoder == null || encoder == null) {
99115
throw new UnsupportedOperationException("Unsupported version: " + version);
100116
}
101117
return decoder.decodeFrame(frame);
102118
} finally {
103119
if (version != ProtocolConstants.VERSION_0) {
104120
frame.release();
105121
}
122+
ctx.pipeline().addLast((ChannelHandler)decoder);
123+
ctx.pipeline().addLast((ChannelHandler)encoder);
124+
if (channelHandlers != null) {
125+
ctx.pipeline().addLast(channelHandlers);
126+
}
127+
ctx.pipeline().remove(this);
106128
}
107129
}
108130
} catch (Exception exx) {

core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.seata.common.exception.FrameworkException;
3636
import org.apache.seata.common.thread.NamedThreadFactory;
3737
import org.apache.seata.core.rpc.RemotingBootstrap;
38+
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
39+
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
3840
import org.slf4j.Logger;
3941
import org.slf4j.LoggerFactory;
4042

@@ -128,12 +130,12 @@ public void start() {
128130
@Override
129131
public void initChannel(SocketChannel ch) {
130132
ChannelPipeline pipeline = ch.pipeline();
131-
pipeline.addLast(
132-
new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
133+
pipeline
134+
.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
133135
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
134136
nettyClientConfig.getChannelMaxAllIdleSeconds()))
135-
.addLast(new CompatibleProtocolDecoder())
136-
.addLast(new CompatibleProtocolEncoder());
137+
.addLast(new ProtocolDecoderV1())
138+
.addLast(new ProtocolEncoderV1());
137139
if (channelHandlers != null) {
138140
addChannelPipelineLast(ch, channelHandlers);
139141
}

core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,10 @@ public void start() {
158158
.childHandler(new ChannelInitializer<SocketChannel>() {
159159
@Override
160160
public void initChannel(SocketChannel ch) {
161-
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
162-
.addLast(new CompatibleProtocolDecoder())
163-
.addLast(new CompatibleProtocolEncoder());
164-
if (channelHandlers != null) {
165-
addChannelPipelineLast(ch, channelHandlers);
166-
}
167-
161+
MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers);
162+
ch.pipeline()
163+
.addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
164+
.addLast(multiProtocolDecoder);
168165
}
169166
});
170167

core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
package org.apache.seata.core.rpc.netty;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import org.apache.seata.core.protocol.RpcMessage;
2021

2122
/**
2223
* the protocol decoder
2324
*
2425
**/
2526
public interface ProtocolDecoder {
2627

27-
ProtocolRpcMessage decodeFrame(ByteBuf in);
28+
RpcMessage decodeFrame(ByteBuf in);
2829

2930
}

0 commit comments

Comments
 (0)