Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,63 +62,58 @@ public ConsumerContextFilter(ApplicationModel applicationModel) {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();
try {
RpcContext.getServiceContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0);
RpcContext.getServiceContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0);

RpcContext context = RpcContext.getClientAttachment();
context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
RpcContext context = RpcContext.getClientAttachment();
context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}

if (CollectionUtils.isNotEmpty(supportedSelectors)) {
for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
Map<String, Object> selected = supportedSelector.select(invocation, RpcContext.getClientAttachment(), RpcContext.getServerAttachment());
if (CollectionUtils.isNotEmptyMap(selected)) {
((RpcInvocation) invocation).addObjectAttachments(selected);
}
if (CollectionUtils.isNotEmpty(supportedSelectors)) {
for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {
Map<String, Object> selected = supportedSelector.select(invocation, RpcContext.getClientAttachment(), RpcContext.getServerAttachment());
if (CollectionUtils.isNotEmptyMap(selected)) {
((RpcInvocation) invocation).addObjectAttachments(selected);
}
} else {
((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
}
Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
} else {
((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());
}
Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}

// pass default timeout set by end user (ReferenceConfig)
Object countDown = RpcContext.getServerAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
String methodName = RpcUtils.getMethodName(invocation);
// When the client has enabled the timeout-countdown function,
// the subsequent calls launched by the Server side will be enabled by default,
// and support to turn off the function on a node to get rid of the timeout control.
if (invoker.getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);
// pass default timeout set by end user (ReferenceConfig)
Object countDown = RpcContext.getServerAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
if (countDown != null) {
String methodName = RpcUtils.getMethodName(invocation);
// When the client has enabled the timeout-countdown function,
// the subsequent calls launched by the Server side will be enabled by default,
// and support to turn off the function on a node to get rid of the timeout control.
if (invoker.getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);

TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ RpcUtils.getMethodName(invocation) + ", terminate directly."), invocation);
}
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
+ RpcUtils.getMethodName(invocation) + ", terminate directly."), invocation);
}
}
RpcContext.removeClientResponseContext();
return invoker.invoke(invocation);
} finally {
RpcContext.restoreServiceContext(originServiceContext);
}
RpcContext.removeClientResponseContext();
return invoker.invoke(invocation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dubbo.common.profiler.ProfilerEntry;
import org.apache.dubbo.common.profiler.ProfilerSwitch;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcServiceContext;
import org.apache.dubbo.rpc.support.RpcUtils;
Expand All @@ -36,53 +37,59 @@ public class InvocationUtil {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(InvokerInvocationHandler.class);

public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
URL url = invoker.getUrl();
String serviceKey = url.getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();

// invoker.getUrl() returns consumer url.
RpcServiceContext.getServiceContext().setConsumerUrl(url);
try {
URL url = invoker.getUrl();
String serviceKey = url.getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);

if (ProfilerSwitch.isEnableSimpleProfiler()) {
ProfilerEntry parentProfiler = Profiler.getBizProfiler();
ProfilerEntry bizProfiler;
if (parentProfiler != null) {
bizProfiler = Profiler.enter(parentProfiler,
"Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());
} else {
bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());
}
rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
try {
return invoker.invoke(rpcInvocation).recreate();
} finally {
Profiler.release(bizProfiler);
Long timeout = RpcUtils.convertToNumber(rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY));
// invoker.getUrl() returns consumer url.
RpcServiceContext.getServiceContext().setConsumerUrl(url);

if (timeout == null) {
timeout = (long) url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
TIMEOUT_KEY,
DEFAULT_TIMEOUT);
if (ProfilerSwitch.isEnableSimpleProfiler()) {
ProfilerEntry parentProfiler = Profiler.getBizProfiler();
ProfilerEntry bizProfiler;
if (parentProfiler != null) {
bizProfiler = Profiler.enter(parentProfiler,
"Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());
} else {
bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());
}
long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
StringBuilder attachment = new StringBuilder();
rpcInvocation.foreachAttachment((entry) -> {
attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
});
rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);
try {
return invoker.invoke(rpcInvocation).recreate();
} finally {
Profiler.release(bizProfiler);
Long timeout = RpcUtils.convertToNumber(rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY));

if (timeout == null) {
timeout = (long) url.getMethodPositiveParameter(rpcInvocation.getMethodName(),
TIMEOUT_KEY,
DEFAULT_TIMEOUT);
}
long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();
if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {
StringBuilder attachment = new StringBuilder();
rpcInvocation.foreachAttachment((entry) -> {
attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
});

logger.warn(PROXY_TIMEOUT_REQUEST, "", "", String.format(
"[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
rpcInvocation.getProtocolServiceKey(),
rpcInvocation.getMethodName(),
usage / 1000_000,
usage % 1000_000,
timeout,
attachment,
Profiler.buildDetail(bizProfiler)));
logger.warn(PROXY_TIMEOUT_REQUEST, "", "", String.format(
"[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",
rpcInvocation.getProtocolServiceKey(),
rpcInvocation.getMethodName(),
usage / 1000_000,
usage % 1000_000,
timeout,
attachment,
Profiler.buildDetail(bizProfiler)));
}
}
}
return invoker.invoke(rpcInvocation).recreate();
} finally {
RpcContext.restoreServiceContext(originServiceContext);
}
return invoker.invoke(rpcInvocation).recreate();
}
}