Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class TripleProtocol extends AbstractProtocol {
*/
public static boolean CONVERT_NO_LOWER_HEADER = false;


public TripleProtocol(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
this.triBuiltinService = new TriBuiltinService(frameworkModel);
Expand All @@ -80,6 +79,7 @@ public int getDefaultPort() {
return 50051;
}


@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,14 @@
import org.apache.dubbo.remoting.api.Connection;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.RequestMetadata;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.compressor.Identity;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.StreamUtils;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleClientStream;

import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

Expand Down Expand Up @@ -111,15 +100,8 @@ public void onComplete(TriRpcStatus status, Map<String, Object> attachments,
return;
}
done = true;
final TriRpcStatus detailStatus;
final TriRpcStatus statusFromTrailers = getStatusFromTrailers(excludeHeaders);
if (statusFromTrailers != null) {
detailStatus = statusFromTrailers;
} else {
detailStatus = status;
}
try {
listener.onClose(detailStatus, StreamUtils.toAttachments(attachments));
listener.onClose(status, StreamUtils.toAttachments(attachments));
} catch (Throwable t) {
cancelByLocal(
TriRpcStatus.INTERNAL.withDescription("Close stream error").withCause(t)
Expand All @@ -130,59 +112,6 @@ public void onComplete(TriRpcStatus status, Map<String, Object> attachments,
}
}

private TriRpcStatus getStatusFromTrailers(Map<String, String> metadata) {
if (null == metadata) {
return null;
}
// second get status detail
if (!metadata.containsKey(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader())) {
return null;
}
final String raw = (metadata.remove(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader()));
byte[] statusDetailBin = StreamUtils.decodeASCIIByte(raw);
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
final Status statusDetail = Status.parseFrom(statusDetailBin);
List<Any> detailList = statusDetail.getDetailsList();
Map<Class<?>, Object> classObjectMap = tranFromStatusDetails(detailList);

// get common exception from DebugInfo
TriRpcStatus status = TriRpcStatus.fromCode(statusDetail.getCode())
.withDescription(TriRpcStatus.decodeMessage(statusDetail.getMessage()));
DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
if (debugInfo != null) {
String msg = ExceptionUtils.getStackFrameString(
debugInfo.getStackEntriesList());
status = status.appendDescription(msg);
}
return status;
} catch (IOException ioException) {
return null;
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}

}

private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
Map<Class<?>, Object> map = new HashMap<>(detailList.size());
try {
for (Any any : detailList) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
map.putIfAbsent(ErrorInfo.class, errorInfo);
} else if (any.is(DebugInfo.class)) {
DebugInfo debugInfo = any.unpack(DebugInfo.class);
map.putIfAbsent(DebugInfo.class, debugInfo);
}
// support others type but now only support this
}
} catch (Throwable t) {
LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "tran from grpc-status-details error", t);
}
return map;
}

@Override
public void onStart() {
listener.onStart(TripleClientCall.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dubbo.rpc.protocol.tri.stream;

import org.apache.dubbo.common.threadpool.serial.SerializingExecutor;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;

import java.util.concurrent.Executor;
Expand All @@ -30,8 +31,26 @@ public abstract class AbstractStream implements Stream {
protected final Executor executor;
protected final FrameworkModel frameworkModel;


private static final boolean HAS_PROTOBUF = hasProtobuf();

public AbstractStream(Executor executor, FrameworkModel frameworkModel) {
this.executor = new SerializingExecutor(executor);
this.frameworkModel = frameworkModel;
}


public static boolean getGrpcStatusDetailEnabled() {
return HAS_PROTOBUF;
}


private static boolean hasProtobuf() {
try {
ClassUtils.forName("com.google.protobuf.Message");
return true;
} catch (ClassNotFoundException ignore) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.ClassLoadUtil;
import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
import org.apache.dubbo.rpc.protocol.tri.command.CancelQueueCommand;
import org.apache.dubbo.rpc.protocol.tri.command.DataQueueCommand;
Expand All @@ -36,6 +38,10 @@
import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
import org.apache.dubbo.rpc.protocol.tri.transport.WriteQueue;

import com.google.protobuf.Any;
import com.google.rpc.DebugInfo;
import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -46,11 +52,16 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;


/**
* ClientStream is an abstraction for bi-directional messaging. It maintains a {@link WriteQueue} to
Expand Down Expand Up @@ -195,7 +206,14 @@ void finishProcess(TriRpcStatus status, Http2Headers trailers) {
final Map<String, Object> attachments = headersToMap(trailers, () -> {
return reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
});
listener.onComplete(status, attachments, reserved);
final TriRpcStatus detailStatus;
final TriRpcStatus statusFromTrailers = getStatusFromTrailers(reserved);
if (statusFromTrailers != null) {
detailStatus = statusFromTrailers;
} else {
detailStatus = status;
}
listener.onComplete(detailStatus, attachments, reserved);
}

private TriRpcStatus validateHeaderStatus(Http2Headers headers) {
Expand Down Expand Up @@ -311,6 +329,64 @@ private TriRpcStatus statusFromTrailers(Http2Headers trailers) {
"missing GRPC status, inferred error from HTTP status code");
}


private TriRpcStatus getStatusFromTrailers(Map<String, String> metadata) {
if (null == metadata) {
return null;
}
if (!getGrpcStatusDetailEnabled()){
return null;
}
// second get status detail
if (!metadata.containsKey(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader())) {
return null;
}
final String raw = (metadata.remove(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader()));
byte[] statusDetailBin = StreamUtils.decodeASCIIByte(raw);
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
try {
final Status statusDetail = Status.parseFrom(statusDetailBin);
List<Any> detailList = statusDetail.getDetailsList();
Map<Class<?>, Object> classObjectMap = tranFromStatusDetails(detailList);

// get common exception from DebugInfo
TriRpcStatus status = TriRpcStatus.fromCode(statusDetail.getCode())
.withDescription(TriRpcStatus.decodeMessage(statusDetail.getMessage()));
DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
if (debugInfo != null) {
String msg = ExceptionUtils.getStackFrameString(
debugInfo.getStackEntriesList());
status = status.appendDescription(msg);
}
return status;
} catch (IOException ioException) {
return null;
} finally {
ClassLoadUtil.switchContextLoader(tccl);
}

}


private Map<Class<?>, Object> tranFromStatusDetails(List<Any> detailList) {
Map<Class<?>, Object> map = new HashMap<>(detailList.size());
try {
for (Any any : detailList) {
if (any.is(ErrorInfo.class)) {
ErrorInfo errorInfo = any.unpack(ErrorInfo.class);
map.putIfAbsent(ErrorInfo.class, errorInfo);
} else if (any.is(DebugInfo.class)) {
DebugInfo debugInfo = any.unpack(DebugInfo.class);
map.putIfAbsent(DebugInfo.class, debugInfo);
}
// support others type but now only support this
}
} catch (Throwable t) {
LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "tran from grpc-status-details error", t);
}
return map;
}

@Override
public void onHeader(Http2Headers headers, boolean endStream) {
executor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ private Http2Headers getTrailers(TriRpcStatus rpcStatus, Map<String, Object> att
String grpcMessage = getGrpcMessage(rpcStatus);
grpcMessage = TriRpcStatus.encodeMessage(TriRpcStatus.limitSizeTo1KB(grpcMessage));
headers.set(TripleHeaderEnum.MESSAGE_KEY.getHeader(), grpcMessage);
if (!getGrpcStatusDetailEnabled()) {
return headers;
}
Status.Builder builder = Status.newBuilder().setCode(rpcStatus.code.code)
.setMessage(grpcMessage);
Throwable throwable = rpcStatus.cause;
Expand Down