Skip to content

Commit d2c8603

Browse files
committed
HBASE-25401 Add trace support for async call in rpc client
1 parent 4258d2c commit d2c8603

18 files changed

Lines changed: 329 additions & 266 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
2222
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
2323

24+
import io.opentelemetry.api.trace.Span;
25+
import io.opentelemetry.api.trace.StatusCode;
26+
import io.opentelemetry.context.Scope;
2427
import java.io.IOException;
2528
import java.net.SocketAddress;
2629
import java.util.Collection;
@@ -38,6 +41,7 @@
3841
import org.apache.hadoop.hbase.net.Address;
3942
import org.apache.hadoop.hbase.security.User;
4043
import org.apache.hadoop.hbase.security.UserProvider;
44+
import org.apache.hadoop.hbase.trace.TraceUtil;
4145
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
4246
import org.apache.hadoop.hbase.util.PoolMap;
4347
import org.apache.hadoop.hbase.util.Threads;
@@ -365,7 +369,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
365369
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
366370

367371
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
368-
RpcCallback<Message> callback) {
372+
RpcCallback<Message> callback) {
369373
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
370374
if (metrics != null) {
371375
metrics.updateRpc(call.md, call.param, call.callStats);
@@ -388,44 +392,59 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
388392
}
389393
}
390394

391-
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
395+
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
392396
final Message param, Message returnType, final User ticket,
393397
final Address addr, final RpcCallback<Message> callback) {
394-
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
395-
cs.setStartTime(EnvironmentEdgeManager.currentTime());
396-
397-
if (param instanceof ClientProtos.MultiRequest) {
398-
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
399-
int numActions = 0;
400-
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
401-
numActions += regionAction.getActionCount();
402-
}
398+
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
399+
.startSpan();
400+
try (Scope scope = span.makeCurrent()) {
401+
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
402+
cs.setStartTime(EnvironmentEdgeManager.currentTime());
403+
404+
if (param instanceof ClientProtos.MultiRequest) {
405+
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
406+
int numActions = 0;
407+
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
408+
numActions += regionAction.getActionCount();
409+
}
403410

404-
cs.setNumActionsPerServer(numActions);
405-
}
411+
cs.setNumActionsPerServer(numActions);
412+
}
406413

407-
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
408-
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
414+
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
415+
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
409416
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
410417
@Override
411418
public void run(Call call) {
412-
counter.decrementAndGet();
413-
onCallFinished(call, hrc, addr, callback);
419+
try (Scope scope = call.span.makeCurrent()) {
420+
counter.decrementAndGet();
421+
onCallFinished(call, hrc, addr, callback);
422+
} finally {
423+
if (hrc.failed()) {
424+
span.setStatus(StatusCode.ERROR);
425+
span.recordException(hrc.getFailed());
426+
} else {
427+
span.setStatus(StatusCode.OK);
428+
}
429+
span.end();
430+
}
414431
}
415432
}, cs);
416-
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
417-
int count = counter.incrementAndGet();
418-
try {
419-
if (count > maxConcurrentCallsPerServer) {
420-
throw new ServerTooBusyException(addr, count);
433+
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
434+
int count = counter.incrementAndGet();
435+
try {
436+
if (count > maxConcurrentCallsPerServer) {
437+
throw new ServerTooBusyException(addr, count);
438+
}
439+
cs.setConcurrentCallsPerServer(count);
440+
T connection = getConnection(remoteId);
441+
connection.sendRequest(call, hrc);
442+
} catch (Exception e) {
443+
call.setException(toIOE(e));
444+
span.end();
421445
}
422-
cs.setConcurrentCallsPerServer(count);
423-
T connection = getConnection(remoteId);
424-
connection.sendRequest(call, hrc);
425-
} catch (Exception e) {
426-
call.setException(toIOE(e));
446+
return call;
427447
}
428-
return call;
429448
}
430449

431450
private static Address createAddr(ServerName sn) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
2525
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
2626

27-
import io.opentelemetry.api.trace.Span;
28-
import io.opentelemetry.context.Context;
2927
import io.opentelemetry.context.Scope;
3028
import java.io.BufferedInputStream;
3129
import java.io.BufferedOutputStream;
@@ -57,7 +55,6 @@
5755
import org.apache.hadoop.hbase.security.SaslUtil;
5856
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
5957
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
60-
import org.apache.hadoop.hbase.trace.TraceUtil;
6158
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
6259
import org.apache.hadoop.hbase.util.ExceptionUtil;
6360
import org.apache.hadoop.io.IOUtils;
@@ -192,8 +189,8 @@ public void run() {
192189
if (call.isDone()) {
193190
continue;
194191
}
195-
try {
196-
tracedWriteRequest(call);
192+
try (Scope scope = call.span.makeCurrent()) {
193+
writeRequest(call);
197194
} catch (IOException e) {
198195
// exception here means the call has not been added to the pendingCalls yet, so we need
199196
// to fail it by our own.
@@ -594,16 +591,6 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
594591
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
595592
}
596593

597-
private void tracedWriteRequest(Call call) throws IOException {
598-
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
599-
.setParent(Context.current().with(call.span)).startSpan();
600-
try (Scope scope = span.makeCurrent()) {
601-
writeRequest(call);
602-
} finally {
603-
span.end();
604-
}
605-
}
606-
607594
/**
608595
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
609596
* the Connection thread, but by other threads.
@@ -811,7 +798,9 @@ public void run(boolean cancelled) throws IOException {
811798
if (callSender != null) {
812799
callSender.sendCall(call);
813800
} else {
814-
tracedWriteRequest(call);
801+
// this is in the same thread with the caller so do not need to attach the trace context
802+
// again.
803+
writeRequest(call);
815804
}
816805
}
817806
});

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class Call {
6161
final Span span;
6262
Timeout timeoutTask;
6363

64-
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
64+
Call(int id, final Descriptors.MethodDescriptor md, Message param,
6565
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
6666
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
6767
this.param = param;

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.api.GlobalOpenTelemetry;
21+
import io.opentelemetry.context.Context;
2022
import java.io.IOException;
2123
import java.io.OutputStream;
2224
import java.lang.reflect.InvocationTargetException;
@@ -48,6 +50,7 @@
4850
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
4951
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
5052
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
53+
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
5154

5255
/**
5356
* Utility to help ipc'ing.
@@ -111,11 +114,10 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
111114
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
112115
RequestHeader.Builder builder = RequestHeader.newBuilder();
113116
builder.setCallId(call.id);
114-
//TODO handle htrace API change, see HBASE-18895
115-
/*if (call.span != null) {
116-
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
117-
.setTraceId(call.span.getTracerId()));
118-
}*/
117+
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
118+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
119+
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
120+
builder.setTraceInfo(traceBuilder.build());
119121
builder.setMethodName(call.md.getName());
120122
builder.setRequestParam(call.param != null);
121123
if (cellBlockMeta != null) {

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,36 @@
1717
*/
1818
package org.apache.hadoop.hbase.ipc;
1919

20+
import io.opentelemetry.context.Scope;
21+
import java.io.IOException;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import org.apache.hadoop.hbase.CellScanner;
25+
import org.apache.hadoop.hbase.codec.Codec;
2026
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
27+
import org.apache.hadoop.io.compress.CompressionCodec;
28+
import org.apache.hadoop.ipc.RemoteException;
29+
import org.apache.yetus.audience.InterfaceAudience;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
2133
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
2234
import org.apache.hbase.thirdparty.com.google.protobuf.Message.Builder;
2335
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
24-
2536
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
2637
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
2738
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
2839
import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
40+
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
2941
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
3042
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
3143
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
3244
import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner;
3345

34-
import java.io.IOException;
35-
import java.util.HashMap;
36-
import java.util.Map;
37-
38-
import org.apache.hadoop.hbase.CellScanner;
39-
import org.apache.yetus.audience.InterfaceAudience;
40-
import org.slf4j.Logger;
41-
import org.slf4j.LoggerFactory;
42-
import org.apache.hadoop.hbase.codec.Codec;
4346
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
4447
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
4548
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
4649
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
47-
import org.apache.hadoop.io.compress.CompressionCodec;
48-
import org.apache.hadoop.ipc.RemoteException;
4950

5051
/**
5152
* The netty rpc handler.
@@ -103,8 +104,8 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p
103104
ctx.write(buf, withoutCellBlockPromise);
104105
ChannelPromise cellBlockPromise = ctx.newPromise();
105106
ctx.write(cellBlock, cellBlockPromise);
106-
PromiseCombiner combiner = new PromiseCombiner();
107-
combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
107+
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
108+
combiner.addAll((ChannelFuture) withoutCellBlockPromise, (ChannelFuture) cellBlockPromise);
108109
combiner.finish(promise);
109110
} else {
110111
ctx.write(buf, promise);
@@ -114,9 +115,12 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p
114115

115116
@Override
116117
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
117-
throws Exception {
118+
throws Exception {
118119
if (msg instanceof Call) {
119-
writeRequest(ctx, (Call) msg, promise);
120+
Call call = (Call) msg;
121+
try (Scope scope = call.span.makeCurrent()) {
122+
writeRequest(ctx, call, promise);
123+
}
120124
} else {
121125
ctx.write(msg, promise);
122126
}

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.trace;
1919

20-
import io.opentelemetry.api.OpenTelemetry;
20+
import io.opentelemetry.api.GlobalOpenTelemetry;
2121
import io.opentelemetry.api.trace.Tracer;
2222
import org.apache.yetus.audience.InterfaceAudience;
2323

@@ -30,6 +30,6 @@ private TraceUtil() {
3030
}
3131

3232
public static Tracer getGlobalTracer() {
33-
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
33+
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
3434
}
3535
}

hbase-protocol-shaded/src/main/protobuf/rpc/Tracing.proto

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
2323
option java_generate_equals_and_hash = true;
2424
option optimize_for = SPEED;
2525

26-
//Used to pass through the information necessary to continue
27-
//a trace after an RPC is made. All we need is the traceid
28-
//(so we know the overarching trace this message is a part of), and
29-
//the id of the current span when this message was sent, so we know
30-
//what span caused the new span we will create when this message is received.
26+
// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
27+
// is a text-based approach that passes properties with http headers. Here we will also use this
28+
// approach so we just need a map to store the key value pair.
29+
3130
message RPCTInfo {
32-
optional int64 trace_id = 1;
33-
optional int64 parent_id = 2;
31+
optional int64 trace_id = 1 [deprecated = true];
32+
optional int64 parent_id = 2 [deprecated = true];
33+
map<string, string> headers = 3;
3434
}

hbase-server/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,16 @@
422422
<artifactId>hamcrest-core</artifactId>
423423
<scope>test</scope>
424424
</dependency>
425+
<dependency>
426+
<groupId>io.opentelemetry</groupId>
427+
<artifactId>opentelemetry-sdk</artifactId>
428+
<scope>test</scope>
429+
</dependency>
430+
<dependency>
431+
<groupId>io.opentelemetry</groupId>
432+
<artifactId>opentelemetry-sdk-testing</artifactId>
433+
<scope>test</scope>
434+
</dependency>
425435
<dependency>
426436
<groupId>org.hamcrest</groupId>
427437
<artifactId>hamcrest-library</artifactId>

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import io.opentelemetry.api.trace.Span;
21+
import io.opentelemetry.api.trace.StatusCode;
22+
import io.opentelemetry.context.Context;
2123
import io.opentelemetry.context.Scope;
2224
import java.net.InetSocketAddress;
2325
import java.nio.channels.ClosedChannelException;
@@ -72,15 +74,6 @@ public RpcCall getRpcCall() {
7274
return call;
7375
}
7476

75-
/**
76-
* Keep for backward compatibility.
77-
* @deprecated As of release 2.0, this will be removed in HBase 3.0
78-
*/
79-
@Deprecated
80-
public ServerCall<?> getCall() {
81-
return (ServerCall<?>) call;
82-
}
83-
8477
public void setStatus(MonitoredRPCHandler status) {
8578
this.status = status;
8679
}
@@ -129,7 +122,8 @@ public void run() {
129122
String serviceName = getServiceName();
130123
String methodName = getMethodName();
131124
String traceString = serviceName + "." + methodName;
132-
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
125+
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
126+
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
133127
try (Scope traceScope = span.makeCurrent()) {
134128
if (!this.rpcServer.isStarted()) {
135129
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -140,8 +134,12 @@ public void run() {
140134
resultPair = this.rpcServer.call(call, this.status);
141135
} catch (TimeoutIOException e){
142136
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
137+
span.recordException(e);
138+
span.setStatus(StatusCode.ERROR);
143139
return;
144140
} catch (Throwable e) {
141+
span.recordException(e);
142+
span.setStatus(StatusCode.ERROR);
145143
if (e instanceof ServerNotRunningYetException) {
146144
// If ServerNotRunningYetException, don't spew stack trace.
147145
if (RpcServer.LOG.isTraceEnabled()) {
@@ -160,6 +158,7 @@ public void run() {
160158
RpcServer.CurCall.set(null);
161159
if (resultPair != null) {
162160
this.rpcServer.addCallSize(call.getSize() * -1);
161+
span.setStatus(StatusCode.OK);
163162
sucessful = true;
164163
}
165164
span.end();

0 commit comments

Comments
 (0)