From 73bc5d3ac0ec2627f3704259cc25bb1068295dce Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 2 Jun 2022 20:23:25 -0700 Subject: [PATCH 1/7] YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation. --- .../yarn/server/router/RouterMetrics.java | 109 +++++++++++++++- .../clientrm/FederationClientInterceptor.java | 117 +++++++++++++++++- 2 files changed, 218 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 1399b2f4b228b..b02b3e155fa18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -75,6 +75,12 @@ public final class RouterMetrics { private MutableGaugeInt numListReservationsFailedRetrieved; @Metric("# of getResourceTypeInfo failed to be retrieved") private MutableGaugeInt numGetResourceTypeInfo; + @Metric("# of failApplicationAttempt failed to be retrieved") + private MutableGaugeInt numFailAppAttemptFailedRetrieved; + @Metric("# of updateApplicationPriority failed to be retrieved") + private MutableGaugeInt numUpdateAppPriorityFailedRetrieved; + @Metric("# of updateApplicationPriority failed to be retrieved") + private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -114,6 +120,12 @@ public final class RouterMetrics { private MutableRate totalSucceededListReservationsRetrieved; @Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)") private MutableRate totalSucceededGetResourceTypeInfoRetrieved; + @Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)") + private MutableRate totalSucceededFailAppAttemptRetrieved; + @Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)") + private MutableRate totalSucceededUpdateAppPriorityRetrieved; + @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)") + private MutableRate totalSucceededUpdateAppTimeoutsRetrieved; /** * Provide quantile counters for all latencies. @@ -135,8 +147,11 @@ public final class RouterMetrics { private MutableQuantiles getContainerLatency; private MutableQuantiles listReservationsLatency; private MutableQuantiles listResourceTypeInfoLatency; + private MutableQuantiles failAppAttemptLatency; + private MutableQuantiles updateAppPriorityLatency; + private MutableQuantiles updateAppTimeoutsLatency; - private static volatile RouterMetrics INSTANCE = null; + private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; private RouterMetrics() { @@ -201,25 +216,37 @@ private RouterMetrics() { listResourceTypeInfoLatency = registry.newQuantiles("getResourceTypeInfoLatency", "latency of get resource type info", "ops", "latency", 10); + + failAppAttemptLatency = + registry.newQuantiles("failApplicationAttemptLatency", + "latency of fail application attempt", "ops", "latency", 10); + + updateAppPriorityLatency = + registry.newQuantiles("updateApplicationPriorityLatency", + "latency of update application priority", "ops", "latency", 10); + + updateAppTimeoutsLatency = + registry.newQuantiles("updateApplicationTimeoutsLatency", + "latency of update application timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { if (!isInitialized.get()) { synchronized (RouterMetrics.class) { - if (INSTANCE == null) { - INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics", + if (instance == null) { + instance = DefaultMetricsSystem.instance().register("RouterMetrics", "Metrics for the Yarn Router", new RouterMetrics()); isInitialized.set(true); } } } - return INSTANCE; + return instance; } @VisibleForTesting synchronized static void destroy() { isInitialized.set(false); - INSTANCE = null; + instance = null; } @VisibleForTesting @@ -307,6 +334,21 @@ public long getNumSucceededGetResourceTypeInfoRetrieved() { return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededFailAppAttemptRetrieved() { + return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededUpdateAppPriorityRetrieved() { + return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededUpdateAppTimeoutsRetrieved() { + return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -392,6 +434,21 @@ public double getLatencySucceededGetResourceTypeInfoRetrieved() { return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededFailAppAttemptRetrieved() { + return totalSucceededFailAppAttemptRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededUpdateAppPriorityRetrieved() { + return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededUpdateAppTimeoutsRetrieved() { + return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -477,6 +534,21 @@ public int getGetResourceTypeInfoRetrieved() { return numGetResourceTypeInfo.value(); } + @VisibleForTesting + public int getFailApplicationAttemptFailedRetrieved() { + return numFailAppAttemptFailedRetrieved.value(); + } + + @VisibleForTesting + public int getUpdateApplicationPriorityFailedRetrieved() { + return numUpdateAppPriorityFailedRetrieved.value(); + } + + @VisibleForTesting + public int getUpdateApplicationTimeoutsFailedRetrieved() { + return numUpdateAppTimeoutsFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -562,6 +634,21 @@ public void succeededGetResourceTypeInfoRetrieved(long duration) { listResourceTypeInfoLatency.add(duration); } + public void succeededFailAppAttemptRetrieved(long duration) { + totalSucceededFailAppAttemptRetrieved.add(duration); + failAppAttemptLatency.add(duration); + } + + public void succeededUpdateAppPriorityRetrieved(long duration) { + totalSucceededUpdateAppPriorityRetrieved.add(duration); + updateAppPriorityLatency.add(duration); + } + + public void succeededUpdateAppTimeoutsRetrieved(long duration) { + totalSucceededUpdateAppTimeoutsRetrieved.add(duration); + updateAppTimeoutsLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -629,4 +716,16 @@ public void incrListReservationsFailedRetrieved() { public void incrResourceTypeInfoFailedRetrieved() { numGetResourceTypeInfo.incr(); } + + public void incrFailAppAttemptFailedRetrieved() { + numFailAppAttemptFailedRetrieved.incr(); + } + + public void incrUpdateAppPriorityFailedRetrieved() { + numUpdateAppPriorityFailedRetrieved.incr(); + } + + public void incrUpdateApplicationTimeoutsRetrieved() { + numUpdateAppTimeoutsFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index f92e3566ca2be..55d1ff92e733c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1213,14 +1213,91 @@ public CancelDelegationTokenResponse cancelDelegationToken( @Override public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationAttemptId() == null + || request.getApplicationAttemptId().getApplicationId() == null) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing failApplicationAttempt request or applicationId " + + "or applicationAttemptId information.", null); + } + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId(); + + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("ApplicationAttempt " + + request.getApplicationAttemptId() + " belongs to Application " + + request.getApplicationAttemptId().getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + FailApplicationAttemptResponse response; + try { + response = clientRMProxy.failApplicationAttempt(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + LOG.error("Unable to get the applicationAttempt report for {} " + + "to SubCluster {}, error = {}.", + request.getApplicationAttemptId(), subClusterId.getId(), e); + throw e; + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the report of " + + "the applicationAttempt {} to SubCluster {}.", + request.getApplicationAttemptId(), subClusterId.getId()); + } + + long stopTime = clock.getTime(); + routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime); + return response; } @Override public UpdateApplicationPriorityResponse updateApplicationPriority( UpdateApplicationPriorityRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationId() == null + || request.getApplicationPriority() == null) { + routerMetrics.incrUpdateAppPriorityFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing updateApplicationPriority request or applicationId " + + "or applicationPriority information.", null); + } + + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationId(); + + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrUpdateAppPriorityFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + + request.getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + + UpdateApplicationPriorityResponse response; + try { + response = clientRMProxy.updateApplicationPriority(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + LOG.error("Unable to update application priority for {} " + + "to SubCluster {}, error = {}.", + request.getApplicationId(), subClusterId.getId(), e); + throw e; + } + + long stopTime = clock.getTime(); + routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime); + return response; } @Override @@ -1233,7 +1310,41 @@ public SignalContainerResponse signalToContainer( public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( UpdateApplicationTimeoutsRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getApplicationId() == null + || request.getApplicationTimeouts() == null) { + routerMetrics.incrUpdateApplicationTimeoutsRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing updateApplicationTimeouts request or applicationId " + + "or applicationTimeouts information.", null); + } + + long startTime = clock.getTime(); + SubClusterId subClusterId = null; + ApplicationId applicationId = request.getApplicationId(); + try { + subClusterId = getApplicationHomeSubCluster(applicationId); + } catch (YarnException e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + RouterServerUtil.logAndThrowException("Application " + + request.getApplicationId() + + " does not exist in FederationStateStore.", e); + } + + ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); + UpdateApplicationTimeoutsResponse response; + try { + response = clientRMProxy.updateApplicationTimeouts(request); + } catch (Exception e) { + routerMetrics.incrFailAppAttemptFailedRetrieved(); + LOG.error("Unable to update application priority for {} " + + "to SubCluster {}, error = {}.", + request.getApplicationId(), subClusterId.getId(), e); + throw e; + } + + long stopTime = clock.getTime(); + routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime); + return response; } @Override From 25fa852d37d435cf3ad99ac86664f2b0c91e2705 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 3 Jun 2022 02:22:00 -0700 Subject: [PATCH 2/7] YARN-11159. Add Metrics Junit Test. --- .../yarn/server/router/TestRouterMetrics.java | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index a4df82f9dcbfc..4b1049e8b647e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -398,6 +398,21 @@ public void getResourceTypeInfo() { LOG.info("Mocked: failed getResourceTypeInfo call"); metrics.incrResourceTypeInfoFailedRetrieved(); } + + public void getFailApplicationAttempt() { + LOG.info("Mocked: failed failApplicationAttempt call"); + metrics.incrFailAppAttemptFailedRetrieved(); + } + + public void getUpdateApplicationPriority() { + LOG.info("Mocked: failed updateApplicationPriority call"); + metrics.incrUpdateAppPriorityFailedRetrieved(); + } + + public void getUpdateApplicationTimeouts() { + LOG.info("Mocked: failed updateApplicationTimeouts call"); + metrics.incrUpdateApplicationTimeoutsRetrieved(); + } } // Records successes for all calls @@ -493,6 +508,21 @@ public void getResourceTypeInfo(long duration) { LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration); metrics.succeededGetResourceTypeInfoRetrieved(duration); } + + public void getFailApplicationAttempt(long duration) { + LOG.info("Mocked: successful failApplicationAttempt call with duration {}", duration); + metrics.succeededFailAppAttemptRetrieved(duration); + } + + public void getUpdateApplicationPriority(long duration) { + LOG.info("Mocked: successful updateApplicationPriority call with duration {}", duration); + metrics.succeededUpdateAppPriorityRetrieved(duration); + } + + public void getUpdateApplicationTimeouts(long duration) { + LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration); + metrics.succeededUpdateAppTimeoutsRetrieved(duration); + } } @Test @@ -708,4 +738,72 @@ public void testGetResourceTypeInfoFailed() { Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved()); } + @Test + public void testSucceededFailApplicationAttempt() { + long totalGoodBefore = metrics.getNumSucceededFailAppAttemptRetrieved(); + goodSubCluster.getFailApplicationAttempt(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededFailAppAttemptRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getFailApplicationAttempt(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededFailAppAttemptRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testFailApplicationAttemptFailed() { + long totalBadBefore = metrics.getFailApplicationAttemptFailedRetrieved(); + badSubCluster.getFailApplicationAttempt(); + Assert.assertEquals(totalBadBefore + 1, metrics.getFailApplicationAttemptFailedRetrieved()); + } + + @Test + public void testSucceededUpdateApplicationPriority() { + long totalGoodBefore = metrics.getNumSucceededUpdateAppPriorityRetrieved(); + goodSubCluster.getUpdateApplicationPriority(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateAppPriorityRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateApplicationPriority(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateAppPriorityRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testUpdateApplicationPriorityFailed() { + long totalBadBefore = metrics.getUpdateApplicationPriorityFailedRetrieved(); + badSubCluster.getUpdateApplicationPriority(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getUpdateApplicationPriorityFailedRetrieved()); + } + + @Test + public void testSucceededUpdateAppTimeoutsRetrieved() { + long totalGoodBefore = metrics.getNumSucceededUpdateAppTimeoutsRetrieved(); + goodSubCluster.getUpdateApplicationTimeouts(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateAppTimeoutsRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateApplicationTimeouts(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateAppTimeoutsRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testUpdateAppTimeoutsFailed() { + long totalBadBefore = metrics.getUpdateApplicationTimeoutsFailedRetrieved(); + badSubCluster.getUpdateApplicationTimeouts(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getUpdateApplicationTimeoutsFailedRetrieved()); + } + } From 3e755e3b8a3a521ed86181abf353de3974ca1305 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 3 Jun 2022 04:11:40 -0700 Subject: [PATCH 3/7] YARN-11159. Add FederationClientInterceptor Junit Test. --- .../clientrm/FederationClientInterceptor.java | 20 +- .../TestFederationClientInterceptor.java | 174 ++++++++++++++++-- .../src/test/resources/yarn-site.xml | 4 + 3 files changed, 181 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 55d1ff92e733c..7946284769329 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1241,7 +1241,7 @@ public FailApplicationAttemptResponse failApplicationAttempt( } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); LOG.error("Unable to get the applicationAttempt report for {} " + - "to SubCluster {}, error = {}.", + "to SubCluster {}.", request.getApplicationAttemptId(), subClusterId.getId(), e); throw e; } @@ -1290,11 +1290,17 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); LOG.error("Unable to update application priority for {} " + - "to SubCluster {}, error = {}.", + "to SubCluster {}.", request.getApplicationId(), subClusterId.getId(), e); throw e; } + if (response == null) { + LOG.error("No response when update application priority of " + + "the applicationId {} to SubCluster {}.", + applicationId, subClusterId.getId()); + } + long stopTime = clock.getTime(); routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime); return response; @@ -1336,12 +1342,18 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( response = clientRMProxy.updateApplicationTimeouts(request); } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); - LOG.error("Unable to update application priority for {} " + - "to SubCluster {}, error = {}.", + LOG.error("Unable to update application timeout for {} " + + "to SubCluster {}.", request.getApplicationId(), subClusterId.getId(), e); throw e; } + if (response == null) { + LOG.error("No response when update application timeout of " + + "the applicationId {} to SubCluster {}.", + applicationId, subClusterId.getId()); + } + long stopTime = clock.getTime(); routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime); return response; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 8fa52e8f92bb3..c71dba8f09cec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.Set; import java.util.stream.Collectors; @@ -65,6 +66,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -75,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -83,6 +91,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -109,6 +118,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { private final static int NUM_SUBCLUSTER = 4; + private final static int APP_PRIORITY_ZERO = 10; + @Override public void setUp() { super.setUpConfig(); @@ -197,7 +208,7 @@ public void testSubmitApplication() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -208,11 +219,11 @@ public void testSubmitApplication() } private SubmitApplicationRequest mockSubmitApplicationRequest( - ApplicationId appId) { + ApplicationId appId, int priority) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext .newInstance(appId, MockApps.newAppName(), "default", - Priority.newInstance(0), amContainerSpec, false, false, -1, + Priority.newInstance(priority), amContainerSpec, false, false, -1, Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), "MockApp"); @@ -234,7 +245,7 @@ public void testSubmitApplicationMultipleSubmission() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // First attempt SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -302,7 +313,7 @@ public void testForceKillApplication() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application we are going to kill later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -377,7 +388,7 @@ public void testGetApplicationReport() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application we want the report later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -457,7 +468,7 @@ public void testGetApplicationAttemptReport() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application we want the applicationAttempt report later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -586,7 +597,7 @@ public void testGetApplicationsResponse() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -628,7 +639,7 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -659,7 +670,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -782,7 +793,7 @@ public void testGetContainersRequest() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -825,7 +836,7 @@ public void testGetContainerReportRequest() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -870,7 +881,7 @@ public void getApplicationAttempts() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -898,4 +909,141 @@ public void testGetResourceTypeInfoRequest() throws Exception { interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance()); Assert.assertEquals(2, response.getResourceTypeInfo().size()); } + + @Test + public void testFailApplicationAttempt() throws Exception { + LOG.info("Test FederationClientInterceptor : Fail Application Attempt request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " + + "or applicationId or applicationAttemptId information." , + () -> interceptor.failApplicationAttempt(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + Assert.assertNotNull(attemptsResponse); + + ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). + get(0).getApplicationAttemptId(); + + FailApplicationAttemptRequest requestFailAppAttempt = + FailApplicationAttemptRequest.newInstance(attemptId); + FailApplicationAttemptResponse responseFailAppAttempt = + interceptor.failApplicationAttempt(requestFailAppAttempt); + + Assert.assertNotNull(responseFailAppAttempt); + } + + @Test + public void testUpdateApplicationPriority() throws Exception { + LOG.info("Test FederationClientInterceptor : Update Application Priority request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " + + "or applicationId or applicationPriority information." , + () -> interceptor.updateApplicationPriority(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + Assert.assertNotNull(attemptsResponse); + + Priority priority = Priority.newInstance(20); + UpdateApplicationPriorityRequest requestUpdateAppPriority = + UpdateApplicationPriorityRequest.newInstance(appId, priority); + UpdateApplicationPriorityResponse responseAppPriority = + interceptor.updateApplicationPriority(requestUpdateAppPriority); + + Assert.assertNotNull(responseAppPriority); + Assert.assertEquals(20, + responseAppPriority.getApplicationPriority().getPriority()); + } + + @Test + public void testUpdateApplicationTimeouts() throws Exception { + LOG.info("Test FederationClientInterceptor : Update Application Timeouts request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " + + "or applicationId or applicationTimeouts information.", + () -> interceptor.updateApplicationTimeouts(null)); + + // normal request + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + + // Submit the application + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + // Call GetApplicationAttempts + GetApplicationAttemptsRequest attemptsRequest = + GetApplicationAttemptsRequest.newInstance(appId); + GetApplicationAttemptsResponse attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + + // Wait for app to start + while(attemptsResponse.getApplicationAttemptList().size() == 0) { + attemptsResponse = + interceptor.getApplicationAttempts(attemptsRequest); + } + Assert.assertNotNull(attemptsResponse); + + String formatISO8601 = + Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); + Map applicationTimeouts = new HashMap<>(); + applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, formatISO8601); + + UpdateApplicationTimeoutsRequest timeoutsRequest = + UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts); + UpdateApplicationTimeoutsResponse timeoutsResponse = + interceptor.updateApplicationTimeouts(timeoutsRequest); + + String responseTimeOut = + timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); + Assert.assertNotNull(timeoutsResponse); + Assert.assertEquals(formatISO8601, responseTimeOut); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml index f3e0de3604b60..310a1612486bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/resources/yarn-site.xml @@ -27,4 +27,8 @@ yarn.resourcemanager.webapp.address 0.0.0.0:8080 + + yarn.cluster.max-application-priority + 50 + From c1be4c1f3eec5a5b3b820244cb24a22be17547b7 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 4 Jun 2022 19:33:15 -0700 Subject: [PATCH 4/7] YARN-11159. Fix CheckStyle And Replace logAndThrowException. --- .../clientrm/FederationClientInterceptor.java | 19 ++++++------------- .../TestFederationClientInterceptor.java | 4 ++-- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 7946284769329..f4feaa3ea99fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1240,10 +1240,8 @@ public FailApplicationAttemptResponse failApplicationAttempt( response = clientRMProxy.failApplicationAttempt(request); } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); - LOG.error("Unable to get the applicationAttempt report for {} " + - "to SubCluster {}.", - request.getApplicationAttemptId(), subClusterId.getId(), e); - throw e; + RouterServerUtil.logAndThrowException("Unable to get the applicationAttempt report for " + + request.getApplicationAttemptId() + " to SubCluster " + subClusterId.getId(), e); } if (response == null) { @@ -1283,16 +1281,13 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - UpdateApplicationPriorityResponse response; try { response = clientRMProxy.updateApplicationPriority(request); } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); - LOG.error("Unable to update application priority for {} " + - "to SubCluster {}.", - request.getApplicationId(), subClusterId.getId(), e); - throw e; + RouterServerUtil.logAndThrowException("Unable to update application priority for " + + request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e); } if (response == null) { @@ -1342,10 +1337,8 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( response = clientRMProxy.updateApplicationTimeouts(request); } catch (Exception e) { routerMetrics.incrFailAppAttemptFailedRetrieved(); - LOG.error("Unable to update application timeout for {} " + - "to SubCluster {}.", - request.getApplicationId(), subClusterId.getId(), e); - throw e; + RouterServerUtil.logAndThrowException("Unable to update application timeout for " + + request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e); } if (response == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index c71dba8f09cec..51f1dd826318e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -916,7 +916,7 @@ public void testFailApplicationAttempt() throws Exception { // null request LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " + - "or applicationId or applicationAttemptId information." , + "or applicationId or applicationAttemptId information.", () -> interceptor.failApplicationAttempt(null)); // normal request @@ -960,7 +960,7 @@ public void testUpdateApplicationPriority() throws Exception { // null request LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " + - "or applicationId or applicationPriority information." , + "or applicationId or applicationPriority information.", () -> interceptor.updateApplicationPriority(null)); // normal request From 561cca02792094bbab9a4a694752c498ef4de01e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 4 Jun 2022 21:41:39 -0700 Subject: [PATCH 5/7] YARN-11159. Fix CheckStyle And Replace logAndThrowException. --- .../server/router/clientrm/FederationClientInterceptor.java | 6 +++--- .../router/clientrm/TestFederationClientInterceptor.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index f4feaa3ea99fd..fec62d4b0804f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -1235,7 +1235,7 @@ public FailApplicationAttemptResponse failApplicationAttempt( } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - FailApplicationAttemptResponse response; + FailApplicationAttemptResponse response = null; try { response = clientRMProxy.failApplicationAttempt(request); } catch (Exception e) { @@ -1281,7 +1281,7 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - UpdateApplicationPriorityResponse response; + UpdateApplicationPriorityResponse response = null; try { response = clientRMProxy.updateApplicationPriority(request); } catch (Exception e) { @@ -1332,7 +1332,7 @@ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( } ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); - UpdateApplicationTimeoutsResponse response; + UpdateApplicationTimeoutsResponse response = null; try { response = clientRMProxy.updateApplicationTimeouts(request); } catch (Exception e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 51f1dd826318e..6f88f5571e24e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -118,7 +118,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { private final static int NUM_SUBCLUSTER = 4; - private final static int APP_PRIORITY_ZERO = 10; + private final static int APP_PRIORITY_ZERO = 0; @Override public void setUp() { From b9b9e23065a94142f25fe7b9171fd33578218902 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Wed, 8 Jun 2022 07:54:04 -0700 Subject: [PATCH 6/7] YARN-11159. fix code and checkstyle. --- .../TestFederationClientInterceptor.java | 93 +++++++++++-------- .../TestableFederationClientInterceptor.java | 3 + 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 6f88f5571e24e..9d6e509d1df8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; @@ -90,7 +91,11 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -208,7 +213,7 @@ public void testSubmitApplication() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -219,11 +224,11 @@ public void testSubmitApplication() } private SubmitApplicationRequest mockSubmitApplicationRequest( - ApplicationId appId, int priority) { + ApplicationId appId) { ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ApplicationSubmissionContext context = ApplicationSubmissionContext .newInstance(appId, MockApps.newAppName(), "default", - Priority.newInstance(priority), amContainerSpec, false, false, -1, + Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1, Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), "MockApp"); @@ -245,7 +250,7 @@ public void testSubmitApplicationMultipleSubmission() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // First attempt SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -313,7 +318,7 @@ public void testForceKillApplication() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application we are going to kill later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -388,7 +393,7 @@ public void testGetApplicationReport() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application we want the report later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -468,7 +473,7 @@ public void testGetApplicationAttemptReport() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application we want the applicationAttempt report later SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -597,7 +602,7 @@ public void testGetApplicationsResponse() ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -639,7 +644,7 @@ public void testGetApplicationsApplicationTypeNotExists() throws Exception{ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -670,7 +675,7 @@ public void testGetApplicationsApplicationStateNotExists() throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); SubmitApplicationResponse response = interceptor.submitApplication(request); Assert.assertNotNull(response); @@ -793,7 +798,7 @@ public void testGetContainersRequest() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -836,7 +841,7 @@ public void testGetContainerReportRequest() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -881,7 +886,7 @@ public void getApplicationAttempts() throws Exception { // normal request ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -920,9 +925,8 @@ public void testFailApplicationAttempt() throws Exception { () -> interceptor.failApplicationAttempt(null)); // normal request - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -930,17 +934,20 @@ public void testFailApplicationAttempt() throws Exception { Assert.assertNotNull(response); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + // Call GetApplicationAttempts GetApplicationAttemptsRequest attemptsRequest = GetApplicationAttemptsRequest.newInstance(appId); GetApplicationAttemptsResponse attemptsResponse = interceptor.getApplicationAttempts(attemptsRequest); - - // Wait for app to start - while(attemptsResponse.getApplicationAttemptList().size() == 0) { - attemptsResponse = - interceptor.getApplicationAttempts(attemptsRequest); - } Assert.assertNotNull(attemptsResponse); ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList(). @@ -964,9 +971,8 @@ public void testUpdateApplicationPriority() throws Exception { () -> interceptor.updateApplicationPriority(null)); // normal request - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -974,17 +980,20 @@ public void testUpdateApplicationPriority() throws Exception { Assert.assertNotNull(response); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + // Call GetApplicationAttempts GetApplicationAttemptsRequest attemptsRequest = GetApplicationAttemptsRequest.newInstance(appId); GetApplicationAttemptsResponse attemptsResponse = interceptor.getApplicationAttempts(attemptsRequest); - - // Wait for app to start - while(attemptsResponse.getApplicationAttemptList().size() == 0) { - attemptsResponse = - interceptor.getApplicationAttempts(attemptsRequest); - } Assert.assertNotNull(attemptsResponse); Priority priority = Priority.newInstance(20); @@ -1008,9 +1017,8 @@ public void testUpdateApplicationTimeouts() throws Exception { () -> interceptor.updateApplicationTimeouts(null)); // normal request - ApplicationId appId = - ApplicationId.newInstance(System.currentTimeMillis(), 1); - SubmitApplicationRequest request = mockSubmitApplicationRequest(appId, APP_PRIORITY_ZERO); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); // Submit the application SubmitApplicationResponse response = interceptor.submitApplication(request); @@ -1018,21 +1026,24 @@ public void testUpdateApplicationTimeouts() throws Exception { Assert.assertNotNull(response); Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId); + Assert.assertNotNull(subClusterId); + + MockRM mockRM = interceptor.getMockRMs().get(subClusterId); + mockRM.waitForState(appId, RMAppState.ACCEPTED); + RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId); + mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.SCHEDULED); + // Call GetApplicationAttempts GetApplicationAttemptsRequest attemptsRequest = GetApplicationAttemptsRequest.newInstance(appId); GetApplicationAttemptsResponse attemptsResponse = interceptor.getApplicationAttempts(attemptsRequest); - - // Wait for app to start - while(attemptsResponse.getApplicationAttemptList().size() == 0) { - attemptsResponse = - interceptor.getApplicationAttempts(attemptsRequest); - } Assert.assertNotNull(attemptsResponse); String formatISO8601 = - Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); + Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); Map applicationTimeouts = new HashMap<>(); applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, formatISO8601); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index c97d05324259b..202a286696a21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -115,4 +115,7 @@ protected void registerBadSubCluster(SubClusterId badSC) throws IOException { } } + public ConcurrentHashMap getMockRMs() { + return mockRMs; + } } From 820cdf33f43c0eb488e87dad3aaedff7638be13e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Wed, 8 Jun 2022 15:29:03 -0700 Subject: [PATCH 7/7] YARN-11159. fix var name. --- .../router/clientrm/TestFederationClientInterceptor.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 9d6e509d1df8c..9ead9fbe721ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -30,7 +30,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; @@ -1042,10 +1041,10 @@ public void testUpdateApplicationTimeouts() throws Exception { interceptor.getApplicationAttempts(attemptsRequest); Assert.assertNotNull(attemptsResponse); - String formatISO8601 = + String appTimeout = Times.formatISO8601(System.currentTimeMillis() + 5 * 1000); Map applicationTimeouts = new HashMap<>(); - applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, formatISO8601); + applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout); UpdateApplicationTimeoutsRequest timeoutsRequest = UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts); @@ -1055,6 +1054,6 @@ public void testUpdateApplicationTimeouts() throws Exception { String responseTimeOut = timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME); Assert.assertNotNull(timeoutsResponse); - Assert.assertEquals(formatISO8601, responseTimeOut); + Assert.assertEquals(appTimeout, responseTimeOut); } }