-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11122. Support getClusterNodes API in FederationClientInterceptor #4274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
7bd9675
2af73f5
87277d1
6630ce6
07f6090
1bf7332
b4867c0
aa49c3a
c6e404b
c7ccca5
407c039
7845199
f16651f
dde385a
72359e0
0887df5
dbd5cb3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,6 +55,8 @@ public final class RouterMetrics { | |
| private MutableGaugeInt numAppAttemptsFailedRetrieved; | ||
| @Metric("# of getClusterMetrics failed to be retrieved") | ||
| private MutableGaugeInt numGetClusterMetricsFailedRetrieved; | ||
| @Metric("# of getClusterNodes failed to be retrieved") | ||
| private MutableGaugeInt numGetClusterNodesFailedRetrieved; | ||
|
|
||
| // Aggregate metrics are shared, and don't have to be looked up per call | ||
| @Metric("Total number of successful Submitted apps and latency(ms)") | ||
|
|
@@ -74,7 +76,9 @@ public final class RouterMetrics { | |
| @Metric("Total number of successful Retrieved getClusterMetrics and " | ||
| + "latency(ms)") | ||
| private MutableRate totalSucceededGetClusterMetricsRetrieved; | ||
|
|
||
| @Metric("Total number of successful Retrieved getClusterNodes and " | ||
| + "latency(ms)") | ||
| private MutableRate totalSucceededGetClusterNodesRetrieved; | ||
|
|
||
| /** | ||
| * Provide quantile counters for all latencies. | ||
|
|
@@ -86,6 +90,7 @@ public final class RouterMetrics { | |
| private MutableQuantiles getApplicationsReportLatency; | ||
| private MutableQuantiles getApplicationAttemptReportLatency; | ||
| private MutableQuantiles getClusterMetricsLatency; | ||
| private MutableQuantiles getClusterNodesLatency; | ||
|
|
||
| private static volatile RouterMetrics INSTANCE = null; | ||
| private static MetricsRegistry registry; | ||
|
|
@@ -112,6 +117,10 @@ private RouterMetrics() { | |
| getClusterMetricsLatency = | ||
| registry.newQuantiles("getClusterMetricsLatency", | ||
| "latency of get cluster metrics", "ops", "latency", 10); | ||
|
|
||
| getClusterNodesLatency = | ||
| registry.newQuantiles("getClusterNodesLatency", | ||
|
||
| "latency of get cluster nodes", "ops", "latency", 10); | ||
| } | ||
|
|
||
| public static RouterMetrics getMetrics() { | ||
|
|
@@ -168,6 +177,11 @@ public long getNumSucceededGetClusterMetricsRetrieved(){ | |
| return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public long getNumSucceededGetClusterNodesRetrieved(){ | ||
| return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public double getLatencySucceededAppsCreated() { | ||
| return totalSucceededAppsCreated.lastStat().mean(); | ||
|
|
@@ -203,6 +217,11 @@ public double getLatencySucceededGetClusterMetricsRetrieved() { | |
| return totalSucceededGetClusterMetricsRetrieved.lastStat().mean(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public double getLatencySucceededGetClusterNodesRetrieved() { | ||
| return totalSucceededGetClusterNodesRetrieved.lastStat().mean(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public int getAppsFailedCreated() { | ||
| return numAppsFailedCreated.value(); | ||
|
|
@@ -273,6 +292,11 @@ public void succeededGetClusterMetricsRetrieved(long duration) { | |
| getClusterMetricsLatency.add(duration); | ||
| } | ||
|
|
||
| public void succeededGetClusterNodesRetrieved(long duration) { | ||
| totalSucceededGetClusterNodesRetrieved.add(duration); | ||
| getClusterNodesLatency.add(duration); | ||
| } | ||
|
|
||
| public void incrAppsFailedCreated() { | ||
| numAppsFailedCreated.incr(); | ||
| } | ||
|
|
@@ -301,4 +325,7 @@ public void incrGetClusterMetricsFailedRetrieved() { | |
| numGetClusterMetricsFailedRetrieved.incr(); | ||
| } | ||
|
|
||
| public void incrClusterNodesFailedRetrieved() { | ||
| numGetClusterNodesFailedRetrieved.incr(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| package org.apache.hadoop.yarn.server.router.clientrm; | ||
|
|
||
| import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; | ||
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
| import java.io.IOException; | ||
| import java.lang.reflect.Method; | ||
|
|
@@ -790,8 +791,32 @@ <R> Map<SubClusterId, R> invokeConcurrent(Collection<SubClusterId> clusterIds, | |
|
|
||
| @Override | ||
| public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) | ||
| throws YarnException, IOException { | ||
| throw new NotImplementedException("Code is not implemented"); | ||
| throws YarnException, IOException { | ||
|
||
| if (request == null) { | ||
| routerMetrics.incrClusterNodesFailedRetrieved(); | ||
| RouterServerUtil.logAndThrowException( | ||
|
||
| "Missing getClusterNodes request.", null); | ||
| } | ||
| long startTime = clock.getTime(); | ||
| Map<SubClusterId, SubClusterInfo> subclusters = | ||
| federationFacade.getSubClusters(true); | ||
| Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap(); | ||
| for (Map.Entry<SubClusterId, SubClusterInfo> entry : subclusters.entrySet()) { | ||
|
||
| SubClusterId subClusterId = entry.getKey(); | ||
| ApplicationClientProtocol client = null; | ||
| try { | ||
| client = getClientRMProxyForSubCluster(subClusterId); | ||
| GetClusterNodesResponse response = client.getClusterNodes(request); | ||
| clusterNodes.put(subClusterId, response); | ||
| } catch (Exception ex) { | ||
| routerMetrics.incrClusterNodesFailedRetrieved(); | ||
| LOG.error("Unable to get cluster nodes due to exception.", ex); | ||
| } | ||
| } | ||
| long stopTime = clock.getTime(); | ||
| routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime); | ||
| // Merge the NodesResponse | ||
| return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,17 +17,14 @@ | |
| */ | ||
| package org.apache.hadoop.yarn.server.router.clientrm; | ||
|
|
||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.*; | ||
|
||
|
|
||
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationId; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationReport; | ||
| import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; | ||
| import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; | ||
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; | ||
| import org.apache.hadoop.yarn.api.records.*; | ||
| import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; | ||
| import org.apache.hadoop.yarn.util.Records; | ||
| import org.apache.hadoop.yarn.util.resource.Resources; | ||
|
|
||
| /** | ||
|
|
@@ -194,4 +191,24 @@ private static boolean mergeUamToReport(String appName, | |
| return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) || | ||
| appName.startsWith(PARTIAL_REPORT)); | ||
| } | ||
|
|
||
| /** | ||
| * Merges a list of GetClusterNodesResponse. | ||
| * | ||
| * @param responses a list of GetClusterNodesResponse to merge | ||
| * @return the merged GetClusterNodesResponse | ||
| */ | ||
| public static GetClusterNodesResponse mergeClusterNodesResponse( | ||
| Collection<GetClusterNodesResponse> responses) { | ||
| GetClusterNodesResponse clusterNodesResponse = Records.newRecord( | ||
| GetClusterNodesResponse.class); | ||
|
||
| List<NodeReport> nodeReports = new ArrayList<>(); | ||
| for (GetClusterNodesResponse response : responses) { | ||
| if (response != null && response.getNodeReports() != null) { | ||
| nodeReports.addAll(response.getNodeReports()); | ||
| } | ||
| } | ||
| clusterNodesResponse.setNodeReports(nodeReports); | ||
| return clusterNodesResponse; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with the new checkstyle, you could make this one line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should indeed be a single line; I'll fix it.