-
Notifications
You must be signed in to change notification settings - Fork 9.2k
YARN-11003. Make RMNode aware of all (OContainer inclusive) allocated resources #3646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
9e60671
YARN-11003. Make RMNode aware of all (OContainer inclusive) allocated…
afchung b6ae804
Add more tests and address review comments
afchung 2ee6bf8
Correct assertEquals parameter order
afchung 7457e09
Address review comments, add node transition resource accounting
afchung File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |
| import org.apache.hadoop.yarn.api.records.ContainerId; | ||
| import org.apache.hadoop.yarn.api.records.ContainerState; | ||
| import org.apache.hadoop.yarn.api.records.ContainerStatus; | ||
| import org.apache.hadoop.yarn.api.records.ExecutionType; | ||
| import org.apache.hadoop.yarn.api.records.NodeId; | ||
| import org.apache.hadoop.yarn.api.records.NodeState; | ||
| import org.apache.hadoop.yarn.api.records.Resource; | ||
|
|
@@ -79,6 +80,7 @@ | |
| import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; | ||
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; | ||
| import org.apache.hadoop.yarn.util.Records; | ||
| import org.apache.hadoop.yarn.util.resource.Resources; | ||
| import org.junit.After; | ||
| import org.junit.Assert; | ||
| import org.junit.Before; | ||
|
|
@@ -231,6 +233,24 @@ private RMNodeStatusEvent getMockRMNodeStatusEventWithoutRunningApps() { | |
| return event; | ||
| } | ||
|
|
||
| /** | ||
| * Builds a mocked {@link ContainerStatus} for a container whose execution | ||
| * type is {@link ExecutionType#GUARANTEED}, by delegating to the | ||
| * four-argument overload. | ||
| * | ||
| * @param containerId id the mock reports from {@code getContainerId()} | ||
| * @param capability resource the mock reports from {@code getCapability()}; | ||
| * callers in this file pass {@code null} when it is irrelevant | ||
| * @param containerState state the mock reports from {@code getState()} | ||
| * @return the stubbed {@link ContainerStatus} mock | ||
| */ | ||
| private static ContainerStatus getMockContainerStatus( | ||
| final ContainerId containerId, final Resource capability, | ||
| final ContainerState containerState) { | ||
| return getMockContainerStatus(containerId, capability, containerState, | ||
| ExecutionType.GUARANTEED); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a mocked {@link ContainerStatus} whose getters return the given | ||
| * values. Only the four getters stubbed below are configured. | ||
| * | ||
| * @param containerId id the mock reports from {@code getContainerId()} | ||
| * @param capability resource the mock reports from {@code getCapability()} | ||
| * @param containerState state the mock reports from {@code getState()} | ||
| * @param executionType type the mock reports from {@code getExecutionType()} | ||
| * @return the stubbed {@link ContainerStatus} mock | ||
| */ | ||
| private static ContainerStatus getMockContainerStatus( | ||
| final ContainerId containerId, final Resource capability, | ||
| final ContainerState containerState, final ExecutionType executionType) { | ||
| final ContainerStatus containerStatus = mock(ContainerStatus.class); | ||
| doReturn(containerId).when(containerStatus).getContainerId(); | ||
| doReturn(containerState).when(containerStatus).getState(); | ||
| doReturn(capability).when(containerStatus).getCapability(); | ||
| doReturn(executionType).when(containerStatus).getExecutionType(); | ||
| return containerStatus; | ||
| } | ||
|
|
||
| @Test (timeout = 5000) | ||
| public void testExpiredContainer() { | ||
| NodeStatus mockNodeStatus = createMockNodeStatus(); | ||
|
|
@@ -248,8 +268,8 @@ public void testExpiredContainer() { | |
| // Now verify that scheduler isn't notified of an expired container | ||
| // by checking number of 'completedContainers' it got in the previous event | ||
| RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null); | ||
| ContainerStatus containerStatus = mock(ContainerStatus.class); | ||
| doReturn(completedContainerId).when(containerStatus).getContainerId(); | ||
| ContainerStatus containerStatus = getMockContainerStatus( | ||
| completedContainerId, null, ContainerState.COMPLETE); | ||
| doReturn(Collections.singletonList(containerStatus)). | ||
| when(statusEvent).getContainers(); | ||
| node.handle(statusEvent); | ||
|
|
@@ -321,12 +341,13 @@ public void testContainerUpdate() throws InterruptedException{ | |
| RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null); | ||
| RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null); | ||
|
|
||
| ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatusFromNode1 = getMockContainerStatus( | ||
| completedContainerIdFromNode1, null, ContainerState.COMPLETE); | ||
| ContainerStatus containerStatusFromNode2_1 = getMockContainerStatus( | ||
| completedContainerIdFromNode2_1, null, ContainerState.COMPLETE); | ||
| ContainerStatus containerStatusFromNode2_2 = getMockContainerStatus( | ||
| completedContainerIdFromNode2_2, null, ContainerState.COMPLETE); | ||
|
|
||
| doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1) | ||
| .getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatusFromNode1)) | ||
| .when(statusEventFromNode1).getContainers(); | ||
| node.handle(statusEventFromNode1); | ||
|
|
@@ -336,13 +357,9 @@ public void testContainerUpdate() throws InterruptedException{ | |
|
|
||
| completedContainers.clear(); | ||
|
|
||
| doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1) | ||
| .getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatusFromNode2_1)) | ||
| .when(statusEventFromNode2_1).getContainers(); | ||
|
|
||
| doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2) | ||
| .getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatusFromNode2_2)) | ||
| .when(statusEventFromNode2_2).getContainers(); | ||
|
|
||
|
|
@@ -358,6 +375,119 @@ public void testContainerUpdate() throws InterruptedException{ | |
| .getContainerId()); | ||
| } | ||
|
|
||
| /** | ||
| * Tests that allocated container resources are counted correctly in | ||
| * {@link org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode} | ||
| * upon a node update. Resources should be counted for both GUARANTEED | ||
| * and OPPORTUNISTIC containers. | ||
| * | ||
| * The test sends four status updates in sequence and expects: NEW and | ||
| * RUNNING containers to be summed, COMPLETE containers (of either | ||
| * execution type) to be ignored, and an update with no containers to | ||
| * bring the allocated total back to {@link Resources#none()}. | ||
| */ | ||
| @Test (timeout = 5000) | ||
| public void testAllocatedContainerUpdate() { | ||
| NodeStatus mockNodeStatus = createMockNodeStatus(); | ||
| // Start the node | ||
| node.handle(new RMNodeStartedEvent(null, null, null, mockNodeStatus)); | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
bibinchundatt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Make sure that the node starts with no allocated resources | ||
| Assert.assertEquals(Resources.none(), node.getAllocatedContainerResource()); | ||
|
|
||
| ApplicationId app0 = BuilderUtils.newApplicationId(0, 0); | ||
| final ContainerId newContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 0); | ||
| final ContainerId runningContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 1); | ||
|
|
||
| rmContext.getRMApps().put(app0, Mockito.mock(RMApp.class)); | ||
|
|
||
| RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null); | ||
|
|
||
| // This list is shared by and grows across the successive status events | ||
| final List<ContainerStatus> containerStatuses = new ArrayList<>(); | ||
|
|
||
| // Use different memory and VCores for new and running state containers | ||
| // to test that they add up correctly | ||
| final Resource newContainerCapability = | ||
| Resource.newInstance(100, 1); | ||
goiri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| final Resource runningContainerCapability = | ||
| Resource.newInstance(200, 2); | ||
| final Resource completedContainerCapability = | ||
| Resource.newInstance(50, 3); | ||
| final ContainerStatus newContainerStatusFromNode = getMockContainerStatus( | ||
| newContainerId, newContainerCapability, ContainerState.NEW); | ||
| final ContainerStatus runningContainerStatusFromNode = | ||
| getMockContainerStatus(runningContainerId, runningContainerCapability, | ||
| ContainerState.RUNNING); | ||
|
|
||
| containerStatuses.addAll(Arrays.asList( | ||
| newContainerStatusFromNode, runningContainerStatusFromNode)); | ||
| doReturn(containerStatuses).when(statusEventFromNode1).getContainers(); | ||
| node.handle(statusEventFromNode1); | ||
| Assert.assertEquals(Resource.newInstance(300, 3), | ||
| node.getAllocatedContainerResource()); | ||
|
|
||
| final ContainerId newOppContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 2); | ||
| final ContainerId runningOppContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 3); | ||
|
|
||
| // Use the same resource capability as in previous for opportunistic case | ||
| RMNodeStatusEvent statusEventFromNode2 = getMockRMNodeStatusEvent(null); | ||
| final ContainerStatus newOppContainerStatusFromNode = | ||
| getMockContainerStatus(newOppContainerId, newContainerCapability, | ||
| ContainerState.NEW, ExecutionType.OPPORTUNISTIC); | ||
| final ContainerStatus runningOppContainerStatusFromNode = | ||
| getMockContainerStatus(runningOppContainerId, | ||
| runningContainerCapability, ContainerState.RUNNING, | ||
| ExecutionType.OPPORTUNISTIC); | ||
|
|
||
| containerStatuses.addAll(Arrays.asList( | ||
| newOppContainerStatusFromNode, runningOppContainerStatusFromNode)); | ||
|
|
||
| // Pass in both guaranteed and opportunistic container statuses | ||
| doReturn(containerStatuses).when(statusEventFromNode2).getContainers(); | ||
|
|
||
| node.handle(statusEventFromNode2); | ||
|
|
||
| // The result here should be double the first check, | ||
| // since allocated resources are doubled, just | ||
| // with different execution types | ||
| Assert.assertEquals(Resource.newInstance(600, 6), | ||
| node.getAllocatedContainerResource()); | ||
|
|
||
| RMNodeStatusEvent statusEventFromNode3 = getMockRMNodeStatusEvent(null); | ||
| final ContainerId completedContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 4); | ||
| final ContainerId completedOppContainerId = BuilderUtils.newContainerId( | ||
| BuilderUtils.newApplicationAttemptId(app0, 0), 5); | ||
| // One completed container per execution type, so both are exercised | ||
| final ContainerStatus completedContainerStatusFromNode = | ||
| getMockContainerStatus(completedContainerId, completedContainerCapability, | ||
| ContainerState.COMPLETE, ExecutionType.GUARANTEED); | ||
| final ContainerStatus completedOppContainerStatusFromNode = | ||
| getMockContainerStatus(completedOppContainerId, | ||
| completedContainerCapability, ContainerState.COMPLETE, | ||
| ExecutionType.OPPORTUNISTIC); | ||
|
|
||
| containerStatuses.addAll(Arrays.asList( | ||
| completedContainerStatusFromNode, completedOppContainerStatusFromNode)); | ||
|
|
||
| doReturn(containerStatuses).when(statusEventFromNode3).getContainers(); | ||
| node.handle(statusEventFromNode3); | ||
|
|
||
| // Adding completed containers should not have changed | ||
| // the resources allocated | ||
| Assert.assertEquals(Resource.newInstance(600, 6), | ||
|
||
| node.getAllocatedContainerResource()); | ||
|
|
||
| RMNodeStatusEvent emptyStatusEventFromNode = | ||
| getMockRMNodeStatusEvent(null); | ||
|
|
||
| doReturn(Collections.emptyList()) | ||
| .when(emptyStatusEventFromNode).getContainers(); | ||
| node.handle(emptyStatusEventFromNode); | ||
|
|
||
| // Passing an empty containers list should yield no resources allocated | ||
| Assert.assertEquals(Resources.none(), | ||
| node.getAllocatedContainerResource()); | ||
| } | ||
|
|
||
| @Test (timeout = 5000) | ||
| public void testStatusChange(){ | ||
| NodeStatus mockNodeStatus = createMockNodeStatus(); | ||
|
|
@@ -376,14 +506,14 @@ public void testStatusChange(){ | |
| RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); | ||
| RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null); | ||
|
|
||
| ContainerStatus containerStatus1 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatus2 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatus1 = getMockContainerStatus( | ||
| completedContainerId1, null, null); | ||
| ContainerStatus containerStatus2 = getMockContainerStatus( | ||
| completedContainerId2, null, null); | ||
|
|
||
| doReturn(completedContainerId1).when(containerStatus1).getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatus1)) | ||
| .when(statusEvent1).getContainers(); | ||
|
|
||
| doReturn(completedContainerId2).when(containerStatus2).getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatus2)) | ||
| .when(statusEvent2).getContainers(); | ||
|
|
||
|
|
@@ -1153,9 +1283,9 @@ public void testForHandlingDuplicatedCompltedContainers() { | |
|
|
||
| RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); | ||
|
|
||
| ContainerStatus containerStatus1 = mock(ContainerStatus.class); | ||
| ContainerStatus containerStatus1 = getMockContainerStatus( | ||
| completedContainerId1, null, ContainerState.COMPLETE); | ||
|
|
||
| doReturn(completedContainerId1).when(containerStatus1).getContainerId(); | ||
| doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) | ||
| .getContainers(); | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start comments with an upper-case letter. This will include the sum of O+G containers queued + running + paused on the node. The comment can be more explanatory.