diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 1ffd70a3ff..57d57977bc 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2228,4 +2228,13 @@ static Set getPropertySet() { @ConfigurationProperty public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties"; + /** + * Long value. Specifies the timeout when waiting all initializers to be finished. + * Default time unit is: ms. Set to a negative number to disable . + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty(type="long") + public static final String TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT = + TEZ_AM_PREFIX + "initialize.wait.initializers.timeout"; + public static final long TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT_DEFAULT = -1; } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 583cc0099a..c9791f54e2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -164,6 +164,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final TezExecutors sharedExecutor; /** nanoTime of the task initialization start. */ private Long initStartTimeNs = null; + private Long waitInitializersFinishTimeout; public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, @@ -225,6 +226,8 @@ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG, TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT); this.sharedExecutor = sharedExecutor; + this.waitInitializersFinishTimeout = tezConf.getLong(TezConfiguration.TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT, + TezConfiguration.TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT_DEFAULT); } /** @@ -267,10 +270,19 @@ public void initialize() throws Exception { int completedTasks = 0; while (completedTasks < numTasks) { LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish"); - Future future = initializerCompletionService.take(); + Future future; + if (waitInitializersFinishTimeout >= 0) { + future = initializerCompletionService.poll(waitInitializersFinishTimeout, TimeUnit.MILLISECONDS); + } else { + future = initializerCompletionService.take(); + } try { - future.get(); - completedTasks++; + if (future != null) { + future.get(); + completedTasks++; + } else { + throw new RuntimeException("Timed out while waiting for all initializers to finish"); + } } catch (ExecutionException e) { if (e.getCause() instanceof Exception) { throw (Exception) e.getCause();