diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 05eb4b286c..eef0d65424 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -193,6 +193,17 @@ public TezConfiguration(boolean loadDefaults) { @Private public static final int TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT = 10; + /** + * Integer value. Time in milliseconds that AsyncDispatcher should wait for events to be + * processed on serviceStop. The idea is borrowed from YARN-3999. + */ + @Private + @ConfigurationScope(Scope.AM) + public static final String TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT = TEZ_AM_PREFIX + + "dispatcher.drain-events.timeout"; + @Private + public static final int TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT = 10000; + /** * Boolean value. Execution mode for the Tez application. True implies session mode. If the client * code is written according to best practices then the same code can execute in either mode based diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index c197f1d32e..f9f21ca313 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,19 +143,34 @@ protected void serviceStop() throws Exception { if (drainEventsOnStop) { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, ignoring any new events."); + long endTime = System.currentTimeMillis() + getConfig() + 
.getInt(TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT, + TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT); + synchronized (waitForDrained) { - while (!drained && eventHandlingThread.isAlive()) { + while (!drained && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { waitForDrained.wait(1000); - LOG.info("Waiting for AsyncDispatcher to drain."); + LOG.info( + "Waiting for AsyncDispatcher to drain. Current queue size: {}, handler thread state: {}", + eventQueue.size(), eventHandlingThread.getState()); } } - } stopped = true; if (eventHandlingThread != null) { eventHandlingThread.interrupt(); try { - eventHandlingThread.join(); + /* + * The event handling thread can be alive at this point, but in BLOCKED state, which leads + * to app hang, as a BLOCKED thread might never finish under some circumstances + */ + if (eventHandlingThread.getState() == Thread.State.BLOCKED) { + LOG.warn( + "eventHandlingThread is in BLOCKED state, let's not wait for it in order to prevent app hang"); + } else { + eventHandlingThread.join(); + LOG.info("joined event handling thread, state: {}", eventHandlingThread.getState()); + } } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); } @@ -181,6 +197,10 @@ protected void dispatch(Event event) { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { + if (t instanceof InterruptedException) { + LOG.warn("Interrupted Exception while handling event: " + event.getType(), t); + Thread.currentThread().interrupt(); + } LOG.error("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. 
if (exitOnDispatchException diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index c76bd6bace..f6d9587734 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.DAGSubmissionTimedOut; @@ -356,7 +357,7 @@ public void run() { amCredentials, UserGroupInformation.getCurrentUser().getShortUserName()); DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf); clientHandler = new DAGClientHandler(dagAppMaster); - + ((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop(); } catch (Throwable t) { LOG.error("Error starting DAGAppMaster", t); if (dagAppMaster != null) {