Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
Expand Down Expand Up @@ -3543,7 +3544,10 @@ static class AsyncSchedulingConfiguration {

this.asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
AsyncScheduleThread ast = new AsyncScheduleThread(cs);
ast.setUncaughtExceptionHandler(
new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext));
asyncSchedulerThreads.add(ast);
}
this.resourceCommitterService = new ResourceCommitterService(cs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
Expand Down Expand Up @@ -135,6 +136,57 @@ public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
rm2.stop();
}

/**
 * Verifies that when an async-scheduling thread dies on rm1, rm1 drops to
 * standby, and that the async-scheduling threads are in the expected state
 * again once rm1 has been transitioned back to active.
 */
@Test(timeout = 30000)
public void testAsyncScheduleThreadExit() throws Exception {
  // bring up both RMs (rm1 active, rm2 standby)
  startRMs();
  // register one NM with rm1 and let the resulting events settle
  rm1.registerNode("192.1.1.1:1234", 8192, 8);
  rm1.drainEvents();

  // baseline: async-scheduling threads must be correct before the failure
  checkAsyncSchedulerThreads(Thread.currentThread());

  try {
    // a null resource calculator is used to simulate an NPE
    // inside the async-scheduling thread
    CapacityScheduler scheduler =
        (CapacityScheduler) rm1.getRMContext().getScheduler();
    scheduler.setResourceCalculator(null);

    // rm1 is expected to fall back to standby after the thread exits
    GenericTestUtils.waitFor(
        () -> HAServiceProtocol.HAServiceState.STANDBY
            == rm1.getRMContext().getHAServiceState(),
        100, 5000);

    // hand the active role back from rm2 to rm1
    HAServiceProtocol.StateChangeRequestInfo userRequest =
        new HAServiceProtocol.StateChangeRequestInfo(
            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
    rm2.adminService.transitionToStandby(userRequest);
    GenericTestUtils.waitFor(() -> {
      boolean transitioned;
      try {
        // may fail while rm1 is still initializing in the
        // StandByTransitionRunnable thread, so keep polling
        rm1.adminService.transitionToActive(userRequest);
        transitioned = true;
      } catch (Exception ignored) {
        // not ready yet; the next poll retries the transition
        transitioned = false;
      }
      return transitioned;
    }, 100, 3000);

    // rm1 must report active again
    GenericTestUtils.waitFor(
        () -> HAServiceProtocol.HAServiceState.ACTIVE
            == rm1.getRMContext().getHAServiceState(),
        100, 5000);

    // async-scheduling threads must be correct after the failover
    checkAsyncSchedulerThreads(Thread.currentThread());
  } finally {
    rm1.stop();
    rm2.stop();
  }
}

private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -1072,6 +1074,37 @@ public Boolean answer(InvocationOnMock invocation) throws Exception {
rm.stop();
}

/**
 * Verifies that an uncaught exception in an async-scheduling thread leads to
 * an exit attempt, which is observed through a {@link NoExitSecurityManager}
 * installed for the duration of the test.
 *
 * <p>All setup that can throw runs inside the {@code try} so that the RM is
 * always stopped and the original security manager is always restored, even
 * when node registration or security-manager installation fails.
 */
@Test(timeout = 30000)
public void testAsyncScheduleThreadExit() throws Exception {
  final MockRM rm = new MockRM(conf);
  // Captured before anything can fail so the finally block can restore it.
  final SecurityManager originalSecurityManager = System.getSecurityManager();

  try {
    // init RM & NM
    rm.start();
    rm.registerNode("192.168.0.1:1234", 8 * GB);
    rm.drainEvents();

    // Set no exit security manager to catch System.exit
    NoExitSecurityManager noExitSecurityManager =
        new NoExitSecurityManager(originalSecurityManager);
    System.setSecurityManager(noExitSecurityManager);

    // set resource calculator to be null to simulate
    // NPE in async-scheduling thread
    CapacityScheduler cs =
        (CapacityScheduler) rm.getRMContext().getScheduler();
    cs.setResourceCalculator(null);

    // wait until an exit attempt has been recorded, or time out
    GenericTestUtils.waitFor(noExitSecurityManager::isCheckExitCalled,
        100, 5000);
  } finally {
    try {
      // Restore first so rm.stop() runs under the original manager,
      // matching the original ordering.
      System.setSecurityManager(originalSecurityManager);
    } finally {
      // Nested so the RM is stopped even if restoring the manager throws.
      rm.stop();
    }
  }
}

private ResourceCommitRequest createAllocateFromReservedProposal(
int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
SchedulerNode allocateNode, SchedulerNode reservedNode,
Expand Down
Loading