Skip to content

Commit 6896c35

Browse files
authored
YARN-11122. Support getClusterNodes API in FederationClientInterceptor (apache#4274)
1 parent 6985f9a commit 6896c35

5 files changed

Lines changed: 127 additions & 2 deletions

File tree

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public final class RouterMetrics {
5555
private MutableGaugeInt numAppAttemptsFailedRetrieved;
5656
@Metric("# of getClusterMetrics failed to be retrieved")
5757
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
58+
@Metric("# of getClusterNodes failed to be retrieved")
59+
private MutableGaugeInt numGetClusterNodesFailedRetrieved;
5860

5961
// Aggregate metrics are shared, and don't have to be looked up per call
6062
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -74,7 +76,8 @@ public final class RouterMetrics {
7476
@Metric("Total number of successful Retrieved getClusterMetrics and "
7577
+ "latency(ms)")
7678
private MutableRate totalSucceededGetClusterMetricsRetrieved;
77-
79+
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
80+
private MutableRate totalSucceededGetClusterNodesRetrieved;
7881

7982
/**
8083
* Provide quantile counters for all latencies.
@@ -86,6 +89,7 @@ public final class RouterMetrics {
8689
private MutableQuantiles getApplicationsReportLatency;
8790
private MutableQuantiles getApplicationAttemptReportLatency;
8891
private MutableQuantiles getClusterMetricsLatency;
92+
private MutableQuantiles getClusterNodesLatency;
8993

9094
private static volatile RouterMetrics INSTANCE = null;
9195
private static MetricsRegistry registry;
@@ -112,6 +116,10 @@ private RouterMetrics() {
112116
getClusterMetricsLatency =
113117
registry.newQuantiles("getClusterMetricsLatency",
114118
"latency of get cluster metrics", "ops", "latency", 10);
119+
120+
getClusterNodesLatency =
121+
registry.newQuantiles("getClusterNodesLatency",
122+
"latency of get cluster nodes", "ops", "latency", 10);
115123
}
116124

117125
public static RouterMetrics getMetrics() {
@@ -168,6 +176,11 @@ public long getNumSucceededGetClusterMetricsRetrieved(){
168176
return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
169177
}
170178

179+
@VisibleForTesting
180+
public long getNumSucceededGetClusterNodesRetrieved(){
181+
return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
182+
}
183+
171184
@VisibleForTesting
172185
public double getLatencySucceededAppsCreated() {
173186
return totalSucceededAppsCreated.lastStat().mean();
@@ -203,6 +216,11 @@ public double getLatencySucceededGetClusterMetricsRetrieved() {
203216
return totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
204217
}
205218

219+
@VisibleForTesting
220+
public double getLatencySucceededGetClusterNodesRetrieved() {
221+
return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
222+
}
223+
206224
@VisibleForTesting
207225
public int getAppsFailedCreated() {
208226
return numAppsFailedCreated.value();
@@ -238,6 +256,11 @@ public int getClusterMetricsFailedRetrieved() {
238256
return numGetClusterMetricsFailedRetrieved.value();
239257
}
240258

259+
@VisibleForTesting
260+
public int getClusterNodesFailedRetrieved() {
261+
return numGetClusterNodesFailedRetrieved.value();
262+
}
263+
241264
public void succeededAppsCreated(long duration) {
242265
totalSucceededAppsCreated.add(duration);
243266
getNewApplicationLatency.add(duration);
@@ -273,6 +296,11 @@ public void succeededGetClusterMetricsRetrieved(long duration) {
273296
getClusterMetricsLatency.add(duration);
274297
}
275298

299+
public void succeededGetClusterNodesRetrieved(long duration) {
300+
totalSucceededGetClusterNodesRetrieved.add(duration);
301+
getClusterNodesLatency.add(duration);
302+
}
303+
276304
public void incrAppsFailedCreated() {
277305
numAppsFailedCreated.incr();
278306
}
@@ -301,4 +329,7 @@ public void incrGetClusterMetricsFailedRetrieved() {
301329
numGetClusterMetricsFailedRetrieved.incr();
302330
}
303331

332+
public void incrClusterNodesFailedRetrieved() {
333+
numGetClusterNodesFailedRetrieved.incr();
334+
}
304335
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.yarn.server.router.clientrm;
2020

21+
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
2122
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
2223
import java.io.IOException;
2324
import java.lang.reflect.Method;
@@ -791,7 +792,30 @@ <R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds,
791792
@Override
792793
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
793794
throws YarnException, IOException {
794-
throw new NotImplementedException("Code is not implemented");
795+
if (request == null) {
796+
routerMetrics.incrClusterNodesFailedRetrieved();
797+
RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
798+
}
799+
long startTime = clock.getTime();
800+
Map<SubClusterId, SubClusterInfo> subClusters =
801+
federationFacade.getSubClusters(true);
802+
Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
803+
for (SubClusterId subClusterId : subClusters.keySet()) {
804+
ApplicationClientProtocol client;
805+
try {
806+
client = getClientRMProxyForSubCluster(subClusterId);
807+
GetClusterNodesResponse response = client.getClusterNodes(request);
808+
clusterNodes.put(subClusterId, response);
809+
} catch (Exception ex) {
810+
routerMetrics.incrClusterNodesFailedRetrieved();
811+
LOG.error("Unable to get cluster nodes due to exception.", ex);
812+
throw ex;
813+
}
814+
}
815+
long stopTime = clock.getTime();
816+
routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
817+
// Merge the NodesResponse
818+
return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
795819
}
796820

797821
@Override

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,19 @@
2020
import java.util.Collection;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.List;
24+
import java.util.ArrayList;
2325

2426
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
2527
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
28+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
2629
import org.apache.hadoop.yarn.api.records.ApplicationId;
2730
import org.apache.hadoop.yarn.api.records.ApplicationReport;
2831
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
2932
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
33+
import org.apache.hadoop.yarn.api.records.NodeReport;
3034
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
35+
import org.apache.hadoop.yarn.util.Records;
3136
import org.apache.hadoop.yarn.util.resource.Resources;
3237

3338
/**
@@ -194,4 +199,23 @@ private static boolean mergeUamToReport(String appName,
194199
return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
195200
appName.startsWith(PARTIAL_REPORT));
196201
}
202+
203+
/**
204+
* Merges a list of GetClusterNodesResponse.
205+
*
206+
* @param responses a list of GetClusterNodesResponse to merge.
207+
* @return the merged GetClusterNodesResponse.
208+
*/
209+
public static GetClusterNodesResponse mergeClusterNodesResponse(
210+
Collection<GetClusterNodesResponse> responses) {
211+
GetClusterNodesResponse clusterNodesResponse = Records.newRecord(GetClusterNodesResponse.class);
212+
List<NodeReport> nodeReports = new ArrayList<>();
213+
for (GetClusterNodesResponse response : responses) {
214+
if (response != null && response.getNodeReports() != null) {
215+
nodeReports.addAll(response.getNodeReports());
216+
}
217+
}
218+
clusterNodesResponse.setNodeReports(nodeReports);
219+
return clusterNodesResponse;
220+
}
197221
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public class TestRouterMetrics {
3838

3939
private static RouterMetrics metrics = RouterMetrics.getMetrics();
4040

41+
private static final Double ASSERT_DOUBLE_DELTA = 0.01;
42+
4143
@BeforeClass
4244
public static void init() {
4345

@@ -346,6 +348,11 @@ public void getClusterMetrics() {
346348
LOG.info("Mocked: failed getClusterMetrics call");
347349
metrics.incrGetClusterMetricsFailedRetrieved();
348350
}
351+
352+
public void getClusterNodes() {
353+
LOG.info("Mocked: failed getClusterNodes call");
354+
metrics.incrClusterNodesFailedRetrieved();
355+
}
349356
}
350357

351358
// Records successes for all calls
@@ -392,5 +399,30 @@ public void getClusterMetrics(long duration){
392399
duration);
393400
metrics.succeededGetClusterMetricsRetrieved(duration);
394401
}
402+
403+
public void getClusterNodes(long duration) {
404+
LOG.info("Mocked: successful getClusterNodes call with duration {}", duration);
405+
metrics.succeededGetClusterNodesRetrieved(duration);
406+
}
407+
}
408+
409+
@Test
410+
public void testSucceededGetClusterNodes() {
411+
long totalGoodBefore = metrics.getNumSucceededGetClusterNodesRetrieved();
412+
goodSubCluster.getClusterNodes(150);
413+
Assert.assertEquals(totalGoodBefore + 1, metrics.getNumSucceededGetClusterNodesRetrieved());
414+
Assert.assertEquals(150, metrics.getLatencySucceededGetClusterNodesRetrieved(),
415+
ASSERT_DOUBLE_DELTA);
416+
goodSubCluster.getClusterNodes(300);
417+
Assert.assertEquals(totalGoodBefore + 2, metrics.getNumSucceededGetClusterNodesRetrieved());
418+
Assert.assertEquals(225, metrics.getLatencySucceededGetClusterNodesRetrieved(),
419+
ASSERT_DOUBLE_DELTA);
420+
}
421+
422+
@Test
423+
public void testGetClusterNodesFailed() {
424+
long totalBadBefore = metrics.getClusterNodesFailedRetrieved();
425+
badSubCluster.getClusterNodes();
426+
Assert.assertEquals(totalBadBefore + 1, metrics.getClusterNodesFailedRetrieved());
395427
}
396428
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
4545
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
4646
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
47+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
48+
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
4749
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
4850
import org.apache.hadoop.yarn.api.records.ApplicationId;
4951
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -641,4 +643,16 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception{
641643
Assert.assertNotNull(responseGet);
642644
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
643645
}
646+
647+
@Test
648+
public void testGetClusterNodesRequest() throws Exception {
649+
LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request");
650+
// null request
651+
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
652+
() -> interceptor.getClusterNodes(null));
653+
// normal request.
654+
GetClusterNodesResponse response =
655+
interceptor.getClusterNodes(GetClusterNodesRequest.newInstance());
656+
Assert.assertEquals(subClusters.size(), response.getNodeReports().size());
657+
}
644658
}

0 commit comments

Comments
 (0)