diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 9e2e2d89cf..3dc6fe4745 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2304,4 +2304,13 @@ static Set getPropertySet() {
   public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
   public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
 
+  /**
+   * Limits the amount of data that can be written to LocalFileSystem by a Task.
+   * A negative value (the default) disables the limit.
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty(type = "long")
+  public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
+  public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;
+
 }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index a53d0d2e7e..4c44985eed 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Map;
@@ -26,6 +27,9 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -35,6 +39,11 @@
 import org.apache.tez.runtime.metrics.TaskCounterUpdater;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT;
 
 public abstract class RuntimeTask {
 
@@ -54,6 +63,9 @@ public abstract class RuntimeTask {
   private final TaskStatistics statistics;
   private final AtomicBoolean progressNotified = new AtomicBoolean(false);
 
+  private final long lfsBytesWriteLimit;
+  private static final Logger LOG = LoggerFactory.getLogger(RuntimeTask.class);
+
   protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
       TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
     this.taskSpec = taskSpec;
@@ -71,6 +83,8 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
     } else {
       this.counterUpdater = null;
     }
+    this.lfsBytesWriteLimit =
+        tezConf.getLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT);
   }
 
   protected enum State {
@@ -182,4 +196,48 @@ protected void setTaskDone() {
   protected final boolean isUpdatingSystemCounters() {
     return counterUpdater != null;
   }
+
+  /**
+   * Check whether the task has exceeded any configured limits.
+   *
+   * Only the local file system write limit is checked here: the "bytesWritten" counter is read
+   * from Hadoop's global storage statistics for the local file system scheme and compared with the
+   * configured limit. Nothing is checked when the limit is negative, and a counter that cannot be
+   * read is treated as "limit not exceeded".
+   *
+   * NOTE(review): the global storage statistics look process-wide rather than per-task; confirm
+   * the intended behavior when one container runs several tasks.
+   *
+   * @throws LocalWriteLimitException in case the limit is exceeded.
+   */
+  public void checkTaskLimits() throws LocalWriteLimitException {
+    // check the limit for writing to local file system
+    if (lfsBytesWriteLimit >= 0) {
+      Long lfsBytesWritten = null;
+      try {
+        LocalFileSystem localFS = FileSystem.getLocal(tezConf);
+        // The lookup yields null when no statistics are registered under the scheme; guard against it.
+        StorageStatistics lfsStatistics = FileSystem.getGlobalStorageStatistics().get(localFS.getScheme());
+        if (lfsStatistics != null) {
+          lfsBytesWritten = lfsStatistics.getLong("bytesWritten");
+        }
+      } catch (IOException e) {
+        LOG.warn("Could not get LocalFileSystem bytesWritten counter", e);
+      }
+      if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) {
+        throw new LocalWriteLimitException(
+            "Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is "
+                + lfsBytesWriteLimit);
+      }
+    }
+  }
+
+  /**
+   * Exception thrown when the task exceeds some configured limits.
+   */
+  public static class LocalWriteLimitException extends IOException {
+    public LocalWriteLimitException(String str) {
+      super(str);
+    }
+  }
 }
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index 99d8bbca47..5b1a9544b1 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -44,6 +44,7 @@
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
 import org.apache.tez.runtime.api.*;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
@@ -141,6 +142,8 @@ static class HeartbeatCallable implements Callable {
     private static final float LOG_COUNTER_BACKOFF = 1.3f;
     private static final int HEAP_MEMORY_USAGE_UPDATE_INTERVAL = 5000; // 5 seconds
 
+    private static final int LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL = 10000; // 10 seconds
+
     private final RuntimeTask task;
     private final EventMetaData updateEventMetadata;
 
@@ -165,6 +168,9 @@ static class HeartbeatCallable implements Callable {
     private long usedMemory = 0;
     private long heapMemoryUsageUpdatedTime = System.currentTimeMillis() - HEAP_MEMORY_USAGE_UPDATE_INTERVAL;
 
+    private long localFileSystemBytesWrittenCheckedTime =
+        System.currentTimeMillis() - LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL;
+
     /*
      * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
      * log counters.
@@ -262,6 +268,17 @@ private synchronized ResponseWrapper heartbeat(Collection eventsArg) t
           sendCounters = true;
           prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
         }
+        try {
+          long now = System.currentTimeMillis();
+          if (now - localFileSystemBytesWrittenCheckedTime > LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL) {
+            task.checkTaskLimits();
+            localFileSystemBytesWrittenCheckedTime = now;
+          }
+        } catch (LocalWriteLimitException lwle) {
+          LOG.error("Local FileSystem write limit exceeded", lwle);
+          askedToDie.set(true);
+          return new ResponseWrapper(true, 1);
+        }
         updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
         events.add(updateEvent);
       }
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
index 147d17655b..7ecd74fb72 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskReporter.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.task;
 
+import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -27,7 +28,11 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -36,12 +41,21 @@
 
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
+import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
@@ -55,6 +69,9 @@
 @SuppressWarnings("rawtypes")
 public class TestTaskReporter {
 
+  private static final File TEST_DIR =
+      new File(System.getProperty("test.build.data"), TestTaskReporter.class.getName()).getAbsoluteFile();
+
   @Test(timeout = 10000)
   public void testContinuousHeartbeatsOnMaxEvents() throws Exception {
 
@@ -218,6 +235,40 @@ public void testStatusUpdateAfterInitializationAndCounterFlag() {
 
   }
 
+  @Test
+  public void testLocalFileSystemBytesWrittenLimit() throws IOException {
+    TaskSpec mockSpec = mock(TaskSpec.class);
+    when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
+    when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
+    TezConfiguration tezConf = new TezConfiguration();
+    LogicalIOProcessorRuntimeTask lio1 =
+        new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
+            Runtime.getRuntime().maxMemory(), true, null, null);
+
+    LocalFileSystem localFS = FileSystem.getLocal(tezConf);
+    FileSystem.clearStatistics();
+    Path tmpPath =
+        new Path(TEST_DIR + "/testLocalFileSystemBytesWrittenLimit" + new Random(System.currentTimeMillis()).nextInt());
+    try (FSDataOutputStream out = localFS.create(tmpPath, true)) {
+      out.write(new byte[1024]);
+    }
+    // Do not leave the scratch file behind once the test JVM exits.
+    localFS.deleteOnExit(tmpPath);
+    // Check limits with default shouldn't throw exception.
+    lio1.checkTaskLimits();
+
+    tezConf.setLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, 10);
+    lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
+        Runtime.getRuntime().maxMemory(), true, null, null);
+
+    try {
+      lio1.checkTaskLimits();
+      Assert.fail("Expected to throw LocalWriteLimitException");
+    } catch (LocalWriteLimitException localWriteLimitException) {
+      Assert.assertTrue(localWriteLimitException.getMessage().contains("Too much write to local file system"));
+    }
+  }
+
   private List createEvents(int numEvents) {
     List list = Lists.newArrayListWithCapacity(numEvents);
     for (int i = 0; i < numEvents; i++) {