Skip to content

Commit d78e359

Browse files
committed
HBASE-25401 Add trace support for async call in rpc client (#2790)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
1 parent b6e1e4a commit d78e359

14 files changed

Lines changed: 299 additions & 174 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
2222
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
2323

24+
import io.opentelemetry.api.trace.Span;
25+
import io.opentelemetry.api.trace.StatusCode;
26+
import io.opentelemetry.context.Scope;
2427
import java.io.IOException;
2528
import java.net.SocketAddress;
2629
import java.util.Collection;
@@ -38,6 +41,7 @@
3841
import org.apache.hadoop.hbase.net.Address;
3942
import org.apache.hadoop.hbase.security.User;
4043
import org.apache.hadoop.hbase.security.UserProvider;
44+
import org.apache.hadoop.hbase.trace.TraceUtil;
4145
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4246
import org.apache.hadoop.hbase.util.PoolMap;
4347
import org.apache.hadoop.hbase.util.Threads;
@@ -365,7 +369,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
365369
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
366370

367371
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
368-
RpcCallback<Message> callback) {
372+
RpcCallback<Message> callback) {
369373
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
370374
if (metrics != null) {
371375
metrics.updateRpc(call.md, call.param, call.callStats);
@@ -388,44 +392,59 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
388392
}
389393
}
390394

391-
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
395+
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
392396
final Message param, Message returnType, final User ticket,
393397
final Address addr, final RpcCallback<Message> callback) {
394-
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
395-
cs.setStartTime(EnvironmentEdgeManager.currentTime());
396-
397-
if (param instanceof ClientProtos.MultiRequest) {
398-
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
399-
int numActions = 0;
400-
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
401-
numActions += regionAction.getActionCount();
402-
}
398+
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
399+
.startSpan();
400+
try (Scope scope = span.makeCurrent()) {
401+
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
402+
cs.setStartTime(EnvironmentEdgeManager.currentTime());
403+
404+
if (param instanceof ClientProtos.MultiRequest) {
405+
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
406+
int numActions = 0;
407+
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
408+
numActions += regionAction.getActionCount();
409+
}
403410

404-
cs.setNumActionsPerServer(numActions);
405-
}
411+
cs.setNumActionsPerServer(numActions);
412+
}
406413

407-
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
408-
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
414+
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
415+
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
409416
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
410417
@Override
411418
public void run(Call call) {
412-
counter.decrementAndGet();
413-
onCallFinished(call, hrc, addr, callback);
419+
try (Scope scope = call.span.makeCurrent()) {
420+
counter.decrementAndGet();
421+
onCallFinished(call, hrc, addr, callback);
422+
} finally {
423+
if (hrc.failed()) {
424+
span.setStatus(StatusCode.ERROR);
425+
span.recordException(hrc.getFailed());
426+
} else {
427+
span.setStatus(StatusCode.OK);
428+
}
429+
span.end();
430+
}
414431
}
415432
}, cs);
416-
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
417-
int count = counter.incrementAndGet();
418-
try {
419-
if (count > maxConcurrentCallsPerServer) {
420-
throw new ServerTooBusyException(addr, count);
433+
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
434+
int count = counter.incrementAndGet();
435+
try {
436+
if (count > maxConcurrentCallsPerServer) {
437+
throw new ServerTooBusyException(addr, count);
438+
}
439+
cs.setConcurrentCallsPerServer(count);
440+
T connection = getConnection(remoteId);
441+
connection.sendRequest(call, hrc);
442+
} catch (Exception e) {
443+
call.setException(toIOE(e));
444+
span.end();
421445
}
422-
cs.setConcurrentCallsPerServer(count);
423-
T connection = getConnection(remoteId);
424-
connection.sendRequest(call, hrc);
425-
} catch (Exception e) {
426-
call.setException(toIOE(e));
446+
return call;
427447
}
428-
return call;
429448
}
430449

431450
private static Address createAddr(ServerName sn) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
2525
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
2626

27-
import io.opentelemetry.api.trace.Span;
28-
import io.opentelemetry.context.Context;
2927
import io.opentelemetry.context.Scope;
3028
import java.io.BufferedInputStream;
3129
import java.io.BufferedOutputStream;
@@ -57,7 +55,6 @@
5755
import org.apache.hadoop.hbase.security.SaslUtil;
5856
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
5957
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
60-
import org.apache.hadoop.hbase.trace.TraceUtil;
6158
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
6259
import org.apache.hadoop.hbase.util.ExceptionUtil;
6360
import org.apache.hadoop.io.IOUtils;
@@ -192,8 +189,8 @@ public void run() {
192189
if (call.isDone()) {
193190
continue;
194191
}
195-
try {
196-
tracedWriteRequest(call);
192+
try (Scope scope = call.span.makeCurrent()) {
193+
writeRequest(call);
197194
} catch (IOException e) {
198195
// exception here means the call has not been added to the pendingCalls yet, so we need
199196
// to fail it by our own.
@@ -594,16 +591,6 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
594591
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
595592
}
596593

597-
private void tracedWriteRequest(Call call) throws IOException {
598-
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
599-
.setParent(Context.current().with(call.span)).startSpan();
600-
try (Scope scope = span.makeCurrent()) {
601-
writeRequest(call);
602-
} finally {
603-
span.end();
604-
}
605-
}
606-
607594
/**
608595
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
609596
* the Connection thread, but by other threads.
@@ -811,7 +798,9 @@ public void run(boolean cancelled) throws IOException {
811798
if (callSender != null) {
812799
callSender.sendCall(call);
813800
} else {
814-
tracedWriteRequest(call);
801+
// this is in the same thread with the caller so do not need to attach the trace context
802+
// again.
803+
writeRequest(call);
815804
}
816805
}
817806
});

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class Call {
6161
final Span span;
6262
Timeout timeoutTask;
6363

64-
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
64+
Call(int id, final Descriptors.MethodDescriptor md, Message param,
6565
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
6666
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
6767
this.param = param;

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.api.GlobalOpenTelemetry;
21+
import io.opentelemetry.context.Context;
2022
import java.io.IOException;
2123
import java.io.OutputStream;
2224
import java.lang.reflect.InvocationTargetException;
@@ -48,6 +50,7 @@
4850
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
4951
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
5052
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
53+
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
5154

5255
/**
5356
* Utility to help ipc'ing.
@@ -111,11 +114,10 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
111114
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
112115
RequestHeader.Builder builder = RequestHeader.newBuilder();
113116
builder.setCallId(call.id);
114-
//TODO handle htrace API change, see HBASE-18895
115-
/*if (call.span != null) {
116-
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
117-
.setTraceId(call.span.getTracerId()));
118-
}*/
117+
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
118+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
119+
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
120+
builder.setTraceInfo(traceBuilder.build());
119121
builder.setMethodName(call.md.getName());
120122
builder.setRequestParam(call.param != null);
121123
if (cellBlockMeta != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.context.Scope;
2021
import java.io.IOException;
2122
import java.util.HashMap;
2223
import java.util.Map;
@@ -114,9 +115,12 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p
114115

115116
@Override
116117
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
117-
throws Exception {
118+
throws Exception {
118119
if (msg instanceof Call) {
119-
writeRequest(ctx, (Call) msg, promise);
120+
Call call = (Call) msg;
121+
try (Scope scope = call.span.makeCurrent()) {
122+
writeRequest(ctx, call, promise);
123+
}
120124
} else {
121125
ctx.write(msg, promise);
122126
}

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.trace;
1919

20-
import io.opentelemetry.api.OpenTelemetry;
20+
import io.opentelemetry.api.GlobalOpenTelemetry;
2121
import io.opentelemetry.api.trace.Tracer;
2222
import org.apache.yetus.audience.InterfaceAudience;
2323

@@ -30,6 +30,6 @@ private TraceUtil() {
3030
}
3131

3232
public static Tracer getGlobalTracer() {
33-
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
33+
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
3434
}
3535
}

hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
2323
option java_generate_equals_and_hash = true;
2424
option optimize_for = SPEED;
2525

26-
//Used to pass through the information necessary to continue
27-
//a trace after an RPC is made. All we need is the traceid
28-
//(so we know the overarching trace this message is a part of), and
29-
//the id of the current span when this message was sent, so we know
30-
//what span caused the new span we will create when this message is received.
26+
// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
27+
// is a text-based approach that passes properties with http headers. Here we will also use this
28+
// approach so we just need a map to store the key value pair.
29+
3130
message RPCTInfo {
32-
optional int64 trace_id = 1;
33-
optional int64 parent_id = 2;
31+
optional int64 trace_id = 1 [deprecated = true];
32+
optional int64 parent_id = 2 [deprecated = true];
33+
map<string, string> headers = 3;
3434
}

hbase-server/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,16 @@
422422
<artifactId>hamcrest-core</artifactId>
423423
<scope>test</scope>
424424
</dependency>
425+
<dependency>
426+
<groupId>io.opentelemetry</groupId>
427+
<artifactId>opentelemetry-sdk</artifactId>
428+
<scope>test</scope>
429+
</dependency>
430+
<dependency>
431+
<groupId>io.opentelemetry</groupId>
432+
<artifactId>opentelemetry-sdk-testing</artifactId>
433+
<scope>test</scope>
434+
</dependency>
425435
<dependency>
426436
<groupId>org.hamcrest</groupId>
427437
<artifactId>hamcrest-library</artifactId>

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import io.opentelemetry.api.trace.Span;
21+
import io.opentelemetry.api.trace.StatusCode;
22+
import io.opentelemetry.context.Context;
2123
import io.opentelemetry.context.Scope;
2224
import java.net.InetSocketAddress;
2325
import java.nio.channels.ClosedChannelException;
@@ -72,15 +74,6 @@ public RpcCall getRpcCall() {
7274
return call;
7375
}
7476

75-
/**
76-
* Keep for backward compatibility.
77-
* @deprecated As of release 2.0, this will be removed in HBase 3.0
78-
*/
79-
@Deprecated
80-
public ServerCall<?> getCall() {
81-
return (ServerCall<?>) call;
82-
}
83-
8477
public void setStatus(MonitoredRPCHandler status) {
8578
this.status = status;
8679
}
@@ -129,7 +122,8 @@ public void run() {
129122
String serviceName = getServiceName();
130123
String methodName = getMethodName();
131124
String traceString = serviceName + "." + methodName;
132-
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
125+
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
126+
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
133127
try (Scope traceScope = span.makeCurrent()) {
134128
if (!this.rpcServer.isStarted()) {
135129
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -140,8 +134,12 @@ public void run() {
140134
resultPair = this.rpcServer.call(call, this.status);
141135
} catch (TimeoutIOException e){
142136
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
137+
span.recordException(e);
138+
span.setStatus(StatusCode.ERROR);
143139
return;
144140
} catch (Throwable e) {
141+
span.recordException(e);
142+
span.setStatus(StatusCode.ERROR);
145143
if (e instanceof ServerNotRunningYetException) {
146144
// If ServerNotRunningYetException, don't spew stack trace.
147145
if (RpcServer.LOG.isTraceEnabled()) {
@@ -160,6 +158,7 @@ public void run() {
160158
RpcServer.CurCall.set(null);
161159
if (resultPair != null) {
162160
this.rpcServer.addCallSize(call.getSize() * -1);
161+
span.setStatus(StatusCode.OK);
163162
sucessful = true;
164163
}
165164
span.end();

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.api.trace.Span;
21+
import io.opentelemetry.api.trace.StatusCode;
2022
import java.io.IOException;
2123
import java.net.InetAddress;
2224
import java.nio.ByteBuffer;
@@ -99,6 +101,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
99101
// the current implementation. We should fix this in the future.
100102
private final AtomicInteger reference = new AtomicInteger(0b01);
101103

104+
private final Span span;
105+
102106
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
103107
justification = "Can't figure why this complaint is happening... see below")
104108
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@@ -129,6 +133,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
129133
this.bbAllocator = byteBuffAllocator;
130134
this.cellBlockBuilder = cellBlockBuilder;
131135
this.reqCleanup = reqCleanup;
136+
this.span = Span.current();
132137
}
133138

134139
/**
@@ -147,6 +152,7 @@ public void done() {
147152
// If the call was run successfuly, we might have already returned the BB
148153
// back to pool. No worries..Then inputCellBlock will be null
149154
cleanup();
155+
span.end();
150156
}
151157

152158
private void release(int mask) {
@@ -226,6 +232,10 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
226232
}
227233
if (t != null) {
228234
this.isError = true;
235+
span.recordException(t);
236+
span.setStatus(StatusCode.ERROR);
237+
} else {
238+
span.setStatus(StatusCode.OK);
229239
}
230240
BufferChain bc = null;
231241
try {
@@ -549,4 +559,8 @@ public int getRemotePort() {
549559
public synchronized BufferChain getResponse() {
550560
return response;
551561
}
562+
563+
public Span getSpan() {
564+
return span;
565+
}
552566
}

0 commit comments

Comments
 (0)