Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
Expand All @@ -34,7 +35,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -393,28 +393,40 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;
private final long callStartNanos;

/**
* Captures, at construction time, the state that {@code setResponse} and
* {@code error} use later: the server and method name taken from
* {@code CURRENT_CALL_INFO}, the call held in {@code Server.getCurCall()},
* and the start-time values recorded below.
*/
public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
// Timestamp from Time.now() taken when this callback is constructed.
this.setupTime = Time.now();
// Start time of the current call as published via Server.getCurCallStartnanos().
this.callStartNanos = Server.getCurCallStartnanos().get();
}

/**
 * Records how long the deferred call took and how much of that time was
 * spent outside of locks.
 *
 * <p>Sets {@code PROCESSING} to {@code deltaNanos}, then sets
 * {@code LOCKFREE} to {@code deltaNanos} minus the {@code LOCKWAIT},
 * {@code LOCKSHARED} and {@code LOCKEXCLUSIVE} timings already recorded on
 * the call.
 *
 * @param rpcCall the call whose processing details are updated
 * @param deltaNanos elapsed processing time in nanoseconds
 */
private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
  // Look the details up once and reuse them for every read and write.
  final ProcessingDetails details = rpcCall.getProcessingDetails();
  details.set(ProcessingDetails.Timing.PROCESSING, deltaNanos,
      TimeUnit.NANOSECONDS);

  // Total time attributed to locks; the remainder is lock-free time.
  final long lockNanos =
      details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS)
      + details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS)
      + details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
  details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos - lockNanos,
      TimeUnit.NANOSECONDS);
}

/**
* Completes the deferred call successfully: records timing on the call,
* sets the wrapped message as the call's deferred response, and reports
* deferred metrics to the server under the method name.
*
* @param message the response message for this call
*/
@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
// Elapsed nanoseconds since the call start captured in the constructor.
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

/**
* Completes the deferred call with a failure: records timing on the call,
* sets the throwable as the call's deferred error, and reports deferred
* metrics to the server under the throwable's simple class name.
*
* @param t the failure to report for this call
*/
@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
// Elapsed nanoseconds since the call start captured in the constructor.
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
import org.apache.hadoop.security.UserGroupInformation;
Expand Down Expand Up @@ -425,28 +426,39 @@ static class ProtobufRpcEngineCallbackImpl
private final RPC.Server server;
private final Call call;
private final String methodName;
private final long setupTime;
private final long callStartNanos;

/**
* Snapshots the call context needed to finish the call later: the server
* and method name from {@code CURRENT_CALL_INFO}, the call held in
* {@code Server.getCurCall()}, and the start-time values recorded below.
*/
ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
// Timestamp from Time.now() taken when this callback is constructed.
this.setupTime = Time.now();
// Start time of the current call as published via Server.getCurCallStartnanos().
this.callStartNanos = Server.getCurCallStartnanos().get();
}

/**
 * Records the total processing time of the deferred call and derives the
 * portion of it that was not spent on locks.
 *
 * <p>{@code PROCESSING} is set to {@code deltaNanos}; {@code LOCKFREE} is
 * set to {@code deltaNanos} minus the {@code LOCKWAIT}, {@code LOCKSHARED}
 * and {@code LOCKEXCLUSIVE} timings already recorded on the call.
 *
 * @param rpcCall the call whose processing details are updated
 * @param deltaNanos elapsed processing time in nanoseconds
 */
private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
  // Fetch the details once; every read and write below goes through it.
  final ProcessingDetails details = rpcCall.getProcessingDetails();
  details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);

  // Total time attributed to locks; the remainder is lock-free time.
  final long lockNanos =
      details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS)
      + details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS)
      + details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
  details.set(Timing.LOCKFREE, deltaNanos - lockNanos, TimeUnit.NANOSECONDS);
}

/**
* Finishes the deferred call with a response: records timing on the call,
* sets the wrapped message as the call's deferred response, and reports
* deferred metrics to the server under the method name.
*
* @param message the response message for this call
*/
@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
// Elapsed nanoseconds since the call start captured in the constructor.
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredResponse(RpcWritable.wrap(message));
server.updateDeferredMetrics(methodName, processingTime);
server.updateDeferredMetrics(call, methodName, deltaNanos);
}

/**
* Finishes the deferred call with a failure: records timing on the call,
* sets the throwable as the call's deferred error, and reports deferred
* metrics to the server under the throwable's simple class name.
*
* @param t the failure to report for this call
*/
@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
// Elapsed nanoseconds since the call start captured in the constructor.
long deltaNanos = Time.monotonicNowNanos() - callStartNanos;
updateProcessingDetails(call, deltaNanos);
call.setDeferredError(t);
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,19 @@ public static Server get() {
* after the call returns.
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();


private static final ThreadLocal<Long> CUR_CALL_STARTNANOS = new ThreadLocal<Long>();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can CUR_CALL_STARTNANOS be deleted directly?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have fixed it. Thanks a lot.

/**
* Exposes the thread-local that holds the call currently being handled.
*
* @return the {@code ThreadLocal} storing the current {@link Call}; its
*         value is reset to {@code null} by the handler once the call has
*         been processed.
*/
@VisibleForTesting
public static ThreadLocal<Call> getCurCall() {
return CurCall;
}


/**
* Exposes the thread-local that holds the start time of the call currently
* being handled, as a {@code Time.monotonicNowNanos()} reading.
*
* @return the {@code ThreadLocal} storing the current call's start time in
*         nanoseconds; its value is reset to {@code null} by the handler
*         once the call has been processed.
*/
public static ThreadLocal<Long> getCurCallStartnanos() {
return CUR_CALL_STARTNANOS;
}

/**
* Returns the currently active RPC call's sequential ID number. A negative
* call ID indicates an invalid value, such as if there is no currently active
Expand Down Expand Up @@ -638,7 +644,8 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
rpcMetrics.addRpcQueueTime(queueTime);

if (call.isResponseDeferred() || connDropped) {
// call was skipped; don't include it in processing metrics
// The call was skipped; don't include it in processing metrics.
// Will update metrics in method updateDeferredMetrics.
return;
}

Expand Down Expand Up @@ -668,9 +675,41 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped
}
}

void updateDeferredMetrics(String name, long processingTime) {
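/**
* Update RPC metrics for deferred calls.
* @param call the RPC call
* @param name the RPC method name
* @param processingTime time spent processing the call, documented as
*        milliseconds. NOTE(review): the callback callers pass a nanosecond
*        delta here -- confirm the intended unit.
*/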
void updateDeferredMetrics(Call call, String name, long processingTime) {
long completionTimeNanos = Time.monotonicNowNanos();
long arrivalTimeNanos = call.timestampNanos;

ProcessingDetails details = call.getProcessingDetails();
long waitTime =
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
long responseTime =
details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
rpcMetrics.addRpcLockWaitTime(waitTime);
rpcMetrics.addRpcProcessingTime(processingTime);
rpcMetrics.addRpcResponseTime(responseTime);
rpcMetrics.addDeferredRpcProcessingTime(processingTime);
rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
// don't include lock wait for detailed metrics.
processingTime -= waitTime;
rpcDetailedMetrics.addProcessingTime(name, processingTime);

// Overall processing time is from arrival to completion.
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details);
}
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
Comment on lines +677 to +706
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some duplicate code in this method and updateMetrics; we can extract the common code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KeeProMise Sir, thanks for your review. This is a good suggestion, but I am worried that extracting the common code would hurt readability. What's your opinion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hfutatzhanghb got it.

}

/**
Expand Down Expand Up @@ -1243,6 +1282,7 @@ public Void run() throws Exception {
}

long startNanos = Time.monotonicNowNanos();
CUR_CALL_STARTNANOS.set(startNanos);
Writable value = null;
ResponseParams responseParams = new ResponseParams();

Expand Down Expand Up @@ -1331,6 +1371,7 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
* Send a deferred response, ignoring errors.
*/
private void sendDeferedResponse() {
long startNanos = Time.monotonicNowNanos();
try {
connection.sendResponse(this);
} catch (Exception e) {
Expand All @@ -1342,6 +1383,8 @@ private void sendDeferedResponse() {
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
getProcessingDetails().set(Timing.RESPONSE,
Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
}

@Override
Expand Down Expand Up @@ -3220,6 +3263,7 @@ public void run() {
}
} finally {
CurCall.set(null);
CUR_CALL_STARTNANOS.set(null);
numInProcessHandler.decrementAndGet();
IOUtils.cleanupWithLogger(LOG, traceScope);
if (call != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
Expand All @@ -26,25 +27,35 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

public class TestProtoBufRpcServerHandoff {

public static final Logger LOG =
LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);

@Test(timeout = 20000)
public void test() throws Exception {
Configuration conf = new Configuration();
private static Configuration conf = null;
private static RPC.Server server = null;
private static InetSocketAddress address = null;

@Before
public void setUp() throws IOException {
conf = new Configuration();

TestProtoBufRpcServerHandoffServer serverImpl =
new TestProtoBufRpcServerHandoffServer();
Expand All @@ -53,18 +64,21 @@

RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
ProtobufRpcEngine2.class);
RPC.Server server = new RPC.Builder(conf)
server = new RPC.Builder(conf)
.setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
.setInstance(blockingService)
.setVerbose(true)
.setNumHandlers(1) // Num Handlers explicitly set to 1 for test.
.build();
server.start();

InetSocketAddress address = server.getListenerAddress();
address = server.getListenerAddress();
long serverStartTime = System.currentTimeMillis();
LOG.info("Server started at: " + address + " at time: " + serverStartTime);

}

Check failure on line 79 in hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java#L79

blanks: end of line
@Test(timeout = 20000)
public void test() throws Exception {
final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

Expand Down Expand Up @@ -93,6 +107,40 @@

}

/**
 * Verifies that calls handed off for deferred completion are reflected in
 * the server's RPC metrics.
 *
 * <p>Two client invocations, each asking for a 5 second sleep, are issued
 * concurrently. The test checks that both finish close to each other and
 * within 7 seconds overall, then asserts on the
 * {@code DeferredRpcProcessingTimeNumOps} and
 * {@code RpcProcessingTimeNumOps} counters.
 *
 * @throws Exception if a client invocation fails or is interrupted
 */
@Test(timeout = 20000)
public void testHandoffMetrics() throws Exception {
  final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
      TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);

  ExecutorService executorService = Executors.newFixedThreadPool(2);
  try {
    CompletionService<ClientInvocationCallable> completionService =
        new ExecutorCompletionService<>(executorService);

    completionService.submit(new ClientInvocationCallable(client, 5000L));
    completionService.submit(new ClientInvocationCallable(client, 5000L));

    long submitTime = System.currentTimeMillis();
    Future<ClientInvocationCallable> future1 = completionService.take();
    Future<ClientInvocationCallable> future2 = completionService.take();

    ClientInvocationCallable callable1 = future1.get();
    ClientInvocationCallable callable2 = future2.get();

    LOG.info(callable1.toString());
    LOG.info(callable2.toString());

    // Ensure the 5 second sleep responses are within a reasonable time of
    // each other.
    Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L);
    Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L);

    // Check rpcMetrics
    MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
    assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb);
    assertCounter("RpcProcessingTimeNumOps", 2L, rb);
  } finally {
    // Always release the worker threads, even when an assertion fails or a
    // client invocation throws; otherwise the pool outlives the test.
    executorService.shutdownNow();
  }
}

private static class ClientInvocationCallable
implements Callable<ClientInvocationCallable> {
final TestProtoBufRpcServerHandoffProtocol client;
Expand Down
Loading