|
16 | 16 | */ |
17 | 17 | package io.seata.core.rpc.netty.v1; |
18 | 18 |
|
| 19 | +import java.util.Map; |
| 20 | + |
19 | 21 | import io.netty.buffer.ByteBuf; |
20 | 22 | import io.netty.channel.ChannelHandlerContext; |
21 | 23 | import io.netty.handler.codec.LengthFieldBasedFrameDecoder; |
22 | | -import io.seata.core.exception.DecodeException; |
23 | | -import io.seata.core.serializer.Serializer; |
| 24 | +import io.seata.config.Configuration; |
| 25 | +import io.seata.config.ConfigurationFactory; |
24 | 26 | import io.seata.core.compressor.Compressor; |
25 | 27 | import io.seata.core.compressor.CompressorFactory; |
| 28 | +import io.seata.core.constants.ConfigurationKeys; |
| 29 | +import io.seata.core.exception.DecodeException; |
26 | 30 | import io.seata.core.protocol.HeartbeatMessage; |
27 | 31 | import io.seata.core.protocol.ProtocolConstants; |
28 | 32 | import io.seata.core.protocol.RpcMessage; |
| 33 | +import io.seata.core.serializer.Serializer; |
29 | 34 | import io.seata.core.serializer.SerializerServiceLoader; |
30 | 35 | import io.seata.core.serializer.SerializerType; |
31 | 36 | import org.slf4j.Logger; |
32 | 37 | import org.slf4j.LoggerFactory; |
33 | 38 |
|
34 | | -import java.util.Map; |
35 | | - |
36 | 39 | /** |
37 | 40 | * <pre> |
38 | 41 | * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
62 | 65 | public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder { |
63 | 66 |
|
64 | 67 | private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class); |
| 68 | + private static final Configuration CONFIG = ConfigurationFactory.getInstance(); |
| 69 | + private SerializerType serializerType; |
65 | 70 |
|
66 | 71 | public ProtocolV1Decoder() { |
67 | 72 | // default is 8M |
68 | 73 | this(ProtocolConstants.MAX_FRAME_LENGTH); |
| 74 | + String serializerName = CONFIG.getConfig(ConfigurationKeys.SERIALIZE_FOR_RPC, SerializerType.SEATA.name()); |
| 75 | + this.serializerType = SerializerType.getByName(serializerName); |
69 | 76 | } |
70 | 77 |
|
71 | 78 | public ProtocolV1Decoder(int maxFrameLength) { |
@@ -142,8 +149,13 @@ public Object decodeFrame(ByteBuf frame) { |
142 | 149 | frame.readBytes(bs); |
143 | 150 | Compressor compressor = CompressorFactory.getCompressor(compressorType); |
144 | 151 | bs = compressor.decompress(bs); |
145 | | - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); |
146 | | - rpcMessage.setBody(serializer.deserialize(bs)); |
| 152 | + SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec()); |
| 153 | + if (this.serializerType.equals(protocolType)) { |
| 154 | + Serializer serializer = SerializerServiceLoader.load(protocolType); |
| 155 | + rpcMessage.setBody(serializer.deserialize(bs)); |
| 156 | + } else { |
| 157 | + throw new IllegalArgumentException("SerializerType not match"); |
| 158 | + } |
147 | 159 | } |
148 | 160 | } |
149 | 161 |
|
|
0 commit comments