Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,8 @@ public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto(
switch (statusGetOpts) {
case GET_COUNTERS:
return DAGProtos.StatusGetOptsProto.GET_COUNTERS;
case GET_MEMORY_USAGE:
return DAGProtos.StatusGetOptsProto.GET_MEMORY_USAGE;
}
throw new TezUncheckedException("Could not convert StatusGetOpts to" + " proto");
}
Expand All @@ -636,6 +638,8 @@ public static StatusGetOpts convertStatusGetOptsFromProto(DAGProtos.StatusGetOpt
switch (proto) {
case GET_COUNTERS:
return StatusGetOpts.GET_COUNTERS;
case GET_MEMORY_USAGE:
return StatusGetOpts.GET_MEMORY_USAGE;
}
throw new TezUncheckedException("Could not convert to StatusGetOpts from" + " proto");
}
Expand Down
20 changes: 14 additions & 6 deletions tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ public TezCounters getDAGCounters() {
return dagCounters;
}

public long getMemoryUsedByAM() {
return proxy.getMemoryUsedByAM();
}

public long getMemoryUsedByTasks() {
return proxy.getMemoryUsedByTasks();
}

@InterfaceAudience.Private
DagStatusSource getSource() {
return this.source;
Expand Down Expand Up @@ -201,12 +209,12 @@ public int hashCode() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("status=" + getState()
+ ", progress=" + getDAGProgress()
+ ", diagnostics="
+ StringUtils.join(getDiagnostics(), LINE_SEPARATOR)
+ ", counters="
+ (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
sb.append("status=" + getState());
sb.append(", progress=" + getDAGProgress());
sb.append(", diagnostics=" + StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
sb.append(", memoryUsedByAM=").append(proxy.getMemoryUsedByAM());
sb.append(", memoryUsedByTasks=").append(proxy.getMemoryUsedByTasks());
sb.append(", counters=" + (getDAGCounters() == null ? "null" : getDAGCounters().toString()));
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
@Evolving
public enum StatusGetOpts {
/** Retrieve Counters with Status */
GET_COUNTERS
GET_COUNTERS,
GET_MEMORY_USAGE
}
3 changes: 3 additions & 0 deletions tez-api/src/main/proto/DAGApiRecords.proto
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ message DAGStatusProto {
optional ProgressProto DAGProgress = 3;
repeated StringProgressPairProto vertexProgress = 4;
optional TezCountersProto dagCounters = 5;
optional int64 memoryUsedByAM = 6;
optional int64 memoryUsedByTasks = 7;
}

message PlanLocalResourcesProto {
Expand All @@ -299,6 +301,7 @@ message TezCountersProto {

enum StatusGetOptsProto {
GET_COUNTERS = 0;
GET_MEMORY_USAGE = 1;
}

message VertexLocationHintProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public void addVertexProgress(String name, ProgressBuilder progress) {
getBuilder().addVertexProgress(builder.build());
}

//TODO: let this be a map of values in protobuf 3.x
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this todo for? or am i missing something here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to put these memory related stuff into a map, which can be dinamically extended later with other metrics if needed, but unfortunately the protoc compiler which is used in protobuf 2.5 doesn't support maps

public void setMemoryUsage(long memoryUsedByAM, long memoryUsedByTasks) {
Builder builder = getBuilder();
builder.setMemoryUsedByAM(memoryUsedByAM);
builder.setMemoryUsedByTasks(memoryUsedByTasks);
}

public DAGStatusProto getProto() {
return getBuilder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,12 @@ public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, No
return null;
}

@Override
public long getTotalUsedMemory() {
long totalUsedMemory = 0;
for (int i = 0; i < taskCommunicators.length; i++) {
totalUsedMemory += taskCommunicators[i].getTaskCommunicator().getTotalUsedMemory();
}
return totalUsedMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public interface TaskCommunicatorManagerInterface {

String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId);

long getTotalUsedMemory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static final class ContainerInfo {
Credentials credentials = null;
boolean credentialsChanged = false;
boolean taskPulled = false;
long usedMemory = 0;

void reset() {
taskSpec = null;
Expand Down Expand Up @@ -382,6 +383,7 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
response.setLastRequestId(requestId);
containerInfo.lastRequestId = requestId;
containerInfo.lastResponse = response;
containerInfo.usedMemory = request.getUsedMemory();
return response;
}

Expand Down Expand Up @@ -466,4 +468,8 @@ protected ContainerInfo getContainerInfo(ContainerId containerId) {
protected ContainerId getContainerForAttempt(TezTaskAttemptID taskAttemptId) {
return attemptToContainerMap.get(taskAttemptId);
}

public long getTotalUsedMemory() {
return registeredContainers.values().stream().mapToLong(c -> c.usedMemory).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.tez.dag.app.dag.impl;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
Expand Down Expand Up @@ -244,6 +246,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private static final CommitCompletedTransition COMMIT_COMPLETED_TRANSITION =
new CommitCompletedTransition();

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

protected static final
StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
stateMachineFactory
Expand Down Expand Up @@ -940,6 +944,10 @@ public DAGStatusBuilder getDAGStatus(Set<StatusGetOpts> statusOptions) {
if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
status.setDAGCounters(getAllCounters());
}
if (statusOptions.contains(StatusGetOpts.GET_MEMORY_USAGE)) {
status.setMemoryUsage(memoryMXBean.getHeapMemoryUsage().getUsed(),
taskCommunicatorManagerInterface.getTotalUsedMemory());
}
return status;
} finally {
readLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,13 @@ public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNo
return null;
}

/**
* Return the amount of memory used by the containers. Each container is supposed to refresh
* its current state via heartbeat requests, and the TaskCommunicator implementation is supposed
* to aggregate this properly.
* @return memory in MB
*/
public long getTotalUsedMemory() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ public Void call() throws Exception {
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 50000, 0);
doHeartbeat(request, cData);
} else if (version != null && cData.taId.getId() <= version.intValue()) {
preemptContainer(cData);
Expand All @@ -443,7 +443,7 @@ public Void call() throws Exception {
EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId),
MockDAGAppMaster.this.getContext().getClock().getTime()));
TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000, 0);
doHeartbeat(request, cData);
cData.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ public class TezHeartbeatRequest implements Writable {
private int preRoutedStartIndex;
private int maxEvents;
private long requestId;
private long usedMemory;

public TezHeartbeatRequest() {
}

public TezHeartbeatRequest(long requestId, List<TezEvent> events,
int preRoutedStartIndex, String containerIdentifier,
TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents) {
TezTaskAttemptID taskAttemptID, int startIndex, int maxEvents, long usedMemory) {
this.containerIdentifier = containerIdentifier;
this.requestId = requestId;
this.events = Collections.unmodifiableList(events);
this.startIndex = startIndex;
this.preRoutedStartIndex = preRoutedStartIndex;
this.maxEvents = maxEvents;
this.currentTaskAttemptID = taskAttemptID;
this.usedMemory = usedMemory;
}

public String getContainerIdentifier() {
Expand Down Expand Up @@ -83,6 +85,10 @@ public TezTaskAttemptID getCurrentTaskAttemptID() {
return currentTaskAttemptID;
}

public long getUsedMemory() {
return usedMemory;
}

@Override
public void write(DataOutput out) throws IOException {
if (events != null) {
Expand All @@ -105,6 +111,7 @@ public void write(DataOutput out) throws IOException {
out.writeInt(maxEvents);
out.writeLong(requestId);
Text.writeString(out, containerIdentifier);
out.writeLong(usedMemory);
}

@Override
Expand All @@ -128,6 +135,7 @@ public void readFields(DataInput in) throws IOException {
maxEvents = in.readInt();
requestId = in.readLong();
containerIdentifier = Text.readString(in);
usedMemory = in.readLong();
}

@Override
Expand All @@ -140,6 +148,7 @@ public String toString() {
+ ", maxEventsToGet=" + maxEvents
+ ", taskAttemptId=" + currentTaskAttemptID
+ ", eventCount=" + (events != null ? events.size() : 0)
+ ", usedMemory=" + usedMemory
+ " }";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.tez.runtime.task;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -136,6 +138,7 @@ static class HeartbeatCallable implements Callable<Boolean> {

private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
private static final float LOG_COUNTER_BACKOFF = 1.3f;
private static final int HEAP_MEMORY_USAGE_UPDATE_INTERVAL = 5000; // 5 seconds

private final RuntimeTask task;
private final EventMetaData updateEventMetadata;
Expand All @@ -157,6 +160,10 @@ static class HeartbeatCallable implements Callable<Boolean> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private long usedMemory = 0;
private long heapMemoryUsageUpdatedTime = System.currentTimeMillis() - HEAP_MEMORY_USAGE_UPDATE_INTERVAL;

/*
* Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
* log counters.
Expand Down Expand Up @@ -263,7 +270,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
int fromPreRoutedEventId = task.getNextPreRoutedEventId();
int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, getUsedMemory());
LOG.debug("Sending heartbeat to AM, request={}", request);

maybeLogCounters();
Expand Down Expand Up @@ -305,6 +312,15 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
return new ResponseWrapper(false, numEventsReceived);
}

private long getUsedMemory() {
long now = System.currentTimeMillis();
if (now - heapMemoryUsageUpdatedTime > HEAP_MEMORY_USAGE_UPDATE_INTERVAL) {
usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
heapMemoryUsageUpdatedTime = now;
}
return usedMemory;
}

public void markComplete() {
// Notify to clear pending events, if any.
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -211,13 +212,18 @@ public void testSleepJob() throws TezException, IOException, InterruptedExceptio
+ dagStatus.getState());
Thread.sleep(500l);
dagStatus = dagClient.getDAGStatus(null);
assertTrue("Memory used by AM is supposed to be 0 if not requested", dagStatus.getMemoryUsedByAM() == 0);
assertTrue("Memory used by tasks is supposed to be 0 if not requested", dagStatus.getMemoryUsedByTasks() == 0);
}
dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS, StatusGetOpts.GET_MEMORY_USAGE));

assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
assertNotNull(dagStatus.getDAGCounters());
assertNotNull(dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
assertNotNull(dagStatus.getDAGCounters().findCounter(TaskCounter.GC_TIME_MILLIS));
assertTrue("Memory used by AM is supposed to be >0", dagStatus.getMemoryUsedByAM() > 0);
assertTrue("Memory used by tasks is supposed to be >0", dagStatus.getMemoryUsedByTasks() > 0);

ExampleDriver.printDAGStatus(dagClient, new String[] { "SleepVertex" }, true, true);
tezSession.stop();
}
Expand Down