Skip to content

Commit 528a51a

Browse files
committed
HADOOP-16290. Enable RpcMetrics units to be configurable
1 parent 177d906 commit 528a51a

8 files changed

Lines changed: 126 additions & 19 deletions

File tree

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
381381
public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false;
382382
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
383383
"rpc.metrics.percentiles.intervals";
384-
384+
385+
public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";
386+
385387
/** Allowed hosts for nfs exports */
386388
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
387389
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,8 +725,9 @@ public void addResponseTime(String callName, Schedulable schedulable,
725725
addCost(user, processingCost);
726726

727727
int priorityLevel = schedulable.getPriorityLevel();
728-
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
729-
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
728+
long queueTime = details.get(Timing.QUEUE, RpcMetrics.getMetricsTimeUnit());
729+
long processingTime = details.get(Timing.PROCESSING,
730+
RpcMetrics.getMetricsTimeUnit());
730731

731732
this.decayRpcSchedulerDetailedMetrics.addQueueTime(
732733
priorityLevel, queueTime);

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ default void addResponseTime(String callName, Schedulable schedulable,
6262
// this interface, a default implementation is supplied which uses the old
6363
// method. All new implementations MUST override this interface and should
6464
// NOT use the other addResponseTime method.
65-
int queueTime = (int)
66-
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
67-
int processingTime = (int)
68-
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
65+
int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
66+
RpcMetrics.getMetricsTimeUnit());
67+
int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
68+
RpcMetrics.getMetricsTimeUnit());
6969
addResponseTime(callName, schedulable.getPriorityLevel(),
7070
queueTime, processingTime);
7171
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -544,13 +544,13 @@ void logSlowRpcCalls(String methodName, Call call,
544544
(rpcMetrics.getProcessingStdDev() * deviation);
545545

546546
long processingTime =
547-
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
547+
details.get(Timing.PROCESSING, RpcMetrics.getMetricsTimeUnit());
548548
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
549549
(processingTime > threeSigma)) {
550550
LOG.warn(
551551
"Slow RPC : {} took {} {} to process from client {},"
552552
+ " the processing detail is {}",
553-
methodName, processingTime, RpcMetrics.TIMEUNIT, call,
553+
methodName, processingTime, RpcMetrics.getMetricsTimeUnit(), call,
554554
details.toString());
555555
rpcMetrics.incrSlowRpc();
556556
}
@@ -570,7 +570,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
570570
deltaNanos -= details.get(Timing.RESPONSE);
571571
details.set(Timing.HANDLER, deltaNanos);
572572

573-
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
573+
long queueTime = details.get(Timing.QUEUE, RpcMetrics.getMetricsTimeUnit());
574574
rpcMetrics.addRpcQueueTime(queueTime);
575575

576576
if (call.isResponseDeferred() || connDropped) {
@@ -579,9 +579,9 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
579579
}
580580

581581
long processingTime =
582-
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
582+
details.get(Timing.PROCESSING, RpcMetrics.getMetricsTimeUnit());
583583
long waitTime =
584-
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
584+
details.get(Timing.LOCKWAIT, RpcMetrics.getMetricsTimeUnit());
585585
rpcMetrics.addRpcLockWaitTime(waitTime);
586586
rpcMetrics.addRpcProcessingTime(processingTime);
587587
// don't include lock wait for detailed metrics.

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.hadoop.ipc.RPC;
2224
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
2325
import org.apache.hadoop.fs.CommonConfigurationKeys;
2426
import org.apache.hadoop.ipc.Server;
@@ -49,7 +51,7 @@ public class RpcMetrics {
4951
final String name;
5052
final boolean rpcQuantileEnable;
5153
/** The time unit used when storing/accessing time durations. */
52-
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
54+
private static TimeUnit metricsTimeUnit = TimeUnit.MILLISECONDS;
5355

5456
RpcMetrics(Server server, Configuration conf) {
5557
String port = String.valueOf(server.getListenerAddress().getPort());
@@ -75,19 +77,19 @@ public class RpcMetrics {
7577
for (int i = 0; i < intervals.length; i++) {
7678
int interval = intervals[i];
7779
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
78-
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
80+
+ interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
7981
"latency", interval);
8082
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
8183
"rpcLockWaitTime" + interval + "s",
82-
"rpc lock wait time in " + TIMEUNIT, "ops",
84+
"rpc lock wait time in " + metricsTimeUnit, "ops",
8385
"latency", interval);
8486
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
8587
"rpcProcessingTime" + interval + "s",
86-
"rpc processing time in " + TIMEUNIT, "ops",
88+
"rpc processing time in " + metricsTimeUnit, "ops",
8789
"latency", interval);
8890
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
8991
"deferredRpcProcessingTime" + interval + "s",
90-
"deferred rpc processing time in " + TIMEUNIT, "ops",
92+
"deferred rpc processing time in " + metricsTimeUnit, "ops",
9193
"latency", interval);
9294
}
9395
}
@@ -97,10 +99,27 @@ public class RpcMetrics {
9799
public String name() { return name; }
98100

99101
public static RpcMetrics create(Server server, Configuration conf) {
102+
if (server instanceof RPC.Server) {
103+
setMetricTimeUnit(conf);
104+
}
100105
RpcMetrics m = new RpcMetrics(server, conf);
101106
return DefaultMetricsSystem.instance().register(m.name, null, m);
102107
}
103108

109+
private static void setMetricTimeUnit(Configuration conf) {
110+
String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
111+
if (StringUtils.isNotEmpty(timeunit)) {
112+
try {
113+
metricsTimeUnit = TimeUnit.valueOf(timeunit);
114+
} catch (IllegalArgumentException e) {
115+
LOG.info("Config key {} 's value {} does not correspond to enum values"
116+
+ " of java.util.concurrent.TimeUnit. Hence default unit"
117+
+ " MILLISECONDS will be used",
118+
CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit);
119+
}
120+
}
121+
}
122+
104123
@Metric("Number of received bytes") MutableCounterLong receivedBytes;
105124
@Metric("Number of sent bytes") MutableCounterLong sentBytes;
106125
@Metric("Queue time") MutableRate rpcQueueTime;
@@ -141,6 +160,10 @@ public String numOpenConnectionsPerUser() {
141160
return server.getNumDroppedConnections();
142161
}
143162

163+
/**
 * Returns the time unit in which RPC metric durations are stored and
 * accessed.
 *
 * @return the current value of the static {@code metricsTimeUnit} field
 */
public static TimeUnit getMetricsTimeUnit() {
164+
return metricsTimeUnit;
165+
}
166+
144167
// Public instrumentation methods that could be extracted to an
145168
// abstract class if we decide to do custom instrumentation classes a la
146169
// JobTrackerInstrumentation. The methods with //@Override comment are

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3312,6 +3312,21 @@
33123312
</description>
33133313
</property>
33143314

3315+
<property>
3316+
<name>rpc.metrics.timeunit</name>
3317+
<value>MILLISECONDS</value>
3318+
<description>
3319+
This property is used to configure timeunit for various RPC Metrics
3320+
e.g. rpcQueueTime, rpcLockWaitTime, rpcProcessingTime,
3321+
deferredRpcProcessingTime. In the absence of this property,
3322+
default timeunit used is milliseconds.
3323+
The value of this property must exactly match the name of one constant of the enum
3324+
java.util.concurrent.TimeUnit.
3325+
Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
3326+
SECONDS etc.
3327+
</description>
3328+
</property>
3329+
33153330
<property>
33163331
<name>rpc.metrics.percentiles.intervals</name>
33173332
<value></value>

hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ rpc
6565
---
6666

6767
Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
68+
The `rpc.metrics.timeunit` configuration property sets the time unit used for RPC metrics.
69+
The default time unit for RPC metrics is milliseconds, which is what the descriptions below assume.
6870

6971
| Name | Description |
7072
|:---- |:---- |

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,8 +1098,8 @@ public TestRpcService run() {
10981098
proxy.lockAndSleep(null, newSleepRequest(5));
10991099
rpcMetrics = getMetrics(server.getRpcMetrics().name());
11001100
assertGauge("RpcLockWaitTimeAvgTime",
1101-
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
1102-
rpcMetrics);
1101+
(double)(RpcMetrics.getMetricsTimeUnit().convert(10L,
1102+
TimeUnit.SECONDS)), rpcMetrics);
11031103
} finally {
11041104
if (proxy2 != null) {
11051105
RPC.stopProxy(proxy2);
@@ -1603,6 +1603,70 @@ public void testSetProtocolEngine() {
16031603
assertTrue(rpcEngine instanceof StoppedRpcEngine);
16041604
}
16051605

1606+
/**
 * Verifies RPC metrics when {@code rpc.metrics.timeunit} is set to
 * {@code NANOSECONDS}.
 *
 * <p>The test enables quantile metrics with a single interval of 1, starts a
 * test server, issues 1000 rounds of ping/echo calls from two clients, and
 * then checks the lock-wait, queue-time and processing-time gauges.
 *
 * NOTE(review): RpcMetrics keeps the unit in a static field and this test
 * does not restore it afterwards; later tests in the same JVM may observe
 * NANOSECONDS - confirm whether a reset is needed.
 */
@Test
1607+
public void testRpcMetricsInNanos() throws Exception {
1608+
final Server server;
1609+
TestRpcService proxy = null;
1610+
1611+
final int interval = 1;
1612+
// Enable quantile metrics, use a single percentile interval, and switch
// the RPC metrics time unit to nanoseconds before the server is created.
conf.setBoolean(CommonConfigurationKeys.
1613+
RPC_METRICS_QUANTILE_ENABLE, true);
1614+
conf.set(CommonConfigurationKeys.
1615+
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
1616+
conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");
1617+
1618+
server = setupTestServer(conf, 5);
1619+
String testUser = "testUserInNanos";
1620+
UserGroupInformation anotherUser =
1621+
UserGroupInformation.createRemoteUser(testUser);
1622+
// Second proxy, created while running as a different remote user.
// NOTE(review): the lambda returns null when RPC.getProxy throws
// IOException, and proxy2 is dereferenced in the loop below without a
// null check.
TestRpcService proxy2 =
1623+
anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
1624+
try {
1625+
return RPC.getProxy(TestRpcService.class, 0,
1626+
server.getListenerAddress(), conf);
1627+
} catch (IOException e) {
1628+
LOG.error("Something went wrong.", e);
1629+
}
1630+
return null;
1631+
});
1632+
try {
1633+
proxy = getClient(addr, conf);
1634+
// Generate traffic from both clients so the metrics have samples.
for (int i = 0; i < 1000; i++) {
1635+
proxy.ping(null, newEmptyRequest());
1636+
proxy.echo(null, newEchoRequest("" + i));
1637+
proxy2.echo(null, newEchoRequest("" + i));
1638+
}
1639+
MetricsRecordBuilder rpcMetrics =
1640+
getMetrics(server.getRpcMetrics().name());
1641+
// The ping/echo calls are expected to report no lock wait time.
assertEquals("Expected zero rpc lock wait time",
1642+
0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
1643+
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
1644+
rpcMetrics);
1645+
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
1646+
rpcMetrics);
1647+
1648+
// The expected gauge below is 10 seconds converted to the configured
// metrics unit.
// NOTE(review): presumably lockAndSleep records a 10-second lock wait
// on the server side; that helper is not visible here - confirm.
proxy.lockAndSleep(null, newSleepRequest(5));
1649+
rpcMetrics = getMetrics(server.getRpcMetrics().name());
1650+
assertGauge("RpcLockWaitTimeAvgTime",
1651+
(double)(RpcMetrics.getMetricsTimeUnit().convert(10L,
1652+
TimeUnit.SECONDS)), rpcMetrics);
1653+
LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
1654+
getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
1655+
getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));
1656+
1657+
// Lower bounds on the averages after the lockAndSleep call.
// NOTE(review): the thresholds look chosen to be reachable only when
// values are reported in nanoseconds - confirm against the sleep
// request's units.
assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
1658+
> 4000000D);
1659+
assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
1660+
> 4000D);
1661+
} finally {
1662+
// Release the second proxy (if it was created), then the server and
// the first proxy.
if (proxy2 != null) {
1663+
RPC.stopProxy(proxy2);
1664+
}
1665+
stop(server, proxy);
1666+
}
1667+
}
1668+
1669+
16061670
public static void main(String[] args) throws Exception {
16071671
new TestRPC().testCallsInternal(conf);
16081672
}

0 commit comments

Comments
 (0)