From c85b7012b90fff2cf323009664b14db1e998408a Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 27 Oct 2017 16:59:57 -0700 Subject: [PATCH 01/13] Notify affected Application Masters when a node enters DECOMMISSIONING state --- .../server/resourcemanager/rmnode/RMNodeImpl.java | 5 +++++ .../server/resourcemanager/TestRMNodeTransitions.java | 11 +++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d0bfecfa0024b..4419522caad02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1160,6 +1160,11 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Update NM metrics during graceful decommissioning. rmNode.updateMetricsForGracefulDecommission(initState, finalState); rmNode.decommissioningTimeout = timeout; + // Notify NodesListManager to notify all RMApp so that each Application Master + // could take any required actions. + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index ba806ab090253..e0ca4dd505a1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -27,7 +27,9 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -98,13 +100,16 @@ public void handle(SchedulerEvent event) { } private NodesListManagerEvent nodesListManagerEvent = null; - + + private List nodesListManagerEventsNodeStateSequence = new LinkedList<>(); + private class TestNodeListManagerEventDispatcher implements EventHandler { @Override public void handle(NodesListManagerEvent event) { nodesListManagerEvent = event; + nodesListManagerEventsNodeStateSequence.add(event.getNode().getState()); } } @@ -150,7 +155,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); nodesListManagerEvent = null; - + nodesListManagerEventsNodeStateSequence.clear(); } @After @@ -721,6 +726,8 
@@ private RMNodeImpl getDecommissioningNode() { node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.GRACEFUL_DECOMMISSION)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); + Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING), + nodesListManagerEventsNodeStateSequence); Assert .assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs()); Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1, From ed2561e3bfb821f6b981ac5e76761c7ae8475e01 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 1 Nov 2017 17:41:27 -0700 Subject: [PATCH 02/13] Add decommission timeout field to NodeReport --- .../hadoop/yarn/api/records/NodeReport.java | 16 +++++- .../src/main/proto/yarn_protos.proto | 1 + .../yarn/client/ProtocolHATestBase.java | 2 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 2 +- .../api/records/impl/pb/NodeReportPBImpl.java | 16 ++++++ .../yarn/server/utils/BuilderUtils.java | 9 +-- .../resourcemanager/ClientRMService.java | 2 +- .../DecommissioningNodesWatcher.java | 38 ++----------- .../resourcemanager/DefaultAMSProcessor.java | 2 +- .../resourcemanager/NodesListManager.java | 34 ++++++++++- .../server/resourcemanager/rmnode/RMNode.java | 2 +- .../rmnode/RMNodeDecommissioningEvent.java | 8 +-- .../yarn/server/resourcemanager/MockRM.java | 15 ++++- .../resourcemanager/TestClientRMService.java | 44 +++++++++++++- .../TestDecommissioningNodesWatcher.java | 3 +- .../TestResourceTrackerService.java | 57 +++++++++++++++++++ .../TestAMRMRPCNodeUpdates.java | 48 +++++++++++++++- 17 files changed, 245 insertions(+), 54 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java index 885a3b4b35ab5..1f41e179a1754 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java @@ -53,7 +53,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newInstance(nodeId, nodeState, httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null); + capability, numContainers, healthReport, lastHealthReportTime, null, null); } @Private @@ -61,7 +61,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, - Set nodeLabels) { + Set nodeLabels, Integer decommissioningTimeout) { NodeReport nodeReport = Records.newRecord(NodeReport.class); nodeReport.setNodeId(nodeId); nodeReport.setNodeState(nodeState); @@ -73,6 +73,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, nodeReport.setHealthReport(healthReport); nodeReport.setLastHealthReportTime(lastHealthReportTime); nodeReport.setNodeLabels(nodeLabels); + nodeReport.setDecommissioningTimeout(decommissioningTimeout); return nodeReport; } @@ -227,4 +228,15 @@ public void setAggregatedContainersUtilization(ResourceUtilization @Private @Unstable public 
abstract void setNodeUtilization(ResourceUtilization nodeUtilization); + + /** + * Optional decommissioning timeout in seconds (null indicates absent timeout). + * @return the decommissioning timeout in second. + */ + public abstract Integer getDecommissioningTimeout(); + + /** + * Set the decommissioning timeout in seconds (null indicates absent timeout). + * */ + public abstract void setDecommissioningTimeout(Integer decommissioningTimeout); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index e69c07bcbb0d6..fa27be5bacb21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -345,6 +345,7 @@ message NodeReportProto { repeated string node_labels = 10; optional ResourceUtilizationProto containers_utilization = 11; optional ResourceUtilizationProto node_utilization = 12; + optional uint32 decommissioning_timeout = 13; } message NodeIdToLabelsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index f4005e9021c2e..77fb75bd21b5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -657,7 +657,7 @@ public List createFakeNodeReports() { NodeId nodeId = NodeId.newInstance("localhost", 0); NodeReport report = NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost", - "rack1", null, null, 4, null, 1000l, null); + "rack1", null, null, 4, null, 1000l); List reports = new ArrayList(); reports.add(report); return reports; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 84cfb0ad22291..e9338ffbf8acd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -1992,7 +1992,7 @@ private List getNodeReports(int noOfNodes, NodeState state, NodeReport nodeReport = NodeReport.newInstance(NodeId .newInstance("host" + i, 0), state, "host" + 1 + ":8888", "rack1", Records.newRecord(Resource.class), Records - .newRecord(Resource.class), 0, "", 0, nodeLabels); + .newRecord(Resource.class), 0, "", 0, nodeLabels, null); if (!emptyResourceUtilization) { ResourceUtilization containersUtilization = ResourceUtilization .newInstance(1024, 2048, 4); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java index 0d205e90da173..b7accf758a927 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java @@ -387,4 +387,20 @@ public void setNodeUtilization(ResourceUtilization nodeResourceUtilization) { } this.nodeUtilization = nodeResourceUtilization; } + + @Override + public Integer getDecommissioningTimeout() { + NodeReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasDecommissioningTimeout()) ? p.getDecommissioningTimeout() : null; + } + + @Override + public void setDecommissioningTimeout(Integer decommissioningTimeout) { + maybeInitBuilder(); + if (decommissioningTimeout == null || decommissioningTimeout < 0) { + builder.clearDecommissioningTimeout(); + return; + } + builder.setDecommissioningTimeout(decommissioningTimeout); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index fec26815817ba..1a90e785cede0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -187,23 +187,23 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newNodeReport(nodeId, nodeState, httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null); + capability, numContainers, healthReport, lastHealthReportTime, null, null); } public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, - Set nodeLabels) { + Set nodeLabels, Integer decommissioningTimeout) { return newNodeReport(nodeId, nodeState, httpAddress, rackName, used, capability, numContainers, healthReport, lastHealthReportTime, - nodeLabels, null, null); + nodeLabels, null, null, decommissioningTimeout); } public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, Set nodeLabels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, Integer decommissioningTimeout) { NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class); nodeReport.setNodeId(nodeId); nodeReport.setNodeState(nodeState); @@ -217,6 +217,7 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, nodeReport.setNodeLabels(nodeLabels); nodeReport.setAggregatedContainersUtilization(containersUtilization); nodeReport.setNodeUtilization(nodeUtilization); + nodeReport.setDecommissioningTimeout(decommissioningTimeout); return nodeReport; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e9c6eac6ef051..63e998a041b38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1040,7 +1040,7 @@ private NodeReport createNodeReports(RMNode rmNode) { rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(), - rmNode.getNodeUtilization()); + rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout()); return report; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java index 9631803e3fd54..ca3eb79841486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher { private final RMContext rmContext; - // Default timeout value in mills. - // Negative value indicates no timeout. 0 means immediate. - private long defaultTimeoutMs = - 1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT; - // Once a RMNode is observed in DECOMMISSIONING state, // All its ContainerStatus update are tracked inside DecomNodeContext. class DecommissioningNodeContext { @@ -105,16 +100,15 @@ class DecommissioningNodeContext { private long lastUpdateTime; - public DecommissioningNodeContext(NodeId nodeId) { + public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { this.nodeId = nodeId; this.appIds = new HashSet(); this.decommissioningStartTime = mclock.getTime(); - this.timeoutMs = defaultTimeoutMs; + this.timeoutMs = 1000L * timeoutSec; } - void updateTimeout(Integer timeoutSec) { - this.timeoutMs = (timeoutSec == null)? 
- defaultTimeoutMs : (1000L * timeoutSec); + void updateTimeout(int timeoutSec) { + this.timeoutMs = 1000L * timeoutSec; } } @@ -132,7 +126,6 @@ public DecommissioningNodesWatcher(RMContext rmContext) { } public void init(Configuration conf) { - readDecommissioningTimeout(conf); int v = conf.getInt( YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL, YarnConfiguration @@ -162,7 +155,8 @@ public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { } } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { if (context == null) { - context = new DecommissioningNodeContext(rmNode.getNodeID()); + context = new DecommissioningNodeContext(rmNode.getNodeID(), + rmNode.getDecommissioningTimeout()); decomNodes.put(rmNode.getNodeID(), context); context.nodeState = rmNode.getState(); context.decommissionedTime = 0; @@ -416,24 +410,4 @@ private void logDecommissioningNodesStatus() { LOG.debug("Decommissioning node: " + sb.toString()); } } - - // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. - // This enables DecommissioningNodesWatcher to pick up new value - // without ResourceManager restart. - private void readDecommissioningTimeout(Configuration conf) { - try { - if (conf == null) { - conf = new YarnConfiguration(); - } - int v = conf.getInt( - YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, - YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); - if (defaultTimeoutMs != 1000L * v) { - defaultTimeoutMs = 1000L * v; - LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs); - } - } catch (Exception e) { - LOG.info("Error readDecommissioningTimeout ", e); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 9774a1a7b6ff5..d629f30616a98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -344,7 +344,7 @@ private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { rmNode.getHttpAddress(), rmNode.getRackName(), used, rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); + rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout()); updatedNodeReports.add(report); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index edd173b851398..5c832526ad973 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -71,6 
+71,12 @@ public class NodesListManager extends CompositeService implements private HostsFileReader hostsReader; private Configuration conf; private final RMContext rmContext; + + // Default decommissioning timeout value in seconds. + // Negative value indicates no timeout. 0 means immediate. + private int defaultDecTimeoutSecs = + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT; + private final Configuration dynamicConf = new YarnConfiguration(); private String includesFile; private String excludesFile; @@ -214,6 +220,9 @@ public void refreshNodes(Configuration yarnConf, boolean graceful) private void refreshHostsReader( Configuration yarnConf, boolean graceful, Integer timeout) throws IOException, YarnException { + // resolve the default timeout to the decommission timeout that is + // configured at this moment + timeout = (timeout == null) ? readDecommissioningTimeout() : timeout; if (null == yarnConf) { yarnConf = new YarnConfiguration(); } @@ -610,7 +619,30 @@ public void refreshNodesForcefully() { } } } - + + // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. + // This enables NodesListManager to pick up new value without ResourceManager restart. + private int readDecommissioningTimeout() { + try { + dynamicConf.reloadConfiguration(); + int configuredDefaultDecTimeoutSecs = + dynamicConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) { + defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs; + LOG.info("Use new decommissioningTimeoutSecs: " + defaultDecTimeoutSecs); + } + } catch (Exception e) { + LOG.warn("Error readDecommissioningTimeout " + e.getMessage()); + } + return defaultDecTimeoutSecs; + } + + @VisibleForTesting + Configuration getDynamicConf() { + return this.dynamicConf; + } + /** * A NodeId instance needed upon startup for populating inactive nodes Map. * It only knows the hostname/ip and marks the port to -1 or invalid. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index ab15c95bd97f3..328c040a27986 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -183,7 +183,7 @@ void updateNodeHeartbeatResponseForUpdatedContainers( void setUntrackedTimeStamp(long timeStamp); /* * Optional decommissioning timeout in second - * (null indicates default timeout). + * (null indicates absent timeout). * @return the decommissioning timeout in second. 
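A minimal sketch (not part of the patch series) of the timeout resolution that the NodesListManager change above introduces: an explicit timeout passed into refreshNodes wins, otherwise the default is re-read from the key behind YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT so a changed yarn-site.xml takes effect without an RM restart; a negative value means no timeout and 0 means decommission immediately.

    // Illustrative only; mirrors refreshHostsReader()/readDecommissioningTimeout() in this patch.
    private int resolveDecommissionTimeout(Integer requestedTimeout, Configuration conf) {
      if (requestedTimeout != null) {
        return requestedTimeout; // explicit timeout (e.g. from the refreshNodes call) wins
      }
      // otherwise fall back to the default configured at this moment
      return conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
          YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
    }
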
*/ Integer getDecommissioningTimeout(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java index 9955e9ea4fdc2..47e85395a41b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java @@ -25,17 +25,17 @@ * */ public class RMNodeDecommissioningEvent extends RMNodeEvent { - // Optional decommissioning timeout in second. - private final Integer decommissioningTimeout; + // decommissioning timeout in second. + private final int decommissioningTimeout; // Create instance with optional timeout // (timeout could be null which means use default). - public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) { + public RMNodeDecommissioningEvent(NodeId nodeId, int timeout) { super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION); this.decommissioningTimeout = timeout; } - public Integer getDecommissioningTimeout() { + public int getDecommissioningTimeout() { return this.decommissioningTimeout; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d128b0205fb46..3f9ebd3457d87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -916,12 +917,24 @@ public void waitForState(NodeId nodeId, NodeState finalState) Assert.assertEquals("Node state is not correct (timedout)", finalState, node.getState()); } - + + + public void sendNodeGracefulDecommission(MockNM nm, int timeout) throws Exception { + RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(nm.getNodeId()); + Assert.assertNotNull("node shouldn't be null", node); + node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout)); + } + public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(nm.getNodeId()); + Assert.assertNotNull("node shouldn't be null", node); node.handle(new 
RMNodeEvent(nm.getNodeId(), event)); } + + public Integer getDecommissioningTimeout(NodeId nodeid) { + return this.getRMContext().getRMNodes().get(nodeid).getDecommissioningTimeout(); + } public KillApplicationResponse killApp(ApplicationId appId) throws Exception { ApplicationClientProtocol client = getClientRMService(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index ae57dfb2ca842..8979e69d2335e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -179,7 +179,46 @@ public class TestClientRMService { private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; - + + + @Test + public void testGetDecommissioningClusterNodes() throws Exception { + MockRM rm = new MockRM() { + protected ClientRMService createClientRMService() { + return new ClientRMService(this.rmContext, scheduler, + this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, + this.getRMContext().getRMDelegationTokenSecretManager()); + }; + }; + rm.start(); + + MockNM decommissioningNodeWithTimeout = rm.registerNode("host1:1234", 1024); + rm.sendNodeStarted(decommissioningNodeWithTimeout); + decommissioningNodeWithTimeout.nodeHeartbeat(true); + Integer decommissioningTimeout = 600; + rm.sendNodeGracefulDecommission(decommissioningNodeWithTimeout, decommissioningTimeout); + rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.DECOMMISSIONING); + + // Create a client. + Configuration conf = new Configuration(); + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); + LOG.info("Connecting to ResourceManager at " + rmAddress); + ApplicationClientProtocol client = + (ApplicationClientProtocol) rpc + .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + + // Make call + List nodeReports = client.getClusterNodes( + GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.DECOMMISSIONING))).getNodeReports(); + Assert.assertEquals(1, nodeReports.size()); + Assert.assertEquals(decommissioningTimeout, + nodeReports.iterator().next().getDecommissioningTimeout()); + + rpc.stopProxy(client, conf); + rm.close(); + } + @Test public void testGetClusterNodes() throws Exception { MockRM rm = new MockRM() { @@ -228,6 +267,7 @@ protected ClientRMService createClientRMService() { // Check node's label = x Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); + Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); // Now make the node unhealthy. 
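The new testGetDecommissioningClusterNodes above exercises the raw ApplicationClientProtocol; as a hedged sketch (not part of the patch), the same information should be reachable through the higher-level YarnClient API, assuming a client configuration that points at the RM and the usual org.apache.hadoop.yarn.client.api and api.records imports.

    // Sketch only: list nodes currently in DECOMMISSIONING and the timeout they report.
    static void printDecommissioningNodes(Configuration conf) throws YarnException, IOException {
      YarnClient yarnClient = YarnClient.createYarnClient();
      yarnClient.init(conf);
      yarnClient.start();
      try {
        for (NodeReport report : yarnClient.getNodeReports(NodeState.DECOMMISSIONING)) {
          // null means the node carries no decommissioning timeout
          Integer timeoutSecs = report.getDecommissioningTimeout();
          System.out.println(report.getNodeId() + " decommissioning, timeout(s)=" + timeoutSecs);
        }
      } finally {
        yarnClient.stop();
      }
    }
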
node.nodeHeartbeat(false); @@ -251,6 +291,7 @@ protected ClientRMService createClientRMService() { nodeReports.get(0).getNodeState()); Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); + Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); // Remove labels of host1 map = new HashMap>(); @@ -267,6 +308,7 @@ protected ClientRMService createClientRMService() { for (NodeReport report : nodeReports) { Assert.assertTrue(report.getNodeLabels() != null && report.getNodeLabels().isEmpty()); + Assert.assertNull(report.getDecommissioningTimeout()); } rpc.stopProxy(client, conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java index 690de308e2385..25934c5c49099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -70,7 +70,8 @@ public void testDecommissioningNodesWatcher() throws Exception { // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); - rm.waitForState(id1, NodeState.DECOMMISSIONING); + rm.sendNodeGracefulDecommission(nm1, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); // Update status with decreasing number of running containers until 0. 
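With the DecommissioningNodesWatcher change earlier in this patch, each tracked node's deadline comes from its own RMNode.getDecommissioningTimeout() rather than a watcher-wide default. A rough sketch of the implied bookkeeping, using field names from the patch (illustrative, not the actual implementation):

    // timeout is in seconds; negative means wait indefinitely, 0 means immediate
    long timeoutMs = 1000L * rmNode.getDecommissioningTimeout();
    long waitedMs = mclock.getTime() - decommissioningStartTime;
    boolean timeoutExpired = timeoutMs == 0 || (timeoutMs > 0 && waitedMs >= timeoutMs);
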
watcher.update(node1, createNodeStatus(id1, app, 12)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 2bd745b522082..547a6e5d4a729 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,7 +26,10 @@ import static org.mockito.Mockito.when; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -290,6 +293,60 @@ public void testGracefulDecommissionNoApp() throws Exception { Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction()); Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction()); } + + @Test + public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + .getAbsolutePath()); + writeToHostsFile(""); + final File yarnSite = new File(TEMP_DIR + File.separator + + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + conf.writeXml(new FileWriter(yarnSite)); + rm = new MockRM(conf); + rm.start(); + rm.getNodesListManager().getDynamicConf().addResource(yarnSite.toURI().toURL()); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:5678", 5120); + MockNM nm3 = rm.registerNode("host3:9101", 5120); + + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); + + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); + + rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); + rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); + rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); + + // Graceful decommission both host1 and host2, with non default timeout for host1 + final Integer nm1DecommissionTimeout = 20; + writeToHostsFile(nm1.getNodeId().getHost() + " " + nm1DecommissionTimeout, + nm2.getNodeId().getHost()); + rm.getNodesListManager().refreshNodes(conf, true); + rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); + rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); + Assert.assertEquals(nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId())); + Integer defaultDecTimeout = + conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + Assert.assertEquals(defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId())); + + // Graceful decommission host3 with a new default timeout + final Integer newDefaultDecTimeout = defaultDecTimeout + 10; + 
writeToHostsFile(nm3.getNodeId().getHost()); + Configuration updatedConf = new YarnConfiguration(conf); + updatedConf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + newDefaultDecTimeout); + updatedConf.writeXml(new FileWriter(yarnSite)); + rm.getNodesListManager().refreshNodes(conf, true); + rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); + Assert.assertEquals(newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId())); + } /** * Graceful decommission node with running application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index f9f0b746233a8..ca12b57b40fb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -81,8 +81,14 @@ private void syncNodeLost(MockNM nm) throws Exception { rm.sendNodeLost(nm); rm.drainEvents(); } - - private AllocateResponse allocate(final ApplicationAttemptId attemptId, + + private void syncNodeGracefulDecommission(MockNM nm, int timeout) throws Exception { + rm.sendNodeGracefulDecommission(nm, timeout); + rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING); + rm.drainEvents(); + } + + private AllocateResponse allocate(final ApplicationAttemptId attemptId, final AllocateRequest req) throws Exception { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(attemptId.toString()); @@ -97,7 +103,38 @@ public AllocateResponse run() throws Exception { } }); } - + + @Test + public void testAMRMDecommissioningNodes() throws Exception { + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000); + rm.drainEvents(); + + RMApp app1 = rm.submitApp(2000); + + // Trigger the scheduling so the AM gets 'launched' on nm1 + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + + // register AM returns no unusable node + am1.registerAppAttempt(); + + Integer decommissioningTimeout = 600; + syncNodeGracefulDecommission(nm2, decommissioningTimeout); + + AllocateRequest allocateRequest1 = + AllocateRequest.newInstance(0, 0F, null, null, null); + AllocateResponse response1 = + allocate(attempt1.getAppAttemptId(), allocateRequest1); + List updatedNodes = response1.getUpdatedNodes(); + Assert.assertEquals(1, updatedNodes.size()); + Assert.assertEquals(decommissioningTimeout, + updatedNodes.iterator().next().getDecommissioningTimeout()); + } + + @Test public void testAMRMUnusableNodes() throws Exception { @@ -138,6 +175,7 @@ public void testAMRMUnusableNodes() throws Exception { NodeReport nr = updatedNodes.iterator().next(); Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); + Assert.assertNull(nr.getDecommissioningTimeout()); // resending the allocate request returns the same result response1 = 
allocate(attempt1.getAppAttemptId(), allocateRequest1); @@ -146,6 +184,7 @@ public void testAMRMUnusableNodes() throws Exception { nr = updatedNodes.iterator().next(); Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); + Assert.assertNull(nr.getDecommissioningTimeout()); syncNodeLost(nm3); @@ -159,6 +198,7 @@ public void testAMRMUnusableNodes() throws Exception { nr = updatedNodes.iterator().next(); Assert.assertEquals(nm3.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.LOST, nr.getNodeState()); + Assert.assertNull(nr.getDecommissioningTimeout()); // registering another AM gives it the complete failed list RMApp app2 = rm.submitApp(2000); @@ -190,6 +230,7 @@ public void testAMRMUnusableNodes() throws Exception { nr = updatedNodes.iterator().next(); Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); + Assert.assertNull(nr.getDecommissioningTimeout()); allocateRequest2 = AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null, @@ -200,6 +241,7 @@ public void testAMRMUnusableNodes() throws Exception { nr = updatedNodes.iterator().next(); Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); + Assert.assertNull(nr.getDecommissioningTimeout()); // subsequent allocate calls should return no updated nodes allocateRequest2 = From 506f7defbf217c6b8b525a90604e2484573bba8d Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 1 Nov 2017 18:19:49 -0700 Subject: [PATCH 03/13] fix TestClientRMService adapt test to decommission timeout checks being independent of received heartbeats --- .../yarn/server/resourcemanager/TestClientRMService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 8979e69d2335e..5f72e212e38c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -192,9 +192,12 @@ protected ClientRMService createClientRMService() { }; rm.start(); - MockNM decommissioningNodeWithTimeout = rm.registerNode("host1:1234", 1024); + int nodeMemory = 1024; + MockNM decommissioningNodeWithTimeout = rm.registerNode("host1:1234", nodeMemory); rm.sendNodeStarted(decommissioningNodeWithTimeout); decommissioningNodeWithTimeout.nodeHeartbeat(true); + // keep the node busy so it doesn't get DECOMMISSIONED immediately + RMApp app1 = rm.submitApp(nodeMemory / 2); Integer decommissioningTimeout = 600; rm.sendNodeGracefulDecommission(decommissioningNodeWithTimeout, decommissioningTimeout); rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.DECOMMISSIONING); From 9a82f314feca6e06e067ceb1b64e0e4e4fad3882 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 2 Nov 2017 11:07:29 -0700 Subject: [PATCH 04/13] use xml format for excludes files with timeouts in this version that is the only way to specify a 
timeout in the excludes file --- .../resourcemanager/TestClientRMService.java | 3 +- .../TestResourceTrackerService.java | 79 ++++++++++++++----- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 5f72e212e38c4..5408e093a5421 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -196,8 +196,9 @@ protected ClientRMService createClientRMService() { MockNM decommissioningNodeWithTimeout = rm.registerNode("host1:1234", nodeMemory); rm.sendNodeStarted(decommissioningNodeWithTimeout); decommissioningNodeWithTimeout.nodeHeartbeat(true); + rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.RUNNING); // keep the node busy so it doesn't get DECOMMISSIONED immediately - RMApp app1 = rm.submitApp(nodeMemory / 2); + rm.submitApp(nodeMemory); Integer decommissioningTimeout = 600; rm.sendNodeGracefulDecommission(decommissioningNodeWithTimeout, decommissioningTimeout); rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.DECOMMISSIONING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 547a6e5d4a729..8296664951413 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,9 +26,7 @@ import static org.mockito.Mockito.when; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; @@ -42,7 +40,14 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import javax.xml.transform.OutputKeys; + +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.MetricsSystem; @@ -102,14 +107,19 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Element; public class TestResourceTrackerService extends NodeLabelTestBase { private final static File TEMP_DIR = new File(System.getProperty( 
"test.build.data", "/tmp"), "decommision"); - private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); + private final File hostFile = + new File(TEMP_DIR + File.separator + "hostFile.txt"); private final File excludeHostFile = new File(TEMP_DIR + File.separator + "excludeHostFile.txt"); + private final File excludeHostXmlFile = + new File(TEMP_DIR + File.separator + "excludeHostFile.xml"); private MockRM rm; @@ -297,9 +307,9 @@ public void testGracefulDecommissionNoApp() throws Exception { @Test public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception { Configuration conf = new Configuration(); - conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile .getAbsolutePath()); - writeToHostsFile(""); + writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null)); final File yarnSite = new File(TEMP_DIR + File.separator + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); conf.writeXml(new FileWriter(yarnSite)); @@ -307,9 +317,10 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception rm.start(); rm.getNodesListManager().getDynamicConf().addResource(yarnSite.toURI().toURL()); - MockNM nm1 = rm.registerNode("host1:1234", 5120); - MockNM nm2 = rm.registerNode("host2:5678", 5120); - MockNM nm3 = rm.registerNode("host3:9101", 5120); + int nodeMemory = 1024; + MockNM nm1 = rm.registerNode("host1:1234", nodeMemory); + MockNM nm2 = rm.registerNode("host2:5678", nodeMemory); + MockNM nm3 = rm.registerNode("host3:9101", nodeMemory); NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); @@ -322,11 +333,12 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); - + // Graceful decommission both host1 and host2, with non default timeout for host1 final Integer nm1DecommissionTimeout = 20; - writeToHostsFile(nm1.getNodeId().getHost() + " " + nm1DecommissionTimeout, - nm2.getNodeId().getHost()); + writeToHostsXmlFile( + excludeHostXmlFile, Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout), + Pair.of(nm2.getNodeId().getHost(), null)); rm.getNodesListManager().refreshNodes(conf, true); rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING); @@ -338,7 +350,7 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception // Graceful decommission host3 with a new default timeout final Integer newDefaultDecTimeout = defaultDecTimeout + 10; - writeToHostsFile(nm3.getNodeId().getHost()); + writeToHostsXmlFile(excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null)); Configuration updatedConf = new YarnConfiguration(conf); updatedConf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, newDefaultDecTimeout); @@ -2024,16 +2036,20 @@ private void testNodeRemovalUtilUnhealthy(boolean doGraceful) rm.stop(); } + private void ensureFileExists(File file) throws IOException { + if (!file.exists()) { + TEMP_DIR.mkdirs(); + file.createNewFile(); + } + } + private void writeToHostsFile(String... hosts) throws IOException { writeToHostsFile(hostFile, hosts); } - + private void writeToHostsFile(File file, String... 
hosts) throws IOException { - if (!file.exists()) { - TEMP_DIR.mkdirs(); - file.createNewFile(); - } + ensureFileExists(file); FileOutputStream fStream = null; try { fStream = new FileOutputStream(file); @@ -2048,7 +2064,32 @@ private void writeToHostsFile(File file, String... hosts) } } } - + + private void writeToHostsXmlFile(File file, Pair... hostsAndTimeouts) + throws Exception { + ensureFileExists(file); + DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); + Document doc = dbFactory.newDocumentBuilder().newDocument(); + Element hosts = doc.createElement("hosts"); + doc.appendChild(hosts); + for (Pair hostsAndTimeout : hostsAndTimeouts) { + Element host = doc.createElement("host"); + hosts.appendChild(host); + Element name = doc.createElement("name"); + host.appendChild(name); + name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft())); + if (hostsAndTimeout.getRight() != null) { + Element timeout = doc.createElement("timeout"); + host.appendChild(timeout); + timeout.appendChild(doc.createTextNode(hostsAndTimeout.getRight().toString())); + } + } + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + Transformer transformer = transformerFactory.newTransformer(); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.transform(new DOMSource(doc), new StreamResult(file)); + } + private void checkDecommissionedNMCount(MockRM rm, int count) throws InterruptedException { int waitCount = 0; From 0ad050852b35480ee253e60a82e22f50ba856450 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 2 Nov 2017 13:41:53 -0700 Subject: [PATCH 05/13] load dynamic timeout like in hadoop trunk replace dynamic conf by using the configuration passed by AdminService --- .../resourcemanager/NodesListManager.java | 18 +++++++--------- .../resourcemanager/TestClientRMService.java | 14 ++++++------- .../TestResourceTrackerService.java | 21 +++++++------------ 3 files changed, 21 insertions(+), 32 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 5c832526ad973..4f862442c3776 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -76,7 +76,6 @@ public class NodesListManager extends CompositeService implements // Negative value indicates no timeout. 0 means immediate. private int defaultDecTimeoutSecs = YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT; - private final Configuration dynamicConf = new YarnConfiguration(); private String includesFile; private String excludesFile; @@ -222,7 +221,7 @@ private void refreshHostsReader( throws IOException, YarnException { // resolve the default timeout to the decommission timeout that is // configured at this moment - timeout = (timeout == null) ? readDecommissioningTimeout() : timeout; + timeout = (timeout == null) ? 
readDecommissioningTimeout(yarnConf) : timeout; if (null == yarnConf) { yarnConf = new YarnConfiguration(); } @@ -261,7 +260,7 @@ private void setDecomissionedNMs() { // Gracefully decommission excluded nodes that are not already // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes // that are already DECOMMISSIONED or DECOMMISSIONING. - private void handleExcludeNodeList(boolean graceful, Integer timeout) { + private void handleExcludeNodeList(boolean graceful, int timeout) { // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned. List nodesToRecom = new ArrayList(); @@ -622,11 +621,13 @@ public void refreshNodesForcefully() { // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml. // This enables NodesListManager to pick up new value without ResourceManager restart. - private int readDecommissioningTimeout() { + private int readDecommissioningTimeout(Configuration conf) { try { - dynamicConf.reloadConfiguration(); + if (conf == null) { + conf = new YarnConfiguration(); + } int configuredDefaultDecTimeoutSecs = - dynamicConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) { defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs; @@ -638,11 +639,6 @@ private int readDecommissioningTimeout() { return defaultDecTimeoutSecs; } - @VisibleForTesting - Configuration getDynamicConf() { - return this.dynamicConf; - } - /** * A NodeId instance needed upon startup for populating inactive nodes Map. * It only knows the hostname/ip and marks the port to -1 or invalid. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 5408e093a5421..7bbce3bab5e0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -193,15 +193,13 @@ protected ClientRMService createClientRMService() { rm.start(); int nodeMemory = 1024; - MockNM decommissioningNodeWithTimeout = rm.registerNode("host1:1234", nodeMemory); - rm.sendNodeStarted(decommissioningNodeWithTimeout); - decommissioningNodeWithTimeout.nodeHeartbeat(true); - rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.RUNNING); - // keep the node busy so it doesn't get DECOMMISSIONED immediately - rm.submitApp(nodeMemory); + MockNM nm1 = rm.registerNode("host1:1234", nodeMemory); + rm.sendNodeStarted(nm1); + nm1.nodeHeartbeat(true); + rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); Integer decommissioningTimeout = 600; - rm.sendNodeGracefulDecommission(decommissioningNodeWithTimeout, decommissioningTimeout); - rm.waitForState(decommissioningNodeWithTimeout.getNodeId(), NodeState.DECOMMISSIONING); + rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout); + rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); // Create a client. 
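The writeToHostsXmlFile helper added in the previous patch of this series builds the XML form of the excludes file, which per that commit message is the only way to attach a per-host timeout in this version. Assuming the hosts/host/name/timeout layout the helper produces, an excludes file that gracefully decommissions host1 with a 20 second timeout and host2 with the default would look roughly like:

    <?xml version="1.0"?>
    <hosts>
      <host><name>host1</name><timeout>20</timeout></host>
      <host><name>host2</name></host>
    </hosts>
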
Configuration conf = new Configuration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 8296664951413..82190673fb0d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -309,31 +309,28 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception Configuration conf = new Configuration(); conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile .getAbsolutePath()); + writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null)); - final File yarnSite = new File(TEMP_DIR + File.separator + - YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - conf.writeXml(new FileWriter(yarnSite)); rm = new MockRM(conf); rm.start(); - rm.getNodesListManager().getDynamicConf().addResource(yarnSite.toURI().toURL()); - + int nodeMemory = 1024; MockNM nm1 = rm.registerNode("host1:1234", nodeMemory); MockNM nm2 = rm.registerNode("host2:5678", nodeMemory); MockNM nm3 = rm.registerNode("host3:9101", nodeMemory); - + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true); - + Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction())); - + rm.waitForState(nm1.getNodeId(), NodeState.RUNNING); rm.waitForState(nm2.getNodeId(), NodeState.RUNNING); rm.waitForState(nm3.getNodeId(), NodeState.RUNNING); - + // Graceful decommission both host1 and host2, with non default timeout for host1 final Integer nm1DecommissionTimeout = 20; writeToHostsXmlFile( @@ -347,14 +344,12 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); Assert.assertEquals(defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId())); - + // Graceful decommission host3 with a new default timeout final Integer newDefaultDecTimeout = defaultDecTimeout + 10; writeToHostsXmlFile(excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null)); - Configuration updatedConf = new YarnConfiguration(conf); - updatedConf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, + conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, newDefaultDecTimeout); - updatedConf.writeXml(new FileWriter(yarnSite)); rm.getNodesListManager().refreshNodes(conf, true); rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING); Assert.assertEquals(newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId())); From 258e29a722b19d02c3e7a747e64c22496646033d Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 17 Nov 2017 10:49:50 -0800 Subject: [PATCH 06/13] add new 
NodesListManagerEventType.NODE_DECOMMISSIONING and use it for notifying node transitions to the DECOMMISSIONING state --- .../resourcemanager/NodesListManager.java | 42 ++++++++++--------- .../NodesListManagerEventType.java | 3 +- .../rmapp/RMAppNodeUpdateEvent.java | 3 +- .../resourcemanager/rmnode/RMNodeImpl.java | 2 +- .../TestRMNodeTransitions.java | 2 +- 5 files changed, 28 insertions(+), 24 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 4f862442c3776..7b64f935f46d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -470,6 +470,20 @@ private boolean isValidNode( .contains(ip)) && !(excludeList.contains(hostName) || excludeList.contains(ip)); } + + private void sendRMAppNodeUpdateEventToNonFinalizedApps( + RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) { + for(RMApp app : rmContext.getRMApps().values()) { + if (!app.isAppFinalStateStored()) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + appNodeUpdateType)); + } + } + } @Override public void handle(NodesListManagerEvent event) { @@ -477,30 +491,18 @@ public void handle(NodesListManagerEvent event) { switch (event.getType()) { case NODE_UNUSABLE: LOG.debug(eventNode + " reported unusable"); - for(RMApp app: rmContext.getRMApps().values()) { - if (!app.isAppFinalStateStored()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_UNUSABLE)); - } - } + sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode, RMAppNodeUpdateType.NODE_UNUSABLE); break; case NODE_USABLE: LOG.debug(eventNode + " reported usable"); - for (RMApp app : rmContext.getRMApps().values()) { - if (!app.isAppFinalStateStored()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_USABLE)); - } - } + sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode, RMAppNodeUpdateType.NODE_USABLE); + break; + case NODE_DECOMMISSIONING: + LOG.debug(eventNode + " reported decommissioning"); + sendRMAppNodeUpdateEventToNonFinalizedApps( + eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING); break; + default: LOG.error("Ignoring invalid eventtype " + event.getType()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java index 2afc8e6e4a700..db16bc4e121c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java @@ -20,5 +20,6 @@ public enum NodesListManagerEventType { NODE_USABLE, - NODE_UNUSABLE + NODE_UNUSABLE, + NODE_DECOMMISSIONING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java index ba8af9801f77d..d75b047c1edcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java @@ -25,7 +25,8 @@ public class RMAppNodeUpdateEvent extends RMAppEvent { public enum RMAppNodeUpdateType { NODE_USABLE, - NODE_UNUSABLE + NODE_UNUSABLE, + NODE_DECOMMISSIONING } private final RMNode node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 4419522caad02..4a9e6a916ba43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1164,7 +1164,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // could take any required actions. 
rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); + NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode)); if (rmNode.originalTotalCapability == null){ rmNode.originalTotalCapability = Resources.clone(rmNode.totalCapability); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index e0ca4dd505a1a..1ef6e397aa128 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -1015,7 +1015,7 @@ public void testResourceUpdateOnDecommissioningNode() { Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); Assert.assertNotNull(nodesListManagerEvent); - Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING, nodesListManagerEvent.getType()); } From d881d0640c5b02c1656ac30b47f53ff59ba34148 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 20 Nov 2017 14:21:27 -0800 Subject: [PATCH 07/13] add default implementation for new methods in NodeReport so it works ok for older versions --- .../java/org/apache/hadoop/yarn/api/records/NodeReport.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java index 1f41e179a1754..3b3d89383bb6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java @@ -233,10 +233,12 @@ public void setAggregatedContainersUtilization(ResourceUtilization * Optional decommissioning timeout in seconds (null indicates absent timeout). * @return the decommissioning timeout in second. */ - public abstract Integer getDecommissioningTimeout(); + public Integer getDecommissioningTimeout() { + return null; + } /** * Set the decommissioning timeout in seconds (null indicates absent timeout). 
* */ - public abstract void setDecommissioningTimeout(Integer decommissioningTimeout); + public void setDecommissioningTimeout(Integer decommissioningTimeout) {} } From 3d4d0cd5b16acefaef0db953dc88c96b7ee9a2c9 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 20 Nov 2017 14:22:38 -0800 Subject: [PATCH 08/13] revert unnecessary changes in RMNodeDecommissioningEvent --- .../rmnode/RMNodeDecommissioningEvent.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java index 47e85395a41b6..9955e9ea4fdc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java @@ -25,17 +25,17 @@ * */ public class RMNodeDecommissioningEvent extends RMNodeEvent { - // decommissioning timeout in second. - private final int decommissioningTimeout; + // Optional decommissioning timeout in second. + private final Integer decommissioningTimeout; // Create instance with optional timeout // (timeout could be null which means use default). - public RMNodeDecommissioningEvent(NodeId nodeId, int timeout) { + public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) { super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION); this.decommissioningTimeout = timeout; } - public int getDecommissioningTimeout() { + public Integer getDecommissioningTimeout() { return this.decommissioningTimeout; } } From 870580750bbcb7f89175e1464204a3586c8a32e1 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 20 Nov 2017 17:17:29 -0800 Subject: [PATCH 09/13] add optional NodeUpdateType field to NodeReport --- .../hadoop/yarn/api/records/NodeReport.java | 18 ++++++++++++++++-- .../yarn/api/records/NodeUpdateType.java | 7 +++++++ .../src/main/proto/yarn_protos.proto | 7 +++++++ .../hadoop/yarn/client/cli/TestYarnCLI.java | 2 +- .../api/records/impl/pb/NodeReportPBImpl.java | 18 ++++++++++++++++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 14 +++++++++++++- .../hadoop/yarn/server/utils/BuilderUtils.java | 12 ++++++++---- .../resourcemanager/ClientRMService.java | 4 ++-- .../resourcemanager/DefaultAMSProcessor.java | 11 ++++++++--- .../server/resourcemanager/rmapp/RMApp.java | 9 ++++++--- .../resourcemanager/rmapp/RMAppImpl.java | 11 +++++------ .../rmapp/RMAppNodeUpdateEvent.java | 7 ++++++- .../applicationsmanager/MockAsm.java | 4 ++-- .../resourcemanager/rmapp/MockRMApp.java | 4 ++-- 14 files changed, 101 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java index 3b3d89383bb6c..31426319f110d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java @@ -53,7 +53,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newInstance(nodeId, nodeState, httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null, null); + capability, numContainers, healthReport, lastHealthReportTime, null, null, null); } @Private @@ -61,7 +61,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, - Set nodeLabels, Integer decommissioningTimeout) { + Set nodeLabels, Integer decommissioningTimeout, NodeUpdateType nodeUpdateType) { NodeReport nodeReport = Records.newRecord(NodeReport.class); nodeReport.setNodeId(nodeId); nodeReport.setNodeState(nodeState); @@ -74,6 +74,7 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, nodeReport.setLastHealthReportTime(lastHealthReportTime); nodeReport.setNodeLabels(nodeLabels); nodeReport.setDecommissioningTimeout(decommissioningTimeout); + nodeReport.setNodeUpdateType(nodeUpdateType); return nodeReport; } @@ -241,4 +242,17 @@ public Integer getDecommissioningTimeout() { * Set the decommissioning timeout in seconds (null indicates absent timeout). * */ public void setDecommissioningTimeout(Integer decommissioningTimeout) {} + + /** + * Optional node update type (null indicates absent update type). + * @return the node update. + */ + public NodeUpdateType getNodeUpdateType() { + return NodeUpdateType.NODE_UNUSABLE; + } + + /** + * Set the node update type (null indicates absent node update type). 
+ * */ + public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {} } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java new file mode 100644 index 0000000000000..5eb5ab4e6ab6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.yarn.api.records; + +public enum NodeUpdateType { + NODE_USABLE, + NODE_UNUSABLE, + NODE_DECOMMISSIONING +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index fa27be5bacb21..b89c3d8b2c64c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -332,6 +332,12 @@ message NodeIdProto { optional int32 port = 2; } +enum NodeUpdateTypeProto { + NODE_USABLE = 0; + NODE_UNUSABLE = 1; + NODE_DECOMMISSIONING = 2; +} + message NodeReportProto { optional NodeIdProto nodeId = 1; optional string httpAddress = 2; @@ -346,6 +352,7 @@ message NodeReportProto { optional ResourceUtilizationProto containers_utilization = 11; optional ResourceUtilizationProto node_utilization = 12; optional uint32 decommissioning_timeout = 13; + optional NodeUpdateTypeProto node_update_type = 14; } message NodeIdToLabelsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index e9338ffbf8acd..20244593dee94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -1992,7 +1992,7 @@ private List getNodeReports(int noOfNodes, NodeState state, NodeReport nodeReport = NodeReport.newInstance(NodeId .newInstance("host" + i, 0), state, "host" + 1 + ":8888", "rack1", Records.newRecord(Resource.class), Records - .newRecord(Resource.class), 0, "", 0, nodeLabels, null); + .newRecord(Resource.class), 0, "", 0, nodeLabels, null, null); if (!emptyResourceUtilization) { ResourceUtilization containersUtilization = ResourceUtilization .newInstance(1024, 2048, 4); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java index b7accf758a927..5269ab7681921 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -403,4 +404,21 @@ public void setDecommissioningTimeout(Integer decommissioningTimeout) { } builder.setDecommissioningTimeout(decommissioningTimeout); } + + @Override + public NodeUpdateType getNodeUpdateType() { + NodeReportProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasNodeUpdateType()) ? + ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null; + } + + @Override + public void setNodeUpdateType(NodeUpdateType nodeUpdateType) { + maybeInitBuilder(); + if (nodeUpdateType == null) { + builder.clearNodeUpdateType(); + return; + } + builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 528cf8e600327..e0acaa7e2ab8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; @@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -342,7 +344,17 @@ public static ContainerTypeProto convertToProtoFormat(ContainerType e) { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { return ContainerType.valueOf(e.name()); } - + + /* + * NodeUpdateType + */ + public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) { + return NodeUpdateTypeProto.valueOf(e.name()); + } + public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) { + return NodeUpdateType.valueOf(e.name()); + } + /* * ExecutionType */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 1a90e785cede0..8df8a21896827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import 
org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -187,23 +188,25 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newNodeReport(nodeId, nodeState, httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null, null); + capability, numContainers, healthReport, lastHealthReportTime, null, null, null); } public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, - Set nodeLabels, Integer decommissioningTimeout) { + Set nodeLabels, Integer decommissioningTimeout, + NodeUpdateType nodeUpdateType) { return newNodeReport(nodeId, nodeState, httpAddress, rackName, used, capability, numContainers, healthReport, lastHealthReportTime, - nodeLabels, null, null, decommissioningTimeout); + nodeLabels, null, null, decommissioningTimeout, nodeUpdateType); } public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, Set nodeLabels, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization, Integer decommissioningTimeout) { + ResourceUtilization nodeUtilization, Integer decommissioningTimeout, + NodeUpdateType nodeUpdateType) { NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class); nodeReport.setNodeId(nodeId); nodeReport.setNodeState(nodeState); @@ -218,6 +221,7 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, nodeReport.setAggregatedContainersUtilization(containersUtilization); nodeReport.setNodeUtilization(nodeUtilization); nodeReport.setDecommissioningTimeout(decommissioningTimeout); + nodeReport.setNodeUpdateType(nodeUpdateType); return nodeReport; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 63e998a041b38..ce4165f010d74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1024,7 +1024,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) return response; } - private NodeReport createNodeReports(RMNode rmNode) { + private NodeReport createNodeReports(RMNode rmNode) { SchedulerNodeReport schedulerNodeReport = scheduler.getNodeReport(rmNode.getNodeID()); Resource used = BuilderUtils.newResource(0, 0); @@ -1040,7 +1040,7 @@ private NodeReport createNodeReports(RMNode rmNode) { rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), 
rmNode.getLastHealthReportTime(), rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(), - rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout()); + rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(), null); return report; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index d629f30616a98..097eb7d4eb76d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -83,6 +84,8 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.Set; /** @@ -326,10 +329,11 @@ public void allocate(ApplicationAttemptId appAttemptId, } private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { - List updatedNodes = new ArrayList<>(); + Map updatedNodes = new HashMap<>(); if(app.pullRMNodeUpdates(updatedNodes) > 0) { List updatedNodeReports = new ArrayList<>(); - for(RMNode rmNode: updatedNodes) { + for(Map.Entry rmNodeEntry: updatedNodes.entrySet()) { + RMNode rmNode = rmNodeEntry.getKey(); SchedulerNodeReport schedulerNodeReport = getScheduler().getNodeReport(rmNode.getNodeID()); Resource used = BuilderUtils.newResource(0, 0); @@ -344,7 +348,8 @@ private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { rmNode.getHttpAddress(), rmNode.getRackName(), used, rmNode.getTotalCapability(), numContainers, rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout()); + rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(), + rmNodeEntry.getValue()); updatedNodeReports.add(report); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 93c41b6747c85..72c070358ffcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import 
org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -154,10 +155,12 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * received by the RMApp. Updates can be node becoming lost or becoming * healthy etc. The method clears the information from the {@link RMApp}. So * each call to this method gives the delta from the previous call. - * @param updatedNodes Collection into which the updates are transferred - * @return the number of nodes added to the {@link Collection} + * @param updatedNodes Map into which the updates are transferred, with each + * node updates as the key, and the {@link NodeUpdateType} for that update + * as the corresponding value. + * @return the number of nodes added to the {@link Map} */ - int pullRMNodeUpdates(Collection updatedNodes); + int pullRMNodeUpdates(Map updatedNodes); /** * The finish time of the {@link RMApp} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index cfb8a74f59a47..cd4eba7d6899d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -20,11 +20,9 @@ import java.net.InetAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -59,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -145,7 +144,7 @@ public class RMAppImpl implements RMApp, Recoverable { private final Map attempts = new LinkedHashMap(); private final long submitTime; - private final Set updatedNodes = new HashSet(); + private final Map updatedNodes = new HashMap<>(); private final String applicationType; private final Set applicationTags; @@ -671,11 +670,11 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { } @Override - public int pullRMNodeUpdates(Collection updatedNodes) { + public int pullRMNodeUpdates(Map updatedNodes) { this.writeLock.lock(); try { int updatedNodeCount = this.updatedNodes.size(); - updatedNodes.addAll(this.updatedNodes); + updatedNodes.putAll(this.updatedNodes); this.updatedNodes.clear(); return updatedNodeCount; } finally { @@ -981,7 +980,7 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { NodeState nodeState = node.getState(); - updatedNodes.add(node); + updatedNodes.put(node, 
RMAppNodeUpdateType.convertToNodeUpdateType(type)); LOG.debug("Received node update event:" + type + " for node:" + node + " with state:" + nodeState); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java index d75b047c1edcf..2666aacd33bf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class RMAppNodeUpdateEvent extends RMAppEvent { @@ -26,7 +27,11 @@ public class RMAppNodeUpdateEvent extends RMAppEvent { public enum RMAppNodeUpdateType { NODE_USABLE, NODE_UNUSABLE, - NODE_DECOMMISSIONING + NODE_DECOMMISSIONING; + + public static NodeUpdateType convertToNodeUpdateType(RMAppNodeUpdateType rmAppNodeUpdateType) { + return NodeUpdateType.valueOf(rmAppNodeUpdateType.name()); + } } private final RMNode node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 399df02465e64..9ef48dbf8831c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -154,7 +154,7 @@ public FinalApplicationStatus getFinalApplicationStatus() { throw new UnsupportedOperationException("Not supported yet."); } @Override - public int pullRMNodeUpdates(Collection updatedNodes) { + public int pullRMNodeUpdates(Map updatedNodes) { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 39a7f995ab61e..6c64a6709e72c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; -import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -235,7 +235,7 @@ public FinalApplicationStatus getFinalApplicationStatus() { } @Override - public int pullRMNodeUpdates(Collection updatedNodes) { + public int pullRMNodeUpdates(Map updatedNodes) { throw new UnsupportedOperationException("Not supported yet."); } From cb503b0a49008a4388f103fd5313fe68af189a3e Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 21 Nov 2017 15:45:26 -0800 Subject: [PATCH 10/13] add assertions for NodeReport.getNodeUpdateType --- .../hadoop/yarn/server/resourcemanager/MockRM.java | 1 - .../server/resourcemanager/TestClientRMService.java | 8 ++++++-- .../TestDecommissioningNodesWatcher.java | 2 +- .../applicationsmanager/TestAMRMRPCNodeUpdates.java | 11 +++++++++-- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 3f9ebd3457d87..8bdb03ed277b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -918,7 +918,6 @@ public void waitForState(NodeId nodeId, NodeState finalState) node.getState()); } - public void sendNodeGracefulDecommission(MockNM nm, int timeout) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(nm.getNodeId()); Assert.assertNotNull("node shouldn't be null", node); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7bbce3bab5e0b..b075b3f54f787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -214,8 +214,9 @@ protected ClientRMService createClientRMService() { List nodeReports = client.getClusterNodes( GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.DECOMMISSIONING))).getNodeReports(); Assert.assertEquals(1, nodeReports.size()); - Assert.assertEquals(decommissioningTimeout, - nodeReports.iterator().next().getDecommissioningTimeout()); + NodeReport nr = nodeReports.iterator().next(); + Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout()); + Assert.assertNull(nr.getNodeUpdateType()); rpc.stopProxy(client, conf); rm.close(); @@ -270,6 +271,7 @@ protected ClientRMService createClientRMService() { // Check node's label = x Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); + Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); // Now make the node unhealthy. node.nodeHeartbeat(false); @@ -294,6 +296,7 @@ protected ClientRMService createClientRMService() { Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); + Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); // Remove labels of host1 map = new HashMap>(); @@ -311,6 +314,7 @@ protected ClientRMService createClientRMService() { Assert.assertTrue(report.getNodeLabels() != null && report.getNodeLabels().isEmpty()); Assert.assertNull(report.getDecommissioningTimeout()); + Assert.assertNull(report.getNodeUpdateType()); } rpc.stopProxy(client, conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java index 25934c5c49099..3057f27d8f749 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java @@ -69,9 +69,9 @@ public void testDecommissioningNodesWatcher() throws Exception { MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1); // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. - rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION); rm.sendNodeGracefulDecommission(nm1, YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT); + rm.waitForState(id1, NodeState.DECOMMISSIONING); // Update status with decreasing number of running containers until 0. 
watcher.update(node1, createNodeStatus(id1, app, 12)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index ca12b57b40fb6..54bf41a24583e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.NodeUpdateType; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -130,8 +131,9 @@ public void testAMRMDecommissioningNodes() throws Exception { allocate(attempt1.getAppAttemptId(), allocateRequest1); List updatedNodes = response1.getUpdatedNodes(); Assert.assertEquals(1, updatedNodes.size()); - Assert.assertEquals(decommissioningTimeout, - updatedNodes.iterator().next().getDecommissioningTimeout()); + NodeReport nr = updatedNodes.iterator().next(); + Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType()); } @@ -176,6 +178,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); Assert.assertNull(nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType()); // resending the allocate request returns the same result response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1); @@ -185,6 +188,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState()); Assert.assertNull(nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType()); syncNodeLost(nm3); @@ -199,6 +203,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(nm3.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.LOST, nr.getNodeState()); Assert.assertNull(nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType()); // registering another AM gives it the complete failed list RMApp app2 = rm.submitApp(2000); @@ -231,6 +236,7 @@ public void testAMRMUnusableNodes() throws Exception { Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); Assert.assertNull(nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType()); allocateRequest2 = AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null, @@ -242,6 +248,7 @@ public void testAMRMUnusableNodes() 
throws Exception { Assert.assertEquals(nm4.getNodeId(), nr.getNodeId()); Assert.assertEquals(NodeState.RUNNING, nr.getNodeState()); Assert.assertNull(nr.getDecommissioningTimeout()); + Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType()); // subsequent allocate calls should return no updated nodes allocateRequest2 = From af2bf8532f4c2eaf8546b2999debc8e80473608c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 21 Nov 2017 16:00:45 -0800 Subject: [PATCH 11/13] fix boxing-unboxing bug --- .../hadoop/yarn/server/resourcemanager/NodesListManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 7b64f935f46d9..c9c8f86133721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -221,7 +221,9 @@ private void refreshHostsReader( throws IOException, YarnException { // resolve the default timeout to the decommission timeout that is // configured at this moment - timeout = (timeout == null) ? readDecommissioningTimeout(yarnConf) : timeout; + if (null == timeout) { + timeout = readDecommissioningTimeout(yarnConf); + } if (null == yarnConf) { yarnConf = new YarnConfiguration(); } From 32c441d6085f8005970a74588188ca9d75695d01 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 21 Nov 2017 18:29:16 -0800 Subject: [PATCH 12/13] fix checkstyle errors --- .../hadoop/yarn/api/records/NodeReport.java | 21 +++++----- .../yarn/api/records/NodeUpdateType.java | 22 +++++++++++ .../yarn/client/ProtocolHATestBase.java | 14 +++---- .../api/records/impl/pb/NodeReportPBImpl.java | 18 ++++++--- .../yarn/server/utils/BuilderUtils.java | 3 +- .../resourcemanager/ClientRMService.java | 3 +- .../resourcemanager/NodesListManager.java | 20 ++++++---- .../rmapp/RMAppNodeUpdateEvent.java | 3 +- .../resourcemanager/rmnode/RMNodeImpl.java | 4 +- .../yarn/server/resourcemanager/MockRM.java | 9 +++-- .../resourcemanager/TestClientRMService.java | 8 ++-- .../TestRMNodeTransitions.java | 4 +- .../TestResourceTrackerService.java | 39 ++++++++++++------- .../TestAMRMRPCNodeUpdates.java | 11 ++++-- 14 files changed, 118 insertions(+), 61 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java index 31426319f110d..b78677f89668b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java @@ -53,7 +53,8 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newInstance(nodeId, nodeState, 
httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null, null, null); + capability, numContainers, healthReport, lastHealthReportTime, + null, null, null); } @Private @@ -61,7 +62,8 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime, - Set nodeLabels, Integer decommissioningTimeout, NodeUpdateType nodeUpdateType) { + Set nodeLabels, Integer decommissioningTimeout, + NodeUpdateType nodeUpdateType) { NodeReport nodeReport = Records.newRecord(NodeReport.class); nodeReport.setNodeId(nodeId); nodeReport.setNodeState(nodeState); @@ -188,8 +190,8 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, public abstract void setLastHealthReportTime(long lastHealthReport); /** - * Get labels of this node - * @return labels of this node + * Get labels of this node. + * @return labels of this node. */ @Public @Stable @@ -200,8 +202,8 @@ public static NodeReport newInstance(NodeId nodeId, NodeState nodeState, public abstract void setNodeLabels(Set nodeLabels); /** - * Get containers aggregated resource utilization in a node - * @return containers resource utilization + * Get containers aggregated resource utilization in a node. + * @return containers resource utilization. */ @Public @Stable @@ -219,8 +221,8 @@ public void setAggregatedContainersUtilization(ResourceUtilization } /** - * Get node resource utilization - * @return node resource utilization + * Get node resource utilization. + * @return node resource utilization. */ @Public @Stable @@ -231,7 +233,8 @@ public void setAggregatedContainersUtilization(ResourceUtilization public abstract void setNodeUtilization(ResourceUtilization nodeUtilization); /** - * Optional decommissioning timeout in seconds (null indicates absent timeout). + * Optional decommissioning timeout in seconds (null indicates absent + * timeout). * @return the decommissioning timeout in second. */ public Integer getDecommissioningTimeout() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java index 5eb5ab4e6ab6d..9152a6a3d9a49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java @@ -1,5 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.yarn.api.records; +/** + *
Taxonomy of the NodeState that a + * Node might transition into.
+ * */ public enum NodeUpdateType { NODE_USABLE, NODE_UNUSABLE, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 77fb75bd21b5e..54537cec08889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -618,7 +618,7 @@ public CancelDelegationTokenResponse cancelDelegationToken( } public ApplicationReport createFakeAppReport() { - ApplicationId appId = ApplicationId.newInstance(1000l, 1); + ApplicationId appId = ApplicationId.newInstance(1000L, 1); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); // create a fake application report @@ -626,7 +626,7 @@ public ApplicationReport createFakeAppReport() { ApplicationReport.newInstance(appId, attemptId, "fakeUser", "fakeQueue", "fakeApplicationName", "localhost", 0, null, YarnApplicationState.FINISHED, "fake an application report", "", - 1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f, + 1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f, "fakeApplicationType", null); return report; } @@ -638,7 +638,7 @@ public List createFakeAppReports() { } public ApplicationId createFakeAppId() { - return ApplicationId.newInstance(1000l, 1); + return ApplicationId.newInstance(1000L, 1); } public ApplicationAttemptId createFakeApplicationAttemptId() { @@ -657,7 +657,7 @@ public List createFakeNodeReports() { NodeId nodeId = NodeId.newInstance("localhost", 0); NodeReport report = NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost", - "rack1", null, null, 4, null, 1000l); + "rack1", null, null, 4, null, 1000L); List reports = new ArrayList(); reports.add(report); return reports; @@ -680,8 +680,8 @@ public List createFakeQueueUserACLInfoList() { public ApplicationAttemptReport createFakeApplicationAttemptReport() { return ApplicationAttemptReport.newInstance( createFakeApplicationAttemptId(), "localhost", 0, "", "", "", - YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l, - 1200l); + YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L, + 1200L); } public List @@ -694,7 +694,7 @@ YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l, public ContainerReport createFakeContainerReport() { return ContainerReport.newInstance(createFakeContainerId(), null, - NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0, + NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0, ContainerState.COMPLETE, "http://" + NodeId.newInstance("localhost", 0).toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java index 5269ab7681921..6ee62377782dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java @@ -151,8 +151,9 @@ public NodeId getNodeId() { @Override public void setNodeId(NodeId nodeId) { 
maybeInitBuilder(); - if (nodeId == null) + if (nodeId == null) { builder.clearNodeId(); + } this.nodeId = nodeId; } @@ -178,8 +179,9 @@ public void setNodeState(NodeState nodeState) { @Override public void setCapability(Resource capability) { maybeInitBuilder(); - if (capability == null) + if (capability == null) { builder.clearCapability(); + } this.capability = capability; } @@ -216,8 +218,9 @@ public void setRackName(String rackName) { @Override public void setUsed(Resource used) { maybeInitBuilder(); - if (used == null) + if (used == null) { builder.clearUsed(); + } this.used = used; } @@ -235,8 +238,9 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (other == null) + if (other == null) { return false; + } if (other.getClass().isAssignableFrom(this.getClass())) { return this.getProto().equals(this.getClass().cast(other).getProto()); } @@ -279,8 +283,9 @@ private void mergeLocalToBuilder() { } private void mergeLocalToProto() { - if (viaProto) + if (viaProto) { maybeInitBuilder(); + } mergeLocalToBuilder(); proto = builder.build(); viaProto = true; @@ -392,7 +397,8 @@ public void setNodeUtilization(ResourceUtilization nodeResourceUtilization) { @Override public Integer getDecommissioningTimeout() { NodeReportProtoOrBuilder p = viaProto ? proto : builder; - return (p.hasDecommissioningTimeout()) ? p.getDecommissioningTimeout() : null; + return (p.hasDecommissioningTimeout()) + ? p.getDecommissioningTimeout() : null; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 8df8a21896827..83f912f91381c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -188,7 +188,8 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { return newNodeReport(nodeId, nodeState, httpAddress, rackName, used, - capability, numContainers, healthReport, lastHealthReportTime, null, null, null); + capability, numContainers, healthReport, lastHealthReportTime, + null, null, null); } public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index ce4165f010d74..55a3f0b131a68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1040,7 +1040,8 @@ private NodeReport createNodeReports(RMNode rmNode) { rmNode.getTotalCapability(), numContainers, 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index c9c8f86133721..f0d3a65e815ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -493,11 +493,13 @@ public void handle(NodesListManagerEvent event) {
     switch (event.getType()) {
     case NODE_UNUSABLE:
       LOG.debug(eventNode + " reported unusable");
-      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode, RMAppNodeUpdateType.NODE_UNUSABLE);
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_UNUSABLE);
       break;
     case NODE_USABLE:
       LOG.debug(eventNode + " reported usable");
-      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode, RMAppNodeUpdateType.NODE_USABLE);
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_USABLE);
       break;
     case NODE_DECOMMISSIONING:
       LOG.debug(eventNode + " reported decommissioning");
@@ -624,18 +626,20 @@ public void refreshNodesForcefully() {
   }
 
   // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
-  // This enables NodesListManager to pick up new value without ResourceManager restart.
-  private int readDecommissioningTimeout(Configuration conf) {
+  // This enables NodesListManager to pick up new value without
+  // ResourceManager restart.
+  private int readDecommissioningTimeout(Configuration pConf) {
     try {
-      if (conf == null) {
-        conf = new YarnConfiguration();
+      if (pConf == null) {
+        pConf = new YarnConfiguration();
       }
       int configuredDefaultDecTimeoutSecs =
-          conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+          pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
               YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
       if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
         defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
-        LOG.info("Use new decommissioningTimeoutSecs: " + defaultDecTimeoutSecs);
+        LOG.info("Use new decommissioningTimeoutSecs: "
+            + defaultDecTimeoutSecs);
       }
     } catch (Exception e) {
       LOG.warn("Error readDecommissioningTimeout " + e.getMessage());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
index 2666aacd33bf8..e3af71098def7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
@@ -29,7 +29,8 @@ public enum RMAppNodeUpdateType {
     NODE_UNUSABLE,
     NODE_DECOMMISSIONING;
 
-    public static NodeUpdateType convertToNodeUpdateType(RMAppNodeUpdateType rmAppNodeUpdateType) {
+    public static NodeUpdateType convertToNodeUpdateType(
+        RMAppNodeUpdateType rmAppNodeUpdateType) {
       return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 4a9e6a916ba43..2b013a04c5320 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1160,8 +1160,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
       rmNode.decommissioningTimeout = timeout;
-      // Notify NodesListManager to notify all RMApp so that each Application Master
-      // could take any required actions.
+      // Notify NodesListManager to notify all RMApp so that each
+      // Application Master could take any required actions.
      rmNode.context.getDispatcher().getEventHandler().handle(
          new NodesListManagerEvent(
              NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
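The NODE_DECOMMISSIONING event dispatched above ultimately reaches Application Masters as an updated NodeReport in the allocate response. A blocking AM could filter for those reports roughly as follows (a minimal sketch, not part of the patch; it assumes only the public AllocateResponse, NodeReport and NodeUpdateType APIs, and the helper class name is hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.NodeUpdateType;

    public final class DecommissioningNodeCheck {
      private DecommissioningNodeCheck() {
      }

      /** Nodes in this heartbeat that just entered graceful decommissioning. */
      public static List<NodeReport> decommissioningNodes(
          AllocateResponse response) {
        List<NodeReport> result = new ArrayList<>();
        for (NodeReport report : response.getUpdatedNodes()) {
          if (report.getNodeUpdateType() == NodeUpdateType.NODE_DECOMMISSIONING) {
            // report.getDecommissioningTimeout() gives the remaining budget in
            // seconds (may be null when no timeout is configured).
            result.add(report);
          }
        }
        return result;
      }
    }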
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 8bdb03ed277b1..e2ef55555d02e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -918,8 +918,10 @@ public void waitForState(NodeId nodeId, NodeState finalState)
         node.getState());
   }
 
-  public void sendNodeGracefulDecommission(MockNM nm, int timeout) throws Exception {
-    RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(nm.getNodeId());
+  public void sendNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
+    RMNodeImpl node = (RMNodeImpl)
+        getRMContext().getRMNodes().get(nm.getNodeId());
     Assert.assertNotNull("node shouldn't be null", node);
     node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
   }
@@ -932,7 +934,8 @@ public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
   }
 
   public Integer getDecommissioningTimeout(NodeId nodeid) {
-    return this.getRMContext().getRMNodes().get(nodeid).getDecommissioningTimeout();
+    return this.getRMContext().getRMNodes()
+        .get(nodeid).getDecommissioningTimeout();
   }
 
   public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
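A short usage sketch for the MockRM helpers reworked above (illustrative only; it assumes the class sits in the org.apache.hadoop.yarn.server.resourcemanager test package so MockRM and MockNM resolve, and the real coverage lives in the tests that follow):

    import org.apache.hadoop.yarn.api.records.NodeState;
    import org.junit.Assert;
    import org.junit.Test;

    public class GracefulDecommissionHelperExample {
      @Test
      public void gracefulDecommissionSetsTimeout() throws Exception {
        MockRM rm = new MockRM();
        rm.start();
        try {
          MockNM nm = rm.registerNode("host1:1234", 1024);
          rm.sendNodeStarted(nm);
          rm.waitForState(nm.getNodeId(), NodeState.RUNNING);
          // Drive the node into DECOMMISSIONING with a 600s timeout.
          rm.sendNodeGracefulDecommission(nm, 600);
          rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
          Assert.assertEquals(Integer.valueOf(600),
              rm.getDecommissioningTimeout(nm.getNodeId()));
        } finally {
          rm.close();
        }
      }
    }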
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index b075b3f54f787..0ace53de382df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -180,13 +180,13 @@ public class TestClientRMService {
 
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
-
   @Test
   public void testGetDecommissioningClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
       protected ClientRMService createClientRMService() {
         return new ClientRMService(this.rmContext, scheduler,
-            this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
+            this.rmAppManager, this.applicationACLsManager,
+            this.queueACLsManager,
             this.getRMContext().getRMDelegationTokenSecretManager());
       };
     };
@@ -212,7 +212,9 @@ protected ClientRMService createClientRMService() {
 
     // Make call
     List<NodeReport> nodeReports = client.getClusterNodes(
-        GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.DECOMMISSIONING))).getNodeReports();
+        GetClusterNodesRequest.newInstance(
+            EnumSet.of(NodeState.DECOMMISSIONING)))
+        .getNodeReports();
     Assert.assertEquals(1, nodeReports.size());
     NodeReport nr = nodeReports.iterator().next();
     Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 1ef6e397aa128..36571233f0fdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -100,8 +100,8 @@ public void handle(SchedulerEvent event) {
   }
 
   private NodesListManagerEvent nodesListManagerEvent = null;
-
-  private List<NodeState> nodesListManagerEventsNodeStateSequence = new LinkedList<>();
+  private List<NodeState> nodesListManagerEventsNodeStateSequence =
+      new LinkedList<>();
 
   private class TestNodeListManagerEventDispatcher implements
       EventHandler<NodesListManagerEvent> {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 82190673fb0d1..7b06bb21dc779 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -27,7 +27,6 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -305,7 +304,8 @@ public void testGracefulDecommissionNoApp() throws Exception {
   }
 
   @Test
-  public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception {
+  public void testGracefulDecommissionDefaultTimeoutResolution()
+      throws Exception {
     Configuration conf = new Configuration();
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
         .getAbsolutePath());
@@ -323,36 +323,45 @@ public void testGracefulDecommissionDefaultTimeoutResolution() throws Exception
     NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
     NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
 
-    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
-    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
-    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
 
     rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
     rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
     rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
 
-    // Graceful decommission both host1 and host2, with non default timeout for host1
+    // Graceful decommission both host1 and host2, with
+    // non default timeout for host1
     final Integer nm1DecommissionTimeout = 20;
     writeToHostsXmlFile(
-        excludeHostXmlFile, Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
+        excludeHostXmlFile,
+        Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
         Pair.of(nm2.getNodeId().getHost(), null));
     rm.getNodesListManager().refreshNodes(conf, true);
     rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
     rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
-    Assert.assertEquals(nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
+    Assert.assertEquals(
+        nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
     Integer defaultDecTimeout =
         conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
             YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
-    Assert.assertEquals(defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
+    Assert.assertEquals(
+        defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
 
     // Graceful decommission host3 with a new default timeout
     final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
-    writeToHostsXmlFile(excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
+    writeToHostsXmlFile(
+        excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
     conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
         newDefaultDecTimeout);
     rm.getNodesListManager().refreshNodes(conf, true);
     rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
-    Assert.assertEquals(newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
+    Assert.assertEquals(
+        newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
   }
 
   /**
@@ -2060,8 +2069,8 @@ private void writeToHostsFile(File file, String... hosts)
     }
   }
 
-  private void writeToHostsXmlFile(File file, Pair<String, Integer>... hostsAndTimeouts)
-      throws Exception {
+  private void writeToHostsXmlFile(
+      File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
     ensureFileExists(file);
     DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
     Document doc = dbFactory.newDocumentBuilder().newDocument();
@@ -2076,7 +2085,9 @@ private void writeToHostsXmlFile(File file, Pair... hostsAndTim
       if (hostsAndTimeout.getRight() != null) {
        Element timeout = doc.createElement("timeout");
        host.appendChild(timeout);
-        timeout.appendChild(doc.createTextNode(hostsAndTimeout.getRight().toString()));
+        timeout.appendChild(
+            doc.createTextNode(hostsAndTimeout.getRight().toString())
+        );
       }
     }
     TransformerFactory transformerFactory = TransformerFactory.newInstance();
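The test helper above serializes per-host decommission timeouts into the XML exclude file. The sketch below writes an example of that file from plain Java; the host and timeout elements are taken from the hunk above, while the surrounding root element and the name element are assumptions, since that part of the helper is not visible here:

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class WriteExcludeFileExample {
      public static void main(String[] args) throws Exception {
        // host1 gets a 20s graceful decommission timeout; host2 falls back to
        // the default configured on the ResourceManager.
        String xml =
            "<?xml version=\"1.0\"?>\n"
            + "<hosts>\n"
            + "  <host><name>host1</name><timeout>20</timeout></host>\n"
            + "  <host><name>host2</name></host>\n"
            + "</hosts>\n";
        Files.write(Paths.get("/tmp/yarn.exclude.xml"),
            xml.getBytes(StandardCharsets.UTF_8));
      }
    }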
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index 54bf41a24583e..73e9e51c05cf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -83,13 +83,14 @@ private void syncNodeLost(MockNM nm) throws Exception {
     rm.drainEvents();
   }
 
-  private void syncNodeGracefulDecommission(MockNM nm, int timeout) throws Exception {
+  private void syncNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
     rm.sendNodeGracefulDecommission(nm, timeout);
     rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
     rm.drainEvents();
   }
 
-  private AllocateResponse allocate(final ApplicationAttemptId attemptId,
+  private AllocateResponse allocate(final ApplicationAttemptId attemptId,
       final AllocateRequest req) throws Exception {
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
@@ -132,8 +133,10 @@ public void testAMRMDecommissioningNodes() throws Exception {
     List<NodeReport> updatedNodes = response1.getUpdatedNodes();
     Assert.assertEquals(1, updatedNodes.size());
     NodeReport nr = updatedNodes.iterator().next();
-    Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
-    Assert.assertEquals(NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
+    Assert.assertEquals(
+        decommissioningTimeout, nr.getDecommissioningTimeout());
+    Assert.assertEquals(
+        NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
   }
 

From 880a4e6dd15ab507f4a88ab10b0a4dbe72b716f3 Mon Sep 17 00:00:00 2001
From: Juan Rodriguez Hortala
Date: Tue, 21 Nov 2017 19:01:12 -0800
Subject: [PATCH 13/13] fix whitespaces

---
 .../resourcemanager/NodesListManager.java     | 10 ++++-----
 .../yarn/server/resourcemanager/MockRM.java   |  6 +++---
 .../resourcemanager/TestClientRMService.java  | 12 +++++------
 .../TestResourceTrackerService.java           | 10 ++++-----
 .../TestAMRMRPCNodeUpdates.java               | 21 +++++++++----------
 5 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index f0d3a65e815ef..647dfa333fe2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -71,7 +71,7 @@ public class NodesListManager extends CompositeService implements
   private HostsFileReader hostsReader;
   private Configuration conf;
   private final RMContext rmContext;
-  
+
   // Default decommissioning timeout value in seconds.
   // Negative value indicates no timeout. 0 means immediate.
   private int defaultDecTimeoutSecs =
@@ -472,7 +472,7 @@ private boolean isValidNode(
         .contains(ip)) &&
         !(excludeList.contains(hostName) || excludeList.contains(ip));
   }
-  
+
   private void sendRMAppNodeUpdateEventToNonFinalizedApps(
       RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
     for(RMApp app : rmContext.getRMApps().values()) {
@@ -506,7 +506,7 @@ public void handle(NodesListManagerEvent event) {
       sendRMAppNodeUpdateEventToNonFinalizedApps(
           eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
       break;
-      
+
     default:
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
@@ -624,7 +624,7 @@ public void refreshNodesForcefully() {
       }
     }
   }
-  
+
   // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
   // This enables NodesListManager to pick up new value without
   // ResourceManager restart.
@@ -646,7 +646,7 @@ private int readDecommissioningTimeout(Configuration pConf) {
     }
     return defaultDecTimeoutSecs;
   }
-  
+
   /**
   * A NodeId instance needed upon startup for populating inactive nodes Map.
   * It only knows the hostname/ip and marks the port to -1 or invalid.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index e2ef55555d02e..711f008d5a7f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -917,7 +917,7 @@ public void waitForState(NodeId nodeId, NodeState finalState)
     Assert.assertEquals("Node state is not correct (timedout)", finalState,
         node.getState());
   }
-  
+
   public void sendNodeGracefulDecommission(
       MockNM nm, int timeout) throws Exception {
     RMNodeImpl node = (RMNodeImpl)
@@ -925,14 +925,14 @@ public void sendNodeGracefulDecommission(
     Assert.assertNotNull("node shouldn't be null", node);
     node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
   }
-  
+
   public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
     RMNodeImpl node = (RMNodeImpl)
         getRMContext().getRMNodes().get(nm.getNodeId());
     Assert.assertNotNull("node shouldn't be null", node);
     node.handle(new RMNodeEvent(nm.getNodeId(), event));
   }
-  
+
   public Integer getDecommissioningTimeout(NodeId nodeid) {
     return this.getRMContext().getRMNodes()
         .get(nodeid).getDecommissioningTimeout();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 0ace53de382df..1c50dd3f1a840 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -179,7 +179,7 @@ public class TestClientRMService {
 
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
-  
+
   @Test
   public void testGetDecommissioningClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
@@ -191,7 +191,7 @@ protected ClientRMService createClientRMService() {
       };
     };
     rm.start();
-    
+
     int nodeMemory = 1024;
     MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
     rm.sendNodeStarted(nm1);
@@ -200,7 +200,7 @@ protected ClientRMService createClientRMService() {
     Integer decommissioningTimeout = 600;
     rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
     rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
-    
+
     // Create a client.
     Configuration conf = new Configuration();
     YarnRPC rpc = YarnRPC.create(conf);
@@ -209,7 +209,7 @@ protected ClientRMService createClientRMService() {
     ApplicationClientProtocol client =
         (ApplicationClientProtocol) rpc
             .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
-    
+
     // Make call
     List<NodeReport> nodeReports = client.getClusterNodes(
         GetClusterNodesRequest.newInstance(
@@ -219,11 +219,11 @@ protected ClientRMService createClientRMService() {
     NodeReport nr = nodeReports.iterator().next();
     Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
     Assert.assertNull(nr.getNodeUpdateType());
-    
+
     rpc.stopProxy(client, conf);
     rm.close();
   }
-  
+
   @Test
   public void testGetClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 7b06bb21dc779..8931c169eba59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -302,7 +302,7 @@ public void testGracefulDecommissionNoApp() throws Exception {
     Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
     Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
   }
-  
+
   @Test
   public void testGracefulDecommissionDefaultTimeoutResolution()
       throws Exception {
@@ -2046,11 +2046,11 @@ private void ensureFileExists(File file) throws IOException {
       file.createNewFile();
     }
   }
-  
+
   private void writeToHostsFile(String... hosts) throws IOException {
     writeToHostsFile(hostFile, hosts);
   }
-  
+
   private void writeToHostsFile(File file, String... hosts)
       throws IOException {
     ensureFileExists(file);
@@ -2068,7 +2068,7 @@ private void writeToHostsFile(File file, String... hosts)
       }
     }
   }
-  
+
   private void writeToHostsXmlFile(
       File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
     ensureFileExists(file);
@@ -2095,7 +2095,7 @@ private void writeToHostsXmlFile(
     transformer.setOutputProperty(OutputKeys.INDENT, "yes");
     transformer.transform(new DOMSource(doc), new StreamResult(file));
   }
-  
+
   private void checkDecommissionedNMCount(MockRM rm, int count)
       throws InterruptedException {
     int waitCount = 0;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index 73e9e51c05cf2..dd9bcd08b4990 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -82,14 +82,14 @@ private void syncNodeLost(MockNM nm) throws Exception {
     rm.sendNodeLost(nm);
     rm.drainEvents();
   }
-  
+
   private void syncNodeGracefulDecommission(
       MockNM nm, int timeout) throws Exception {
     rm.sendNodeGracefulDecommission(nm, timeout);
     rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
     rm.drainEvents();
   }
-  
+
   private AllocateResponse allocate(final ApplicationAttemptId attemptId,
       final AllocateRequest req) throws Exception {
     UserGroupInformation ugi =
@@ -105,27 +105,27 @@ public AllocateResponse run() throws Exception {
       }
     });
   }
-  
+
   @Test
   public void testAMRMDecommissioningNodes() throws Exception {
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
     MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
     rm.drainEvents();
-    
+
     RMApp app1 = rm.submitApp(2000);
-    
+
     // Trigger the scheduling so the AM gets 'launched' on nm1
     nm1.nodeHeartbeat(true);
-    
+
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-    
+
     // register AM returns no unusable node
     am1.registerAppAttempt();
-    
+
     Integer decommissioningTimeout = 600;
     syncNodeGracefulDecommission(nm2, decommissioningTimeout);
-    
+
     AllocateRequest allocateRequest1 =
         AllocateRequest.newInstance(0, 0F, null, null, null);
     AllocateResponse response1 =
@@ -138,8 +138,7 @@ public void testAMRMDecommissioningNodes() throws Exception {
     Assert.assertEquals(
         decommissioningTimeout, nr.getDecommissioningTimeout());
     Assert.assertEquals(
        NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
   }
-  
-  
+
   @Test
   public void testAMRMUnusableNodes() throws Exception {
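Closing illustration: an AM built on the asynchronous client receives the same updated NodeReports through its callback handler rather than by inspecting AllocateResponse directly. A minimal sketch assuming the public AMRMClientAsync.CallbackHandler API, which is not part of this patch series:

    import java.util.List;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.NodeUpdateType;
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

    public abstract class DecommissionAwareCallback
        implements AMRMClientAsync.CallbackHandler {
      @Override
      public void onNodesUpdated(List<NodeReport> updatedNodes) {
        for (NodeReport report : updatedNodes) {
          if (report.getNodeUpdateType() == NodeUpdateType.NODE_DECOMMISSIONING) {
            // Stop placing new work on this node and try to checkpoint or
            // migrate within getDecommissioningTimeout() seconds (may be null).
            System.out.println(report.getNodeId() + " is being decommissioned,"
                + " timeout=" + report.getDecommissioningTimeout());
          }
        }
      }
    }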