Skip to content

Commit bdccfbe

Browse files
committed
TEZ-2119: Counter for launched containers
1 parent 5038075 commit bdccfbe

17 files changed

Lines changed: 238 additions & 2 deletions

tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,51 @@ public enum DAGCounter {
4141
AM_CPU_MILLISECONDS,
4242
/** Wall clock time taken by all the tasks. */
4343
WALL_CLOCK_MILLIS,
44-
AM_GC_TIME_MILLIS
44+
AM_GC_TIME_MILLIS,
45+
46+
/*
47+
* Type: # of containers
48+
* Both allocated and launched containers before DAG start.
49+
* This is incremented only once when the DAG starts and it's calculated
50+
* by querying all the held containers from TaskSchedulers.
51+
*/
52+
INITIAL_HELD_CONTAINERS,
53+
54+
/*
55+
* Type: # of containers
56+
* All containers that have been seen/used in this DAG by task allocation.
57+
* This counter can be calculated at the end of DAG by simply counting the distinct
58+
* ContainerIds that have been seen in TaskSchedulerManager.taskAllocated callbacks.
59+
*/
60+
TOTAL_CONTAINERS_USED,
61+
62+
/*
63+
* Type: # of events
64+
* Number of container allocations during a DAG. This is incremented every time
65+
* the containerAllocated callback is called in the TaskSchedulerContext.
66+
* This counter doesn't account for initially held (launched, allocated) containers.
67+
*/
68+
TOTAL_CONTAINER_ALLOCATION_COUNT,
69+
70+
/*
71+
* Type: # of events
72+
* Number of container launches during a DAG. This is incremented every time
73+
* the containerLaunched callback is called in the ContainerLauncherContext.
74+
* This counter doesn't account for initially held (launched, allocated) containers.
75+
*/
76+
TOTAL_CONTAINER_LAUNCH_COUNT,
77+
78+
/*
79+
* Type: # of events
80+
* Number of container releases during a DAG. This is incremented every time
81+
* the containerBeingReleased callback is called in the TaskSchedulerContext.
82+
*/
83+
TOTAL_CONTAINER_RELEASE_COUNT,
84+
85+
/*
86+
* Type: # of events
87+
* Number of container reuses during a DAG. This is incremented every time
88+
* the containerReused callback is called in the TaskSchedulerContext.
89+
*/
90+
TOTAL_CONTAINER_REUSE_COUNT
4591
}

tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package org.apache.tez.serviceplugins.api;
1616

17+
import java.util.List;
18+
1719
import javax.annotation.Nullable;
1820

1921
import org.apache.hadoop.classification.InterfaceAudience;
@@ -263,4 +265,19 @@ public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
263265
*/
264266
public abstract void dagComplete() throws ServicePluginException;
265267

268+
/**
269+
* Get the number of held containers
270+
* @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
271+
* This will cause the app to shutdown.
272+
*/
273+
public abstract int getHeldContainersCount();
274+
275+
/**
276+
* Callback to be used in the event of a container allocation.
277+
*/
278+
protected void onContainersAllocated(List<Container> containers) {
279+
for (Container container : containers) {
280+
getContext().containerAllocated(container);
281+
}
282+
}
266283
}

tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,20 @@ void taskAllocated(Object task,
8484
Object appCookie,
8585
Container container);
8686

87+
/**
88+
* Indicate to the framework that a container is being allocated.
89+
*
90+
* @param the actual container
91+
*/
92+
void containerAllocated(Container container);
93+
94+
/**
95+
* Indicate to the framework that a container is being reused:
96+
* there is a task assigned to an already used container.
97+
*
98+
* @param the actual container
99+
*/
100+
void containerReused(Container container);
87101

88102
/**
89103
* Indicate to the framework that a container has completed. This is typically used by sources

tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
2222
import org.apache.hadoop.yarn.api.records.ContainerId;
2323
import org.apache.tez.common.TezUtilsInternal;
24+
import org.apache.tez.common.counters.DAGCounter;
2425
import org.apache.tez.dag.api.UserPayload;
2526
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
2627
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
@@ -65,6 +66,7 @@ public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherMana
6566

6667
@Override
6768
public void containerLaunched(ContainerId containerId) {
69+
context.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_LAUNCH_COUNT, 1);
6870
context.getEventHandler().handle(
6971
new AMContainerEventLaunched(containerId));
7072
ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@
121121
import org.apache.tez.common.TezConverterUtils;
122122
import org.apache.tez.common.TezUtilsInternal;
123123
import org.apache.tez.common.VersionInfo;
124+
import org.apache.tez.common.counters.DAGCounter;
124125
import org.apache.tez.common.counters.Limits;
125126
import org.apache.tez.common.security.ACLManager;
126127
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -773,6 +774,8 @@ protected synchronized void handle(DAGAppMasterEvent event) {
773774
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
774775
// Stop vertex services if any
775776
stopVertexServices(currentDAG);
777+
handleUsedContainersOnDagFinish(currentDAG);
778+
776779
if (!isSession) {
777780
LOG.info("Not a session, AM will unregister as DAG has completed");
778781
this.taskSchedulerManager.setShouldUnregisterFlag();
@@ -895,6 +898,12 @@ protected synchronized void handle(DAGAppMasterEvent event) {
895898
}
896899
}
897900

901+
private void handleUsedContainersOnDagFinish(DAG dag) {
902+
dag.incrementDagCounter(DAGCounter.TOTAL_CONTAINERS_USED,
903+
taskSchedulerManager.getContainersCountUsedByCurrentDAG());
904+
taskSchedulerManager.clearContainersUsedByCurrentDAG();
905+
}
906+
898907
private void updateLoggers(DAG dag, String appender) {
899908
try {
900909
TezUtilsInternal.updateLoggers(dag.getConf(), dag.getID().toString() + appender,
@@ -2557,11 +2566,17 @@ public Void run() throws Exception {
25572566
throw new TezUncheckedException(e);
25582567
}
25592568

2569+
countAlreadyHeldContainers(newDAG);
25602570
startDAGExecution(newDAG, lrDiff);
25612571
// set state after curDag is set
25622572
this.state = DAGAppMasterState.RUNNING;
25632573
}
25642574

2575+
private void countAlreadyHeldContainers(DAG newDAG) {
2576+
newDAG.incrementDagCounter(DAGCounter.INITIAL_HELD_CONTAINERS,
2577+
taskSchedulerManager.getAlreadyHeldContainersCount());
2578+
}
2579+
25652580
private void startVertexServices(DAG dag) {
25662581
for (Vertex v : dag.getVertices().values()) {
25672582
v.startServices();

tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.security.UserGroupInformation;
2626
import org.apache.hadoop.yarn.api.records.LocalResource;
2727
import org.apache.hadoop.yarn.event.EventHandler;
28+
import org.apache.tez.common.counters.DAGCounter;
2829
import org.apache.tez.common.counters.TezCounters;
2930
import org.apache.tez.dag.api.TezException;
3031
import org.apache.tez.dag.api.client.DAGStatusBuilder;
@@ -102,4 +103,6 @@ VertexStatusBuilder getVertexStatus(String vertexName,
102103
*/
103104
@Nullable DAGScheduler getDAGScheduler();
104105

106+
void incrementDagCounter(DAGCounter counter, int incrValue);
107+
105108
}

tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,6 +1441,11 @@ private void updateCpuCounters() {
14411441
dagCounters.findCounter(DAGCounter.AM_GC_TIME_MILLIS).setValue(totalDAGGCTime);
14421442
}
14431443

1444+
@Override
1445+
public void incrementDagCounter(DAGCounter counter, int incrValue) {
1446+
dagCounters.findCounter(counter).increment(incrValue);
1447+
}
1448+
14441449
private DAGState finished(DAGState finalState) {
14451450
boolean dagError = false;
14461451
try {

tez-dag/src/main/java/org/apache/tez/dag/app/rm/DagAwareYarnTaskScheduler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ public void shutdown() throws Exception {
328328

329329
@Override
330330
public void onContainersAllocated(List<Container> containers) {
331+
super.onContainersAllocated(containers);
332+
331333
AMState appState = getContext().getAMState();
332334
if (stopRequested || appState == AMState.COMPLETED) {
333335
LOG.info("Ignoring {} allocations since app is terminating", containers.size());
@@ -946,6 +948,9 @@ private void addTaskAssignment(TaskRequest request, HeldContainer hc) {
946948
assignedVertices.set(vertexIndex);
947949
}
948950
cset.add(hc);
951+
if (!hc.isNew()) {
952+
getContext().containerReused(hc.getContainer());
953+
}
949954
hc.assignTask(request);
950955
}
951956

@@ -1489,6 +1494,10 @@ Object getLastTask() {
14891494
return lastRequest != null ? lastRequest.getTask() : null;
14901495
}
14911496

1497+
boolean isNew() {
1498+
return lastRequest == null;
1499+
}
1500+
14921501
String getMatchingLocation() {
14931502
switch (state) {
14941503
case MATCHING_LOCAL:
@@ -2089,4 +2098,9 @@ protected void afterExecute(Runnable r, Throwable t) {
20892098
}
20902099
}
20912100
}
2101+
2102+
@Override
2103+
public int getHeldContainersCount() {
2104+
return heldContainers.size();
2105+
}
20922106
}

tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,4 +513,9 @@ void preemptTask(DeallocateContainerRequest request) {
513513
}
514514
}
515515
}
516+
517+
@Override
518+
public int getHeldContainersCount() {
519+
return 0;
520+
}
516521
}

tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.yarn.api.records.NodeReport;
2929
import org.apache.hadoop.yarn.api.records.Resource;
3030
import org.apache.tez.common.ContainerSignatureMatcher;
31+
import org.apache.tez.common.counters.DAGCounter;
3132
import org.apache.tez.dag.api.TezUncheckedException;
3233
import org.apache.tez.dag.api.UserPayload;
3334
import org.apache.tez.dag.app.AppContext;
@@ -69,13 +70,24 @@ public void taskAllocated(Object task, Object appCookie, Container container) {
6970
taskSchedulerManager.taskAllocated(schedulerId, task, appCookie, container);
7071
}
7172

73+
@Override
74+
public void containerAllocated(Container container) {
75+
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_ALLOCATION_COUNT, 1);
76+
}
77+
78+
@Override
79+
public void containerReused(Container container) {
80+
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_REUSE_COUNT, 1);
81+
}
82+
7283
@Override
7384
public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
7485
taskSchedulerManager.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
7586
}
7687

7788
@Override
7889
public void containerBeingReleased(ContainerId containerId) {
90+
appContext.getCurrentDAG().incrementDagCounter(DAGCounter.TOTAL_CONTAINER_RELEASE_COUNT, 1);
7991
taskSchedulerManager.containerBeingReleased(schedulerId, containerId);
8092
}
8193

0 commit comments

Comments
 (0)