From d701dc627213f2655334d3f572d5e01871d4fd69 Mon Sep 17 00:00:00 2001 From: Susheel Gupta Date: Tue, 29 Apr 2025 00:47:27 +0530 Subject: [PATCH 1/3] YARN-11801: NPE in FifoCandidatesSelector.selectCandidates when preempting resources for an auto-created queue without child queues --- .../ProportionalCapacityPreemptionPolicy.java | 17 +- .../capacity/AbstractParentQueue.java | 9 ++ ...tProportionalCapacityPreemptionPolicy.java | 151 +++++++++++------- 3 files changed, 111 insertions(+), 66 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 443241a664ab9..e36b075c7896f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -22,6 +22,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -40,8 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .ManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -430,12 +429,14 @@ private void cleanupStaledPreemptionCandidates(long currentTime) { } private Set getLeafQueueNames(TempQueuePerPartition q) { - // Also exclude ParentQueues, which might be without children - if (CollectionUtils.isEmpty(q.children) - && !(q.parentQueue instanceof ManagedParentQueue) - && (q.parentQueue == null - || !q.parentQueue.isEligibleForAutoQueueCreation())) { - return ImmutableSet.of(q.queueName); + // Only consider this a leaf queue if: + // It is a concrete leaf queue (not a childless parent) + if (CollectionUtils.isEmpty(q.children)) { + CSQueue queue = scheduler.getQueue(q.queueName); + if (queue instanceof AbstractLeafQueue) { + return ImmutableSet.of(q.queueName); + } + return Collections.emptySet(); } Set leafQueueNames = new HashSet<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java index 50516dd2bc5fa..111d6bfd9ca38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java @@ -552,6 +552,15 @@ public boolean isEligibleForAutoQueueCreation() { return isDynamicQueue() || queueContext.getConfiguration(). isAutoQueueCreationV2Enabled(getQueuePath()); } + /** + * Check whether this queue supports legacy(v1) dynamic child queue creation. + * @return true if queue is eligible to create child queues dynamically using + * the legacy system, false otherwise + */ + public boolean isEligibleForLegacyAutoQueueCreation() { + return isDynamicQueue() || queueContext.getConfiguration(). + isAutoCreateChildQueueEnabled(getQueuePath()); + } @Override public void reinitialize(CSQueue newlyParsedQueue, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index fe89a698cf2e6..276e24b511a34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -229,7 +229,7 @@ public void testProportionalPreemption() { // A will preempt guaranteed-allocated. verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testMaxCap() { int[][] qData = new int[][]{ @@ -249,7 +249,7 @@ public void testMaxCap() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testPreemptCycle() { int[][] qData = new int[][]{ @@ -385,10 +385,10 @@ public void testPerQueueDisablePreemptionHierarchical() { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // verify capacity taken from queueB (appA), not queueE (appC) despite + // verify capacity taken from queueB (appA), not queueE (appC) despite // queueE being far over its absolute capacity because queueA (queueB's // parent) is over capacity and queueD (queueE's parent) is not. - ApplicationAttemptId expectedAttemptOnQueueB = + ApplicationAttemptId expectedAttemptOnQueueB = ApplicationAttemptId.newInstance( appA.getApplicationId(), appA.getAttemptId()); assertTrue("appA should be running on queueB", @@ -401,10 +401,10 @@ public void testPerQueueDisablePreemptionHierarchical() { conf.setPreemptionDisabled("root.queueA.queueB", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); - ApplicationAttemptId expectedAttemptOnQueueC = + ApplicationAttemptId expectedAttemptOnQueueC = ApplicationAttemptId.newInstance( appB.getApplicationId(), appB.getAttemptId()); - ApplicationAttemptId expectedAttemptOnQueueE = + ApplicationAttemptId expectedAttemptOnQueueE = ApplicationAttemptId.newInstance( appC.getApplicationId(), appC.getAttemptId()); // Now, all of queueB's (appA) over capacity is not preemptable, so neither @@ -422,7 +422,7 @@ public void testPerQueueDisablePreemptionHierarchical() { @Test public void testPerQueueDisablePreemptionBroadHierarchical() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I {1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap @@ -473,15 +473,15 @@ public void testPerQueueDisablePreemptionBroadHierarchical() { @Test public void testPerQueueDisablePreemptionInheritParent() { int[][] qData = new int[][] { - // / A E + // / A E // B C D F G H {1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar) {1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap - {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used + {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used { 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved - // appA appB appC appD appE - { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps + // appA appB appC appD appE + { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granulrity { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues }; @@ -532,7 +532,7 @@ public void testPerQueuePreemptionNotAllUntouchable() { @Test public void testPerQueueDisablePreemptionRootDisablesAll() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I {1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap @@ -704,7 +704,7 @@ public void testZeroGuar() { // its absolute guaranteed capacity verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testZeroGuarOverCap() { int[][] qData = new int[][] { @@ -727,11 +727,11 @@ public void testZeroGuarOverCap() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); } - + @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap @@ -788,7 +788,7 @@ public void testContainerOrdering(){ assert containers.get(4).equals(rm5); } - + @Test public void testPolicyInitializeAfterSchedulerInitialized() { @SuppressWarnings("resource") @@ -796,9 +796,9 @@ public void testPolicyInitializeAfterSchedulerInitialized() { rm.init(conf); // ProportionalCapacityPreemptionPolicy should be initialized after - // CapacityScheduler initialized. We will - // 1) find SchedulingMonitor from RMActiveService's service list, - // 2) check if ResourceCalculator in policy is null or not. + // CapacityScheduler initialized. We will + // 1) find SchedulingMonitor from RMActiveService's service list, + // 2) check if ResourceCalculator in policy is null or not. // If it's not null, we can come to a conclusion that policy initialized // after scheduler got initialized // Get SchedulingMonitor from SchedulingMonitorManager instead @@ -812,10 +812,10 @@ public void testPolicyInitializeAfterSchedulerInitialized() { assertNotNull(policy.getResourceCalculator()); return; } - + fail("Failed to find SchedulingMonitor service, please check what happened"); } - + @Test public void testSkipAMContainer() { int[][] qData = new int[][] { @@ -832,7 +832,7 @@ public void testSkipAMContainer() { setAMContainer = true; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // By skipping AM Container, all other 24 containers of appD will be // preempted verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD))); @@ -863,13 +863,13 @@ public void testPreemptSkippedAMContainers() { setAMContainer = true; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // All 5 containers of appD will be preempted including AM container. verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); // All 5 containers of appC will be preempted including AM container. verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); - + // By skipping AM Container, all other 4 containers of appB will be // preempted verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); @@ -879,7 +879,7 @@ public void testPreemptSkippedAMContainers() { verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); setAMContainer = false; } - + @Test public void testAMResourcePercentForSkippedAMContainers() { int[][] qData = new int[][] { @@ -897,7 +897,7 @@ public void testAMResourcePercentForSkippedAMContainers() { setAMResourcePercent = 0.5f; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb. // Total used AM container size is 20GB, hence 2 AM container has // to be preempted as Queue Capacity is 10Gb. @@ -906,7 +906,7 @@ public void testAMResourcePercentForSkippedAMContainers() { // Including AM Container, all other 4 containers of appC will be // preempted verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); - + // By skipping AM Container, all other 4 containers of appB will be // preempted verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); @@ -1073,44 +1073,75 @@ public void testRefreshPreemptionProperties() throws Exception { } @Test - public void testLeafQueueNameExtraction() throws Exception { - ProportionalCapacityPreemptionPolicy policy = - buildPolicy(Q_DATA_FOR_IGNORE); + public void testLeafQueueNameExtractionWithFlexibleAQC() throws Exception { + ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE); ParentQueue root = (ParentQueue) mCS.getRootQueue(); + root.addDynamicParentQueue("childlessFlexible"); + ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicParent", true); + extendRootQueueWithMock(root, dynamicParent); + + policy.editSchedule(); + assertFalse( + "root.dynamicLegacyParent" + " should not be a LeafQueue candidate", + policy.getLeafQueueNames().contains( "root.dynamicParent")); + } + + @Test + public void testLeafQueueNameExtractionWithLegacyAQC() throws Exception { + ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE); + ParentQueue root = (ParentQueue) mCS.getRootQueue(); + + root.addDynamicParentQueue("childlessLegacy"); + ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicLegacyParent", false); + extendRootQueueWithMock(root, dynamicParent); + + policy.editSchedule(); + assertFalse("root.dynamicLegacyParent" + " should not be a LeafQueue candidate", + policy.getLeafQueueNames().contains( "root.dynamicLegacyParent")); + } + + private ParentQueue setupDynamicParentQueue(String queuePath, boolean isFlexible) { + ParentQueue dynamicParent = mockParentQueue(null, 0, new LinkedList<>()); + mockQueueFields(dynamicParent, queuePath); + + if (isFlexible) { + when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true); + } else { + when(dynamicParent.isEligibleForLegacyAutoQueueCreation()).thenReturn(true); + } + + return dynamicParent; + } + + private void extendRootQueueWithMock(ParentQueue root, ParentQueue mockQueue) { List queues = root.getChildQueues(); ArrayList extendedQueues = new ArrayList<>(); - LinkedList pqs = new LinkedList<>(); - ParentQueue dynamicParent = mockParentQueue( - null, 0, pqs); - when(dynamicParent.getQueuePath()).thenReturn("root.dynamicParent"); - when(dynamicParent.getQueueCapacities()).thenReturn( - new QueueCapacities(false)); - QueueResourceQuotas dynamicParentQr = new QueueResourceQuotas(); - dynamicParentQr.setEffectiveMaxResource(Resource.newInstance(1, 1)); - dynamicParentQr.setEffectiveMinResource(Resources.createResource(1)); - dynamicParentQr.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL, - Resource.newInstance(1, 1)); - dynamicParentQr.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, - Resources.createResource(1)); - when(dynamicParent.getQueueResourceQuotas()).thenReturn(dynamicParentQr); - when(dynamicParent.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL)) - .thenReturn(Resources.createResource(1)); - when(dynamicParent.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL)) - .thenReturn(Resource.newInstance(1, 1)); - ResourceUsage resUsage = new ResourceUsage(); - resUsage.setUsed(Resources.createResource(1024)); - resUsage.setReserved(Resources.createResource(1024)); - when(dynamicParent.getQueueResourceUsage()).thenReturn(resUsage); - when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true); - extendedQueues.add(dynamicParent); + extendedQueues.add(mockQueue); extendedQueues.addAll(queues); when(root.getChildQueues()).thenReturn(extendedQueues); + } - policy.editSchedule(); + private void mockQueueFields(ParentQueue queue, String queuePath) { + when(queue.getQueuePath()).thenReturn(queuePath); + when(queue.getQueueCapacities()).thenReturn(new QueueCapacities(false)); + + QueueResourceQuotas qrq = new QueueResourceQuotas(); + qrq.setEffectiveMaxResource(Resource.newInstance(1, 1)); + qrq.setEffectiveMinResource(Resources.createResource(1)); + qrq.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL, Resource.newInstance(1, 1)); + qrq.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, Resources.createResource(1)); - assertFalse("dynamicParent should not be a LeafQueue " + - "candidate", policy.getLeafQueueNames().contains("root.dynamicParent")); + when(queue.getQueueResourceQuotas()).thenReturn(qrq); + when(queue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL)) + .thenReturn(Resources.createResource(1)); + when(queue.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL)) + .thenReturn(Resource.newInstance(1, 1)); + + ResourceUsage usage = new ResourceUsage(); + usage.setUsed(Resources.createResource(1024)); + usage.setReserved(Resources.createResource(1024)); + when(queue.getQueueResourceUsage()).thenReturn(usage); } static class IsPreemptionRequestFor @@ -1359,8 +1390,12 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran) { LeafQueue lq = mock(LeafQueue.class); + + String queuePath = p.getQueuePath() + ".queue" + (char)('A' + i - 1); + when(mCS.getQueue(queuePath)).thenReturn(lq); + ResourceCalculator rc = mCS.getResourceCalculator(); - List appAttemptIdList = + List appAttemptIdList = new ArrayList(); when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), isA(String.class), eq(false))).thenReturn(pending[i]); From 0d6ff0409c8fb2fb6eee797d96a5bcce3a82a5e3 Mon Sep 17 00:00:00 2001 From: Susheel Gupta <38013283+susheelgupta7@users.noreply.github.com> Date: Wed, 30 Apr 2025 22:41:02 +0530 Subject: [PATCH 2/3] removed extra spaces from TestProportionalCapacityPreemptionPolicy.java --- ...tProportionalCapacityPreemptionPolicy.java | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 276e24b511a34..7901fc85ad2df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -229,7 +229,7 @@ public void testProportionalPreemption() { // A will preempt guaranteed-allocated. verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testMaxCap() { int[][] qData = new int[][]{ @@ -249,7 +249,7 @@ public void testMaxCap() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testPreemptCycle() { int[][] qData = new int[][]{ @@ -385,10 +385,10 @@ public void testPerQueueDisablePreemptionHierarchical() { }; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - // verify capacity taken from queueB (appA), not queueE (appC) despite + // verify capacity taken from queueB (appA), not queueE (appC) despite // queueE being far over its absolute capacity because queueA (queueB's // parent) is over capacity and queueD (queueE's parent) is not. - ApplicationAttemptId expectedAttemptOnQueueB = + ApplicationAttemptId expectedAttemptOnQueueB = ApplicationAttemptId.newInstance( appA.getApplicationId(), appA.getAttemptId()); assertTrue("appA should be running on queueB", @@ -401,10 +401,10 @@ public void testPerQueueDisablePreemptionHierarchical() { conf.setPreemptionDisabled("root.queueA.queueB", true); ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData); policy2.editSchedule(); - ApplicationAttemptId expectedAttemptOnQueueC = + ApplicationAttemptId expectedAttemptOnQueueC = ApplicationAttemptId.newInstance( appB.getApplicationId(), appB.getAttemptId()); - ApplicationAttemptId expectedAttemptOnQueueE = + ApplicationAttemptId expectedAttemptOnQueueE = ApplicationAttemptId.newInstance( appC.getApplicationId(), appC.getAttemptId()); // Now, all of queueB's (appA) over capacity is not preemptable, so neither @@ -422,7 +422,7 @@ public void testPerQueueDisablePreemptionHierarchical() { @Test public void testPerQueueDisablePreemptionBroadHierarchical() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I {1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap @@ -473,15 +473,15 @@ public void testPerQueueDisablePreemptionBroadHierarchical() { @Test public void testPerQueueDisablePreemptionInheritParent() { int[][] qData = new int[][] { - // / A E + // / A E // B C D F G H {1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar) {1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap - {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used + {1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used { 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending { 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved - // appA appB appC appD appE - { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps + // appA appB appC appD appE + { 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps { -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granulrity { 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues }; @@ -532,7 +532,7 @@ public void testPerQueuePreemptionNotAllUntouchable() { @Test public void testPerQueueDisablePreemptionRootDisablesAll() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I {1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs {1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap @@ -704,7 +704,7 @@ public void testZeroGuar() { // its absolute guaranteed capacity verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } - + @Test public void testZeroGuarOverCap() { int[][] qData = new int[][] { @@ -727,11 +727,11 @@ public void testZeroGuarOverCap() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); } - + @Test public void testHierarchicalLarge() { int[][] qData = new int[][] { - // / A D G + // / A D G // B C E F H I { 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap @@ -788,7 +788,7 @@ public void testContainerOrdering(){ assert containers.get(4).equals(rm5); } - + @Test public void testPolicyInitializeAfterSchedulerInitialized() { @SuppressWarnings("resource") @@ -796,9 +796,9 @@ public void testPolicyInitializeAfterSchedulerInitialized() { rm.init(conf); // ProportionalCapacityPreemptionPolicy should be initialized after - // CapacityScheduler initialized. We will - // 1) find SchedulingMonitor from RMActiveService's service list, - // 2) check if ResourceCalculator in policy is null or not. + // CapacityScheduler initialized. We will + // 1) find SchedulingMonitor from RMActiveService's service list, + // 2) check if ResourceCalculator in policy is null or not. // If it's not null, we can come to a conclusion that policy initialized // after scheduler got initialized // Get SchedulingMonitor from SchedulingMonitorManager instead @@ -812,10 +812,10 @@ public void testPolicyInitializeAfterSchedulerInitialized() { assertNotNull(policy.getResourceCalculator()); return; } - + fail("Failed to find SchedulingMonitor service, please check what happened"); } - + @Test public void testSkipAMContainer() { int[][] qData = new int[][] { @@ -832,7 +832,7 @@ public void testSkipAMContainer() { setAMContainer = true; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // By skipping AM Container, all other 24 containers of appD will be // preempted verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD))); @@ -863,13 +863,13 @@ public void testPreemptSkippedAMContainers() { setAMContainer = true; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // All 5 containers of appD will be preempted including AM container. verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD))); // All 5 containers of appC will be preempted including AM container. verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); - + // By skipping AM Container, all other 4 containers of appB will be // preempted verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); @@ -879,7 +879,7 @@ public void testPreemptSkippedAMContainers() { verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); setAMContainer = false; } - + @Test public void testAMResourcePercentForSkippedAMContainers() { int[][] qData = new int[][] { @@ -897,7 +897,7 @@ public void testAMResourcePercentForSkippedAMContainers() { setAMResourcePercent = 0.5f; ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); policy.editSchedule(); - + // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb. // Total used AM container size is 20GB, hence 2 AM container has // to be preempted as Queue Capacity is 10Gb. @@ -906,7 +906,7 @@ public void testAMResourcePercentForSkippedAMContainers() { // Including AM Container, all other 4 containers of appC will be // preempted verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC))); - + // By skipping AM Container, all other 4 containers of appB will be // preempted verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB))); @@ -1395,7 +1395,7 @@ LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, when(mCS.getQueue(queuePath)).thenReturn(lq); ResourceCalculator rc = mCS.getResourceCalculator(); - List appAttemptIdList = + List appAttemptIdList = new ArrayList(); when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), isA(String.class), eq(false))).thenReturn(pending[i]); From d8d7b37c5953bbd88b329af9e89a96706b2c0926 Mon Sep 17 00:00:00 2001 From: Susheel Gupta <38013283+susheelgupta7@users.noreply.github.com> Date: Mon, 5 May 2025 18:59:45 +0530 Subject: [PATCH 3/3] Update TestProportionalCapacityPreemptionPolicy.java Fixed EOL check --- .../capacity/TestProportionalCapacityPreemptionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 7901fc85ad2df..6664f90ce6a58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -473,7 +473,7 @@ public void testPerQueueDisablePreemptionBroadHierarchical() { @Test public void testPerQueueDisablePreemptionInheritParent() { int[][] qData = new int[][] { - // / A E + // / A E // B C D F G H {1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar) {1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap