diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java index fc4ddcfcdc..c9a7083c1d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -74,9 +74,6 @@ public interface AppContext { DAG getCurrentDAG(); - // For testing only! - ThreadPoolExecutor getThreadPool(); - ListeningExecutorService getExecService(); void setDAG(DAG dag); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cde77b3bf6..518a1fb362 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -47,10 +47,10 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -302,7 +302,6 @@ public class DAGAppMaster extends AbstractService { private Path tezSystemStagingDir; private FileSystem recoveryFS; - private ThreadPoolExecutor rawExecutor; private ListeningExecutorService execService; // TODO May not need to be a bidi map @@ -623,9 +622,9 @@ public synchronized void serviceInit(final Configuration conf) throws Exception TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT); // NOTE: LinkedBlockingQueue does not have a capacity Limit and can thus // occupy large memory chunks when numerous Runables are pending for execution - rawExecutor = new ThreadPoolExecutor(threadCount, threadCount, - 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build()); + ExecutorService rawExecutor = + Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("App Shared Pool - #%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor); initServices(conf); @@ -1505,14 +1504,6 @@ public DAG getCurrentDAG() { return dag; } - @Override - // For Testing only! - public ThreadPoolExecutor getThreadPool() { - synchronized (DAGAppMaster.this) { - return rawExecutor; - } - } - @Override public ListeningExecutorService getExecService() { return execService;