diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java new file mode 100644 index 000000000000..217b8c1496f4 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameCodecBuilder.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.rpc.protocol.tri; + +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.Http2CodecUtil; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2LocalFlowController; +import io.netty.handler.codec.http2.Http2RemoteFlowController; +import org.apache.dubbo.common.utils.Assert; + +import java.util.function.Consumer; + +public class TripleHttp2FrameCodecBuilder extends Http2FrameCodecBuilder { + + TripleHttp2FrameCodecBuilder(Http2Connection connection) { + connection(connection); + } + + public static TripleHttp2FrameCodecBuilder fromConnection(Http2Connection connection) { + return new TripleHttp2FrameCodecBuilder(connection); + } + + public static TripleHttp2FrameCodecBuilder forClient() { + return forClient(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS); + } + + public static TripleHttp2FrameCodecBuilder forClient(int maxReservedStreams) { + return fromConnection(new DefaultHttp2Connection(false, maxReservedStreams)); + } + + public static TripleHttp2FrameCodecBuilder forServer() { + return forServer(Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS); + } + + public static TripleHttp2FrameCodecBuilder forServer(int maxReservedStreams) { + return fromConnection(new DefaultHttp2Connection(true, maxReservedStreams)); + } + + public TripleHttp2FrameCodecBuilder customizeConnection(Consumer connectionCustomizer) { + Http2Connection connection = this.connection(); + Assert.notNull(connection, "connection cannot be null."); + connectionCustomizer.accept(connection); + return this; + } + + public TripleHttp2FrameCodecBuilder remoteFlowController(Http2RemoteFlowController remoteFlowController) { + return this.customizeConnection((connection) -> connection.remote().flowController(remoteFlowController)); + } + + public TripleHttp2FrameCodecBuilder localFlowController(Http2LocalFlowController localFlowController) { + return this.customizeConnection((connection) -> connection.local().flowController(localFlowController)); + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java index d13e4bfcd275..321af9d61452 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java @@ -43,7 +43,6 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http2.Http2FrameCodec; -import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2MultiplexHandler; import io.netty.handler.codec.http2.Http2Settings; @@ -113,7 +112,8 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { } else { headFilters = Collections.emptyList(); } - final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer() + final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer() + .customizeConnection((connection) -> connection.remote().flowController(new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel()))) .gracefulShutdownTimeoutMillis(10000) .initialSettings(new Http2Settings().headerTableSize( config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE)) @@ -127,7 +127,6 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) { .frameLogger(SERVER_LOGGER) .build(); ExecutorSupport executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).getExecutorSupport(url); - codec.connection().remote().flowController(new TriHttp2RemoteFlowController(codec.connection(), url.getOrDefaultApplicationModel())); codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter()); TripleWriteQueue writeQueue = new TripleWriteQueue(); final Http2MultiplexHandler handler = new Http2MultiplexHandler( @@ -152,7 +151,8 @@ protected void initChannel(Http2StreamChannel ch) { @Override public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) { - final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient() + final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forClient() + .customizeConnection((connection) -> connection.remote().flowController(new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel()))) .gracefulShutdownTimeoutMillis(10000) .initialSettings(new Http2Settings().headerTableSize( config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE)) @@ -166,7 +166,6 @@ public void configClientPipeline(URL url, ChannelOperator operator, ContextOpera DEFAULT_MAX_HEADER_LIST_SIZE))) .frameLogger(CLIENT_LOGGER) .build(); - codec.connection().remote().flowController(new TriHttp2RemoteFlowController(codec.connection(), url.getOrDefaultApplicationModel())); codec.connection().local().flowController().frameWriter(codec.encoder().frameWriter()); final Http2MultiplexHandler handler = new Http2MultiplexHandler( new TripleClientHandler(frameworkModel));