Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2228,4 +2228,13 @@ static Set<String> getPropertySet() {
@ConfigurationProperty
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Long value. Specifies the timeout when waiting all initializers to be finished.
* Default time unit is: ms. Set to a negative number to disable .
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="long")
public static final String TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT =
TEZ_AM_PREFIX + "initialize.wait.initializers.timeout";
public static final long TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT_DEFAULT = -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final TezExecutors sharedExecutor;
/** nanoTime of the task initialization start. */
private Long initStartTimeNs = null;
private Long waitInitializersFinishTimeout;

public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
Expand Down Expand Up @@ -225,6 +226,8 @@ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
this.maxEventBacklog = tezConf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG,
TezConfiguration.TEZ_TASK_MAX_EVENT_BACKLOG_DEFAULT);
this.sharedExecutor = sharedExecutor;
this.waitInitializersFinishTimeout = tezConf.getLong(TezConfiguration.TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT,
TezConfiguration.TEZ_AM_INITIALIZE_WAIT_INITIALIZERS_FINISH_TIMEOUT_DEFAULT);
}

/**
Expand Down Expand Up @@ -267,10 +270,19 @@ public void initialize() throws Exception {
int completedTasks = 0;
while (completedTasks < numTasks) {
LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
Future<Void> future = initializerCompletionService.take();
Future<Void> future;
if (waitInitializersFinishTimeout >= 0) {
future = initializerCompletionService.poll(waitInitializersFinishTimeout, TimeUnit.MILLISECONDS);
} else {
future = initializerCompletionService.take();
}
try {
future.get();
completedTasks++;
if (future != null) {
future.get();
completedTasks++;
} else {
throw new RuntimeException("Timed out while waiting for all initializers to finish");
}
} catch (ExecutionException e) {
if (e.getCause() instanceof Exception) {
throw (Exception) e.getCause();
Expand Down