Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ public TezConfiguration(boolean loadDefaults) {
@Private
public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 10;

/**
* Integer value. Time in milliseconds that AsyncDispatcher should wait for pending events to be
* processed during serviceStop before giving up. The idea is borrowed from YARN-3999.
*/
@Private
@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT = TEZ_AM_PREFIX
+ "dispatcher.drain-events.timeout";
@Private
public static final int TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 10000;

/**
* Boolean value. Execution mode for the Tez application. True implies session mode. If the client
* code is written according to best practices then the same code can execute in either mode based
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -142,19 +143,34 @@ protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
long endTime = System.currentTimeMillis() + getConfig()
.getInt(TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT,
TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT);

synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
while (!drained && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
LOG.info(
"Waiting for AsyncDispatcher to drain. Current queue size: {}, handler thread state: {}",
eventQueue.size(), eventHandlingThread.getState());
}
}

}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing the patch. Overall looks good. Minor comments below.

If you look at eventHandlingThread, it calls "protected void dispatch(Event event)". This method catches "Throwable" and does not restore the thread's interrupt status, so if an interrupt happens during dispatch, it is silently swallowed.

Can you add a snippet to catch "InterruptedException" in AsyncDispatcher::dispatch(), log that it was interrupted, and restore the thread's interrupt status with "Thread.currentThread().interrupt();"? This would ensure that the next iteration of the event handling thread gets a chance to bail out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @rbalamohan, makes sense, I added it in a new commit

try {
eventHandlingThread.join();
/*
* The event handling thread can be alive at this point, but in BLOCKED state, which leads
* to app hang, as a BLOCKED thread might never finish under some circumstances
*/
if (eventHandlingThread.getState() == Thread.State.BLOCKED) {
LOG.warn(
"eventHandlingThread is in BLOCKED state, let's not wait for it in order to prevent app hang");
} else {
eventHandlingThread.join();
LOG.info("joined event handling thread, state: {}", eventHandlingThread.getState());
}
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
Expand All @@ -181,6 +197,10 @@ protected void dispatch(Event event) {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
if (t instanceof InterruptedException) {
LOG.warn("Interrupted Exception while handling event: " + event.getType(), t);
Thread.currentThread().interrupt();
}
LOG.error("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
Expand Down
3 changes: 2 additions & 1 deletion tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.DAGSubmissionTimedOut;
Expand Down Expand Up @@ -356,7 +357,7 @@ public void run() {
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
clientHandler = new DAGClientHandler(dagAppMaster);

((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
} catch (Throwable t) {
LOG.error("Error starting DAGAppMaster", t);
if (dagAppMaster != null) {
Expand Down