Skip to content

Commit d8ca2db

Browse files
authored
YARN-11730. Mark unreported nodes as LOST on RM Startup/HA failover (#7049) Contributed by Arjun Mohnot.
Reviewed-by: Shilun Fan <slfan1989@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
1 parent 81faae6 commit d8ca2db

6 files changed

Lines changed: 229 additions & 1 deletion

File tree

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,13 @@ public static boolean isAclEnabled(Configuration conf) {
12771277
RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
12781278
public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
12791279

1280+
/**
1281+
* Enable/disable tracking of unregistered nodes.
1282+
**/
1283+
public static final String ENABLE_TRACKING_FOR_UNREGISTERED_NODES =
1284+
RM_PREFIX + "enable-tracking-for-unregistered-nodes";
1285+
public static final boolean DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES = false;
1286+
12801287
/**
12811288
* Period in seconds of the poll timer task inside DecommissioningNodesWatcher
12821289
* to identify and take care of DECOMMISSIONING nodes missing regular heart beat.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5810,4 +5810,13 @@
58105810
<value>30s</value>
58115811
</property>
58125812

5813+
<property>
5814+
<description>
5815+
The setting that controls whether the ResourceManager should track the nodes as
5816+
lost when they are unregistered and not reported to the RM.
5817+
It doesn't account for decommissioned nodes. Default is false.
5818+
</description>
5819+
<name>yarn.resourcemanager.enable-tracking-for-unregistered-nodes</name>
5820+
<value>false</value>
5821+
</property>
58135822
</configuration>

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ private void refreshHostsReader(
280280
StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
281281

282282
handleExcludeNodeList(graceful, timeout);
283+
markUnregisteredNodesAsLost(yarnConf);
283284
}
284285

285286
private void setDecommissionedNMs() {
@@ -387,6 +388,115 @@ private void handleExcludeNodeList(boolean graceful, int timeout) {
387388
updateInactiveNodes();
388389
}
389390

391+
/**
392+
* Marks the unregistered nodes as LOST
393+
* if the feature is enabled via a configuration flag.
394+
*
395+
* This method finds nodes that are present in the include list but are not
396+
* registered with the ResourceManager. Such nodes are then marked as LOST.
397+
*
398+
* The steps are as follows:
399+
* 1. Retrieve all hostnames of registered nodes from RM.
400+
* 2. Identify the nodes present in the include list but are not registered
401+
* 3. Remove nodes from the exclude list
402+
* 4. Dispatch LOST events for filtered nodes to mark them as LOST.
403+
*
404+
* @param yarnConf Configuration object that holds the YARN configurations.
405+
*/
406+
private void markUnregisteredNodesAsLost(Configuration yarnConf) {
407+
// Check if tracking unregistered nodes is enabled in the configuration
408+
if (!yarnConf.getBoolean(YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES,
409+
YarnConfiguration.DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES)) {
410+
LOG.debug("Unregistered node tracking is disabled. " +
411+
"Skipping marking unregistered nodes as LOST.");
412+
return;
413+
}
414+
415+
// Set to store all registered hostnames from both active and inactive lists
416+
Set<String> registeredHostNames = gatherRegisteredHostNames();
417+
// Event handler to dispatch LOST events
418+
EventHandler eventHandler = this.rmContext.getDispatcher().getEventHandler();
419+
420+
// Identify nodes that are in the include list but are not registered
421+
// and are not in the exclude list
422+
List<String> nodesToMarkLost = new ArrayList<>();
423+
HostDetails hostDetails = hostsReader.getHostDetails();
424+
Set<String> includes = hostDetails.getIncludedHosts();
425+
Set<String> excludes = hostDetails.getExcludedHosts();
426+
427+
for (String includedNode : includes) {
428+
if (!registeredHostNames.contains(includedNode) && !excludes.contains(includedNode)) {
429+
LOG.info("Lost node: {}", includedNode);
430+
nodesToMarkLost.add(includedNode);
431+
}
432+
}
433+
434+
// Dispatch LOST events for the identified lost nodes
435+
for (String lostNode : nodesToMarkLost) {
436+
dispatchLostEvent(eventHandler, lostNode);
437+
}
438+
439+
// Log successful completion of marking unregistered nodes as LOST
440+
LOG.info("Successfully marked unregistered nodes as LOST");
441+
}
442+
443+
/**
444+
* Gathers all registered hostnames from both active and inactive RMNodes.
445+
*
446+
* @return A set of registered hostnames.
447+
*/
448+
private Set<String> gatherRegisteredHostNames() {
449+
Set<String> registeredHostNames = new HashSet<>();
450+
LOG.info("Getting all the registered hostnames");
451+
452+
// Gather all registered nodes (active) from RM into the set
453+
for (RMNode node : this.rmContext.getRMNodes().values()) {
454+
registeredHostNames.add(node.getHostName());
455+
}
456+
457+
// Gather all inactive nodes from RM into the set
458+
for (RMNode node : this.rmContext.getInactiveRMNodes().values()) {
459+
registeredHostNames.add(node.getHostName());
460+
}
461+
462+
return registeredHostNames;
463+
}
464+
465+
/**
466+
* Dispatches a LOST event for a specified lost node.
467+
*
468+
* @param eventHandler The EventHandler used to dispatch the LOST event.
469+
* @param lostNode The hostname of the lost node for which the event is
470+
* being dispatched.
471+
*/
472+
private void dispatchLostEvent(EventHandler eventHandler, String lostNode) {
473+
// Generate a NodeId for the lost node with a special port -2
474+
NodeId nodeId = createLostNodeId(lostNode);
475+
RMNodeEvent lostEvent = new RMNodeEvent(nodeId, RMNodeEventType.EXPIRE);
476+
RMNodeImpl rmNode = new RMNodeImpl(nodeId, this.rmContext, lostNode, -2, -2,
477+
new UnknownNode(lostNode), Resource.newInstance(0, 0), "unknown");
478+
479+
try {
480+
// Dispatch the LOST event to signal the node is no longer active
481+
eventHandler.handle(lostEvent);
482+
483+
// After successful dispatch, update the node status in RMContext
484+
// Set the node's timestamp for when it became untracked
485+
rmNode.setUntrackedTimeStamp(Time.monotonicNow());
486+
487+
// Add the node to the active and inactive node maps in RMContext
488+
this.rmContext.getRMNodes().put(nodeId, rmNode);
489+
this.rmContext.getInactiveRMNodes().put(nodeId, rmNode);
490+
491+
LOG.info("Successfully dispatched LOST event and deactivated node: {}, Node ID: {}",
492+
lostNode, nodeId);
493+
} catch (Exception e) {
494+
// Log any exception encountered during event dispatch
495+
LOG.error("Error dispatching LOST event for node: {}, Node ID: {} - {}",
496+
lostNode, nodeId, e.getMessage());
497+
}
498+
}
499+
390500
@VisibleForTesting
391501
public int getNodeRemovalCheckInterval() {
392502
return nodeRemovalCheckInterval;
@@ -711,6 +821,20 @@ public static NodeId createUnknownNodeId(String host) {
711821
return NodeId.newInstance(host, -1);
712822
}
713823

824+
/**
825+
* Creates a NodeId for a node marked as LOST.
826+
*
827+
* The NodeId combines the hostname with a special port value of -2, indicating
828+
* that the node is lost in the cluster.
829+
*
830+
* @param host The hostname of the lost node.
831+
* @return NodeId Unique identifier for the lost node, with the port set to -2.
832+
*/
833+
public static NodeId createLostNodeId(String host) {
834+
// Create a NodeId with the given host and port -2 to signify the node is lost.
835+
return NodeId.newInstance(host, -2);
836+
}
837+
714838
/**
715839
* A Node instance needed upon startup for populating inactive nodes Map.
716840
* It only knows its hostname/ip.

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1603,14 +1603,26 @@ protected void serviceStart() throws Exception {
16031603
int port = webApp.port();
16041604
WebAppUtils.setRMWebAppPort(conf, port);
16051605
}
1606+
1607+
// Refresh node state before the service startup to reflect the unregistered
1608+
// nodemanagers as LOST if the tracking for unregistered nodes flag is enabled.
1609+
// For HA setup, refreshNodes is already being called before the active
1610+
// transition.
1611+
Configuration yarnConf = getConfig();
1612+
if (!this.rmContext.isHAEnabled() && yarnConf.getBoolean(
1613+
YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES,
1614+
YarnConfiguration.DEFAULT_ENABLE_TRACKING_FOR_UNREGISTERED_NODES)) {
1615+
this.rmContext.getNodesListManager().refreshNodes(yarnConf);
1616+
}
1617+
16061618
super.serviceStart();
16071619

16081620
// Non HA case, start after RM services are started.
16091621
if (!this.rmContext.isHAEnabled()) {
16101622
transitionToActive();
16111623
}
16121624
}
1613-
1625+
16141626
protected void doSecureLogin() throws IOException {
16151627
InetSocketAddress socAddr = getBindAddress(conf);
16161628
SecurityUtil.login(this.conf, YarnConfiguration.RM_KEYTAB,

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ RMNodeEventType.STARTED, new AddNodeTransition())
224224
.addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
225225
RMNodeEventType.DECOMMISSION,
226226
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
227+
.addTransition(NodeState.NEW, NodeState.LOST,
228+
RMNodeEventType.EXPIRE,
229+
new DeactivateNodeTransition(NodeState.LOST))
227230
.addTransition(NodeState.NEW, NodeState.NEW,
228231
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
229232
new AddContainersToBeRemovedFromNMTransition())
@@ -958,6 +961,16 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
958961
if (previousRMNode != null) {
959962
ClusterMetrics.getMetrics().decrDecommisionedNMs();
960963
}
964+
965+
// Check if the node was lost before
966+
NodeId lostNodeId = NodesListManager.createLostNodeId(nodeId.getHost());
967+
RMNode previousRMLostNode = rmNode.context.getInactiveRMNodes().remove(lostNodeId);
968+
if (previousRMLostNode != null) {
969+
// Remove the record of the lost node and update the metrics
970+
rmNode.context.getRMNodes().remove(lostNodeId);
971+
ClusterMetrics.getMetrics().decrNumLostNMs();
972+
}
973+
961974
containers = startEvent.getNMContainerStatuses();
962975
final Resource allocatedResource = Resource.newInstance(
963976
Resources.none());

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3303,4 +3303,67 @@ public void testDecommissionWithSelectiveStates() throws Exception {
33033303

33043304
rm.close();
33053305
}
3306+
3307+
/**
3308+
* Test case to verify the behavior of ResourceManager when unregistered nodes
3309+
* are marked as 'LOST' and node metrics are correctly updated in the system.
3310+
*
3311+
* @throws Exception if any unexpected behavior occurs
3312+
*/
3313+
@Test
3314+
public void testMarkUnregisteredNodesAsLost() throws Exception {
3315+
// Step 1: Create a Configuration object to hold the settings.
3316+
Configuration conf = new Configuration();
3317+
3318+
// Step 2: Setup the host files.
3319+
// Include the following hosts: test_host1, test_host2, test_host3, test_host4
3320+
writeToHostsFile(hostFile, "test_host1", "test_host2", "test_host3", "test_host4");
3321+
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath());
3322+
3323+
// Exclude the following host: test_host4
3324+
writeToHostsFile(excludeHostFile, "test_host4");
3325+
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());
3326+
3327+
// Enable tracking for unregistered nodes in the ResourceManager configuration
3328+
conf.setBoolean(YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES, true);
3329+
3330+
// Step 3: Create a MockRM (ResourceManager) instance to simulate RM behavior
3331+
rm = new MockRM(conf);
3332+
RMContext rmContext = rm.getRMContext(); // Retrieve the ResourceManager context
3333+
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); // Get cluster metrics for nodes
3334+
rm.start(); // Start the ResourceManager instance
3335+
3336+
// Step 4: Register and simulate node activity for "test_host1"
3337+
TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event dispatch
3338+
MockNM nm1 = rm.registerNode("test_host1:1234", 5120); // Register test_host1 with 5120MB
3339+
nm1.nodeHeartbeat(true); // Send heartbeat to simulate the node being alive
3340+
TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event processing
3341+
3342+
// Step 5: Validate that test_host3 is marked as a LOST node
3343+
Assert.assertNotNull(clusterMetrics); // Ensure metrics are not null
3344+
assertEquals("test_host3 should be a lost NM!",
3345+
NodeState.LOST,
3346+
rmContext.getInactiveRMNodes().get(
3347+
rm.getNodesListManager().createLostNodeId("test_host3")).getState());
3348+
3349+
// Step 6: Validate node metrics for lost, active, and decommissioned nodes
3350+
// Two nodes are lost
3351+
assertEquals("There should be 2 Lost NM!", 2, clusterMetrics.getNumLostNMs());
3352+
// One node is active
3353+
assertEquals("There should be 1 Active NM!", 1, clusterMetrics.getNumActiveNMs());
3354+
// One node is decommissioned
3355+
assertEquals("There should be 1 Decommissioned NM!", 1,
3356+
clusterMetrics.getNumDecommisionedNMs());
3357+
3358+
// Step 7: Register and simulate node activity for "test_host3"
3359+
MockNM nm3 = rm.registerNode("test_host3:5678", 10240); // Register test_host3 with 10240MB
3360+
nm3.nodeHeartbeat(true); // Send heartbeat to simulate the node being alive
3361+
TimeUnit.MILLISECONDS.sleep(50); // Allow some time for event dispatch and processing
3362+
3363+
// Step 8: Validate updated node metrics after registering test_host3
3364+
assertEquals("There should be 1 Lost NM!", 1,
3365+
clusterMetrics.getNumLostNMs()); // Only one node is lost now
3366+
assertEquals("There should be 2 Active NM!", 2,
3367+
clusterMetrics.getNumActiveNMs()); // Two nodes are now active
3368+
}
33063369
}

0 commit comments

Comments
 (0)