Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2304,4 +2304,12 @@ static Set<String> getPropertySet() {
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";

/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty(type = "long")
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
Expand All @@ -26,6 +27,8 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
Expand All @@ -35,6 +38,11 @@
import org.apache.tez.runtime.metrics.TaskCounterUpdater;

import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT;

public abstract class RuntimeTask {

Expand All @@ -54,6 +62,9 @@ public abstract class RuntimeTask {
private final TaskStatistics statistics;
private final AtomicBoolean progressNotified = new AtomicBoolean(false);

private final long lfsBytesWriteLimit;
private static final Logger LOG = LoggerFactory.getLogger(RuntimeTask.class);

protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
this.taskSpec = taskSpec;
Expand All @@ -71,6 +82,8 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
} else {
this.counterUpdater = null;
}
this.lfsBytesWriteLimit =
tezConf.getLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT);
}

protected enum State {
Expand Down Expand Up @@ -182,4 +195,36 @@ protected void setTaskDone() {
protected final boolean isUpdatingSystemCounters() {
return counterUpdater != null;
}

/**
* Check whether the task has exceeded any configured limits.
*
* @throws LocalWriteLimitException in case the limit is exceeded.
*/
public void checkTaskLimits() throws LocalWriteLimitException {
// check the limit for writing to local file system
if (lfsBytesWriteLimit >= 0) {
Long lfsBytesWritten = null;
try {
LocalFileSystem localFS = FileSystem.getLocal(tezConf);
lfsBytesWritten = FileSystem.getGlobalStorageStatistics().get(localFS.getScheme()).getLong("bytesWritten");
} catch (IOException e) {
LOG.warn("Could not get LocalFileSystem bytesWritten counter");
}
if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) {
throw new LocalWriteLimitException(
"Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is "
+ lfsBytesWriteLimit);
}
}
}

/**
* Exception thrown when the task exceeds some configured limits.
*/
public static class LocalWriteLimitException extends IOException {
public LocalWriteLimitException(String str) {
super(str);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
import org.apache.tez.runtime.api.*;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
Expand Down Expand Up @@ -141,6 +142,8 @@ static class HeartbeatCallable implements Callable<Boolean> {
private static final float LOG_COUNTER_BACKOFF = 1.3f;
private static final int HEAP_MEMORY_USAGE_UPDATE_INTERVAL = 5000; // 5 seconds

private static final int LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL = 10000; // 10 seconds

private final RuntimeTask task;
private final EventMetaData updateEventMetadata;

Expand All @@ -165,6 +168,9 @@ static class HeartbeatCallable implements Callable<Boolean> {
private long usedMemory = 0;
private long heapMemoryUsageUpdatedTime = System.currentTimeMillis() - HEAP_MEMORY_USAGE_UPDATE_INTERVAL;

private long localFileSystemBytesWrittenCheckInterval =
System.currentTimeMillis() - LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL;

/*
* Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
* log counters.
Expand Down Expand Up @@ -262,6 +268,17 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
sendCounters = true;
prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
}
try {
long now = System.currentTimeMillis();
if (now - localFileSystemBytesWrittenCheckInterval > LOCAL_FILE_SYSTEM_BYTES_WRITTEN_CHECK_INTERVAL) {
task.checkTaskLimits();
localFileSystemBytesWrittenCheckInterval = now;
}
} catch (LocalWriteLimitException lwle) {
LOG.error("Local FileSystem write limit exceeded", lwle);
askedToDie.set(true);
return new ResponseWrapper(true, 1);
}
updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
events.add(updateEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime.task;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -27,7 +28,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -36,12 +41,21 @@

import com.google.common.collect.Lists;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.RuntimeTask.LocalWriteLimitException;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
Expand All @@ -55,6 +69,9 @@
@SuppressWarnings("rawtypes")
public class TestTaskReporter {

private static final File TEST_DIR =
new File(System.getProperty("test.build.data"), TestTaskReporter.class.getName()).getAbsoluteFile();

@Test(timeout = 10000)
public void testContinuousHeartbeatsOnMaxEvents() throws Exception {

Expand Down Expand Up @@ -218,6 +235,38 @@ public void testStatusUpdateAfterInitializationAndCounterFlag() {

}

@Test
public void testLocalFileSystemBytesWrittenLimit() throws IOException {
TaskSpec mockSpec = mock(TaskSpec.class);
when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
TezConfiguration tezConf = new TezConfiguration();
LogicalIOProcessorRuntimeTask lio1 =
new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
Runtime.getRuntime().maxMemory(), true, null, null);

LocalFileSystem localFS = FileSystem.getLocal(tezConf);
FileSystem.clearStatistics();
Path tmpPath =
new Path(TEST_DIR + "/testLocalFileSystemBytesWrittenLimit" + new Random(System.currentTimeMillis()).nextInt());
try (FSDataOutputStream out = localFS.create(tmpPath, true)) {
out.write(new byte[1024]);
}
// Check limits with default shouldn't throw exception.
lio1.checkTaskLimits();

tezConf.setLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, 10);
lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
Runtime.getRuntime().maxMemory(), true, null, null);

try {
lio1.checkTaskLimits();
Assert.fail("Expected to throw LocalWriteLimitException");
} catch (LocalWriteLimitException localWriteLimitException) {
Assert.assertTrue(localWriteLimitException.getMessage().contains("Too much write to local file system"));
}
}

private List<TezEvent> createEvents(int numEvents) {
List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
for (int i = 0; i < numEvents; i++) {
Expand Down