From 13e96a561dda29f49d350daf1ad2d1930366a955 Mon Sep 17 00:00:00 2001 From: conghuhu Date: Mon, 21 Nov 2022 19:56:56 +0800 Subject: [PATCH] feat: decoupling WireProtocol by netty --- .../java/org/apache/dubbo/qos/pu/QosWireProtocol.java | 9 --------- .../apache/dubbo/remoting/api/AbstractWireProtocol.java | 4 ++-- .../java/org/apache/dubbo/remoting/api/WireProtocol.java | 3 +-- .../org/apache/dubbo/remoting/api/EmptyProtocol.java | 3 +-- .../remoting/transport/netty4/NettyConnectionClient.java | 5 ++++- .../remoting/transport/netty4/api/EmptyWireProtocol.java | 2 +- .../dubbo/rpc/protocol/tri/TripleHttp2Protocol.java | 8 ++++++-- .../apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java | 8 -------- 8 files changed, 15 insertions(+), 27 deletions(-) diff --git a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java index 71e6633eb316..bd4bc2db1e58 100644 --- a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java +++ b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/pu/QosWireProtocol.java @@ -27,9 +27,6 @@ import org.apache.dubbo.rpc.model.FrameworkModel; import org.apache.dubbo.rpc.model.ScopeModelAware; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.ssl.SslContext; - import java.util.ArrayList; import java.util.List; @@ -54,10 +51,4 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { operator.configChannelHandler(handlers); } - - @Override - public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) { - - } - } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java index 278712bcacb3..621b5e45ddaf 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java @@ -17,8 +17,8 @@ package org.apache.dubbo.remoting.api; import org.apache.dubbo.common.URL; +import org.apache.dubbo.remoting.api.pu.ChannelOperator; -import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslContext; public abstract class AbstractWireProtocol implements WireProtocol { @@ -35,7 +35,7 @@ public ProtocolDetector detector() { } @Override - public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) { + public void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext) { } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java index fe615f82109c..f64b5fa6bd81 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/WireProtocol.java @@ -21,7 +21,6 @@ import org.apache.dubbo.common.extension.SPI; import org.apache.dubbo.remoting.api.pu.ChannelOperator; -import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslContext; @SPI(scope = ExtensionScope.FRAMEWORK) @@ -31,7 +30,7 @@ public interface WireProtocol { void configServerProtocolHandler(URL url, ChannelOperator operator); - void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext); + void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext); void close(); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java index 5c69d9b945c2..bfb87e77c1dd 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/api/EmptyProtocol.java @@ -19,7 +19,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.api.pu.ChannelOperator; -import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslContext; public class EmptyProtocol implements WireProtocol { @@ -34,7 +33,7 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { } @Override - public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) { + public void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext) { } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java index 299d8abc9d95..7911b480fdf2 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java @@ -110,6 +110,7 @@ private void initBootstrap() { nettyBootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { + NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch, getUrl(), getChannelHandler()); final ChannelPipeline pipeline = ch.pipeline(); SslContext sslContext = null; if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { @@ -120,7 +121,9 @@ protected void initChannel(SocketChannel ch) { // TODO support IDLE // int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); pipeline.addLast("connectionHandler", connectionHandler); - protocol.configClientPipeline(getUrl(), pipeline, sslContext); + + NettyConfigOperator operator = new NettyConfigOperator(nettyChannel, getChannelHandler()); + protocol.configClientPipeline(getUrl(), operator, sslContext); // TODO support Socks5 } }); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java index 595617574281..7bd7104b7744 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/api/EmptyWireProtocol.java @@ -36,7 +36,7 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { } @Override - public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) { + public void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext) { } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index 170ad30614d0..4516f27a249b 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -158,7 +158,7 @@ private Executor lookupExecutor(URL url) { } @Override - public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) { + public void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext) { final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient() .gracefulShutdownTimeoutMillis(10000) .initialSettings(new Http2Settings().headerTableSize( @@ -175,6 +175,10 @@ public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext s .build(); final Http2MultiplexHandler handler = new Http2MultiplexHandler( new TripleClientHandler(frameworkModel)); - pipeline.addLast(codec, handler, new TripleTailHandler()); + List handlers = new ArrayList<>(); + handlers.add(new ChannelHandlerPretender(codec)); + handlers.add(new ChannelHandlerPretender(handler)); + handlers.add(new ChannelHandlerPretender(new TripleTailHandler())); + operator.configChannelHandler(handlers); } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java index 020cbfc522c9..a0a9ac16f305 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleInvokerTest.java @@ -19,15 +19,9 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.stream.StreamObserver; import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; -import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.ChannelHandler; -import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient; import org.apache.dubbo.remoting.api.connection.ConnectionManager; -import org.apache.dubbo.remoting.api.connection.SingleProtocolConnectionManager; -import org.apache.dubbo.remoting.api.pu.DefaultPuHandler; -import org.apache.dubbo.remoting.exchange.PortUnificationExchanger; -import org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.model.MethodDescriptor; import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor; @@ -38,12 +32,10 @@ import io.netty.channel.Channel; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import java.util.HashSet; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import static org.mockito.ArgumentMatchers.any;