diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 810a806228..e6a74321f1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -18,6 +18,9 @@ import java.security.PrivilegedExceptionAction; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -75,6 +78,7 @@ public TaskRunner2CallableResult run() throws Exception { LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit"); + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { @@ -116,6 +120,11 @@ public TaskRunner2CallableResult run() throws Exception { // For a successful task, however, this should be almost no delay since close has already happened. maybeFixInterruptStatus(); LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); + String ioStats = IOStatisticsLogging.ioStatisticsToPrettyString( + IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics()); + if (StringUtils.isNotEmpty(ioStats)) { + LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats); + } task.getOutputContexts().forEach(outputContext -> outputContext.trapEvents(new TezTrapEventHandler(outputContext, this.tezUmbilical)));