Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package org.apache.dubbo.common.threadpool.manager;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.executor.IsolationExecutorSupportFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.concurrent.ExecutorService;

Expand All @@ -45,13 +45,17 @@ protected URL setThreadNameIfAbsent(URL url, String executorCacheKey) {

@Override
protected String getProviderKey(URL url) {
return url.getServiceKey();
if (url.getAttributes().containsKey(SERVICE_EXECUTOR)) {
return url.getServiceKey();
} else {
return super.getProviderKey(url);
}
}

@Override
protected ExecutorService createExecutor(URL url) {
Object executor = url.getAttributes().get(SERVICE_EXECUTOR);
if (executor != null && executor instanceof ExecutorService) {
if (executor instanceof ExecutorService) {
return (ExecutorService) executor;
}
return super.createExecutor(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,8 @@ public interface Constants {
String IGNORE_CHECK_KEYS = "ignoreCheckKeys";

String PARAMETERS = "parameters";

String SERVER_THREAD_POOL_NAME = "DubboServerHandler";


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,68 +18,50 @@

import org.apache.dubbo.common.ServiceKey;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.resource.GlobalResourcesRepository;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import org.apache.dubbo.rpc.model.ProviderModel;

import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;

public abstract class AbstractIsolationExecutorSupport implements ExecutorSupport {
private final URL url;
private final ExecutorRepository executorRepository;
private final Map<String, Executor> executorMap;
private final FrameworkServiceRepository frameworkServiceRepository;

public AbstractIsolationExecutorSupport(URL url) {
this.url = url;
this.executorRepository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
this.executorMap = new HashMap<>();
GlobalResourcesRepository.getInstance().registerDisposable(this::destroy);
this.frameworkServiceRepository = url.getOrDefaultFrameworkModel().getServiceRepository();
}

public Executor getExecutor(Object data) {

ServiceKey serviceKey = getServiceKey(data);
if (!isValid(serviceKey)) {
return null;
ProviderModel providerModel = getProviderModel(data);
if (providerModel == null) {
return executorRepository.getExecutor(url);
}
String interfaceName = serviceKey.getInterfaceName();
String version = serviceKey.getVersion();
String group = serviceKey.getGroup();
String cachedKey = URL.buildKey(interfaceName, group, version);
if (executorMap.containsKey(cachedKey)) {
return executorMap.get(cachedKey);

List<URL> serviceUrls = providerModel.getServiceUrls();
if (serviceUrls == null || serviceUrls.isEmpty()) {
return executorRepository.getExecutor(url);
}

synchronized (this) {
if (executorMap.containsKey(cachedKey)) {
return executorMap.get(cachedKey);
for (URL serviceUrl : serviceUrls) {
if (serviceUrl.getProtocol().equals(url.getProtocol()) && serviceUrl.getPort() == url.getPort()) {
return executorRepository.getExecutor(serviceUrl);
}
Map<String, String> parameters = url.getParameters();
parameters.put(GROUP_KEY, group);
parameters.put(INTERFACE_KEY, interfaceName);
parameters.put(VERSION_KEY, version);
ServiceConfigURL tmpURL = new ServiceConfigURL(url.getProtocol(), url.getHost(), url.getPort(), interfaceName, parameters);
ExecutorService executor = executorRepository.getExecutor(tmpURL);
executorMap.put(cachedKey, executor);
return executor;
}
}

public synchronized void destroy() {
executorMap.clear();
}

private boolean isValid(ServiceKey serviceKey) {
return serviceKey != null && StringUtils.isNotEmpty(serviceKey.getInterfaceName());
return executorRepository.getExecutor(serviceUrls.get(0));
}

protected abstract ServiceKey getServiceKey(Object data);
}

private ProviderModel getProviderModel(Object data) {
ServiceKey serviceKey = getServiceKey(data);
if (serviceKey == null) {
return null;
}
return frameworkServiceRepository.lookupExportedService(serviceKey.toString());
}}
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_UNEXPECTED_EXCEPTION;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
import static org.apache.dubbo.remoting.Constants.ACCEPTS_KEY;
import static org.apache.dubbo.remoting.Constants.DEFAULT_ACCEPTS;

/**
* AbstractServer
*/
public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {

protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class);
private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
Expand Down Expand Up @@ -75,7 +74,7 @@ public AbstractServer(URL url, ChannelHandler handler) throws RemotingException
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + bindAddress + ", cause: " + t.getMessage(), t);
}
executors.add(executorRepository.createExecutorIfAbsent(url));
executors.add(executorRepository.createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

protected abstract void doOpen() throws Throwable;
Expand All @@ -99,7 +98,7 @@ public void reset(URL url) {
logger.error(TRANSPORT_UNEXPECTED_EXCEPTION, "", "", t.getMessage(), t);
}

ExecutorService executor = executorRepository.createExecutorIfAbsent(url);
ExecutorService executor = executorRepository.createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
executors.add(executor);
executorRepository.updateThreadpool(url, executor);
super.setUrl(getUrl().addParameters(url.getParameters()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
Expand All @@ -30,23 +29,26 @@
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.dubbo.common.constants.CommonConstants.BACKLOG_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.IO_THREADS_KEY;

import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.BACKLOG_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.IO_THREADS_KEY;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_WORKER_POOL_NAME;

Expand All @@ -65,7 +67,7 @@ public class NettyPortUnificationServer extends AbstractPortUnificationServer {


public NettyPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
super(url, ChannelHandlers.wrap(handler, url));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
Expand Down Expand Up @@ -65,7 +64,7 @@ public class NettyServer extends AbstractServer implements RemotingServer {
private org.jboss.netty.channel.Channel channel;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
super(url, ChannelHandlers.wrap(handler, url));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslServerTlsHandler;
import org.apache.dubbo.remoting.transport.AbstractServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslServerTlsHandler;
import org.apache.dubbo.remoting.utils.UrlUtils;

import io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -83,7 +82,7 @@ public class NettyServer extends AbstractServer {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
super(url, ChannelHandlers.wrap(handler, url));

// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dubbo.common.threadlocal.InternalThreadLocalMap;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
Expand Down Expand Up @@ -54,6 +55,7 @@
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;

/**
Expand Down Expand Up @@ -123,7 +125,7 @@ public Result doInvoke(Invocation invocation) throws Throwable {
if (isAsync(invoker.getUrl(), getUrl())) {
((RpcInvocation) copiedInvocation).setInvokeMode(InvokeMode.ASYNC);
// use consumer executor
ExecutorService executor = executorRepository.createExecutorIfAbsent(getUrl());
ExecutorService executor = executorRepository.createExecutorIfAbsent(ExecutorUtil.setThreadName(getUrl(), SERVER_THREAD_POOL_NAME));
CompletableFuture<AppResponse> appResponseFuture = CompletableFuture.supplyAsync(() -> {
Result result = invoker.invoke(copiedInvocation);
if (result.hasException()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,22 @@
import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleServerConnectionHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleTailHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;

import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

import static org.apache.dubbo.common.constants.CommonConstants.HEADER_FILTER_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
Expand Down Expand Up @@ -152,13 +150,6 @@ protected void initChannel(Http2StreamChannel ch) {

}


private Executor lookupExecutor(URL url) {
return url.getOrDefaultApplicationModel()
.getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension().getExecutor(url);
}

@Override
public void configClientPipeline(URL url, ChannelOperator operator, SslContext sslContext) {
final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;

import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
import static org.apache.dubbo.rpc.Constants.H2_SUPPORT_NO_LOWER_HEADER_KEY;

public class TripleProtocol extends AbstractProtocol {
Expand Down Expand Up @@ -117,7 +119,7 @@ public void afterUnExport() {
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
// init
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).createExecutorIfAbsent(url);
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()).createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));

PortUnificationExchanger.bind(url, new DefaultPuHandler());
optimizeSerialization(url);
Expand All @@ -137,7 +139,7 @@ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
}

private ExecutorService getOrCreateStreamExecutor(ApplicationModel applicationModel, URL url) {
ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(url);
ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
Objects.requireNonNull(executor,
String.format("No available executor found in %s", url));
return executor;
Expand Down