Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import static org.apache.tez.dag.app.dag.impl.TezContainer.NULL_TEZ_CONTAINER;

public class TaskAttemptImpl implements TaskAttempt,
EventHandler<TaskAttemptEvent> {

Expand Down Expand Up @@ -187,14 +189,8 @@ public static DataEventDependencyInfo fromProto(DataEventDependencyInfoProto pro
private String trackerName;
private int httpPort;

// TODO Can these be replaced by the container object TEZ-1037
private Container container;
TezContainer container = NULL_TEZ_CONTAINER;
private long allocationTime;
private ContainerId containerId;
protected NodeId containerNodeId;
private String nodeHttpAddress;
private String nodeRackName;

private final Vertex vertex;
private final Task task;
private final TaskLocationHint locationHint;
Expand Down Expand Up @@ -614,8 +610,8 @@ public TaskAttemptReport getReport() {
result.setContainerId(this.getAssignedContainerID());
result.setNodeManagerHost(trackerName);
result.setNodeManagerHttpPort(httpPort);
if (this.containerNodeId != null) {
result.setNodeManagerPort(this.containerNodeId.getPort());
if (this.container.getNodeId() != null) {
result.setNodeManagerPort(this.container.getNodeId().getPort());
}
return result;
} finally {
Expand All @@ -625,11 +621,9 @@ public TaskAttemptReport getReport() {

@Override
public List<String> getDiagnostics() {
List<String> result = new ArrayList<String>();
readLock.lock();
try {
result.addAll(diagnostics);
return result;
return new ArrayList<String>(diagnostics);
} finally {
readLock.unlock();
}
Expand Down Expand Up @@ -714,7 +708,7 @@ public boolean isFinished() {
public ContainerId getAssignedContainerID() {
readLock.lock();
try {
return containerId;
return container.getId();
} finally {
readLock.unlock();
}
Expand All @@ -724,7 +718,7 @@ public ContainerId getAssignedContainerID() {
public Container getAssignedContainer() {
readLock.lock();
try {
return container;
return container == NULL_TEZ_CONTAINER ? null : container;
} finally {
readLock.unlock();
}
Expand All @@ -734,7 +728,7 @@ public Container getAssignedContainer() {
public String getAssignedContainerMgrAddress() {
readLock.lock();
try {
return containerNodeId.toString();
return container.getNodeId().toString();
} finally {
readLock.unlock();
}
Expand All @@ -744,7 +738,7 @@ public String getAssignedContainerMgrAddress() {
public NodeId getNodeId() {
readLock.lock();
try {
return containerNodeId;
return container.getNodeId();
} finally {
readLock.unlock();
}
Expand All @@ -756,7 +750,7 @@ public NodeId getNodeId() {
public String getNodeHttpAddress() {
readLock.lock();
try {
return nodeHttpAddress;
return container.getNodeHttpAddress();
} finally {
readLock.unlock();
}
Expand All @@ -769,7 +763,7 @@ public String getNodeHttpAddress() {
public String getNodeRackName() {
this.readLock.lock();
try {
return this.nodeRackName;
return container.getRackName();
} finally {
this.readLock.unlock();
}
Expand Down Expand Up @@ -1136,16 +1130,16 @@ protected void logJobHistoryAttemptStarted() {
String completedLogsUrl = getCompletedLogsUrl();
TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
attemptId, getVertex().getName(),
launchTime, containerId, containerNodeId,
inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
launchTime, container.getId(), container.getNodeId(),
inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), startEvt));
}

protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
Preconditions.checkArgument(recoveryData == null
|| recoveryData.getTaskAttemptFinishedEvent() == null,
"log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent");
"log TaskAttemptFinishedEvent again in recovery when there's already another TaskAttemptFinishedEvent");
if (getLaunchTime() == 0) return;

TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
Expand All @@ -1163,7 +1157,7 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
TaskAttemptState state, TaskFailureType taskFailureType) {
Preconditions.checkArgument(recoveryData == null
|| recoveryData.getTaskAttemptFinishedEvent() == null,
"log TaskAttemptFinishedEvent again in recovery when there's already another TaskAtttemptFinishedEvent");
"log TaskAttemptFinishedEvent again in recovery when there's already another TaskAttemptFinishedEvent");
if (state == TaskAttemptState.FAILED && taskFailureType == null) {
throw new IllegalStateException("FAILED state must be accompanied by a FailureType");
}
Expand All @@ -1174,8 +1168,8 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
String completedLogsUrl = null;
if (finishTime <= 0) {
finishTime = clock.getTime(); // comes here in case it was terminated before launch
unsuccessfulContainerId = containerId;
unsuccessfulContainerNodeId = containerNodeId;
unsuccessfulContainerId = container.getId();
unsuccessfulContainerNodeId = container.getNodeId();
inProgressLogsUrl = getInProgressLogsUrl();
completedLogsUrl = getCompletedLogsUrl();
}
Expand All @@ -1186,8 +1180,8 @@ attemptId, getVertex().getName(), getLaunchTime(),
terminationCause,
StringUtils.join(
getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEvents,
taGeneratedEvents, creationTime, creationCausalTA, allocationTime,
unsuccessfulContainerId, unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, nodeHttpAddress);
taGeneratedEvents, creationTime, creationCausalTA, allocationTime, unsuccessfulContainerId,
unsuccessfulContainerNodeId, inProgressLogsUrl, completedLogsUrl, container.getNodeHttpAddress());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
Expand All @@ -1199,17 +1193,17 @@ private String getInProgressLogsUrl() {
TezConstants.getTezYarnServicePluginName())
|| getVertex().getServicePluginInfo().getContainerLauncherName().equals(
TezConstants.getTezUberServicePluginName())) {
if (containerId != null && nodeHttpAddress != null) {
final String containerIdStr = containerId.toString();
inProgressLogsUrl = nodeHttpAddress
if (container.getId() != null && container.getNodeHttpAddress() != null) {
final String containerIdStr = container.getId().toString();
inProgressLogsUrl = container.getNodeHttpAddress()
+ "/" + "node/containerlogs"
+ "/" + containerIdStr
+ "/" + this.appContext.getUser();
}
} else {
inProgressLogsUrl = appContext.getTaskCommunicatorManager().getInProgressLogsUrl(
getVertex().getTaskCommunicatorIdentifier(),
attemptId, containerNodeId);
attemptId, container.getNodeId());
}
return inProgressLogsUrl;
}
Expand All @@ -1220,15 +1214,16 @@ private String getCompletedLogsUrl() {
TezConstants.getTezYarnServicePluginName())
|| getVertex().getServicePluginInfo().getContainerLauncherName().equals(
TezConstants.getTezUberServicePluginName())) {
if (containerId != null && containerNodeId != null && nodeHttpAddress != null) {
final String containerIdStr = containerId.toString();
if (container.getId() != null && container.getNodeId() != null &&
container.getNodeHttpAddress() != null) {
final String containerIdStr = container.getId().toString();
if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
&& conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
String contextStr = "v_" + getVertex().getName()
+ "_" + this.attemptId.toString();
completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+ "/" + containerNodeId.toString()
+ "/" + container.getNodeId().toString()
+ "/" + containerIdStr
+ "/" + contextStr
+ "/" + this.appContext.getUser();
Expand All @@ -1237,7 +1232,7 @@ private String getCompletedLogsUrl() {
} else {
completedLogsUrl = appContext.getTaskCommunicatorManager().getCompletedLogsUrl(
getVertex().getTaskCommunicatorIdentifier(),
attemptId, containerNodeId);
attemptId, container.getNodeId());
}
return completedLogsUrl;
}
Expand Down Expand Up @@ -1386,13 +1381,10 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
if (event instanceof TaskAttemptEventContainerTerminated) {
TaskAttemptEventContainerTerminated tEvent = (TaskAttemptEventContainerTerminated) event;
AMContainer amContainer = ta.appContext.getAllContainers().get(tEvent.getContainerId());
Container container = amContainer.getContainer();
TezContainer container = new TezContainer(amContainer.getContainer());

ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
ta.container = container;
ta.containerId = tEvent.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
}

if (event instanceof TaskAttemptEventContainerTerminatedBySystem) {
Expand All @@ -1401,10 +1393,7 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
Container container = amContainer.getContainer();

ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
ta.container = container;
ta.containerId = tEvent.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
ta.container = new TezContainer(container);
}

if (ta.recoveryData == null ||
Expand Down Expand Up @@ -1440,39 +1429,33 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent;

AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId());
Container container = amContainer.getContainer();
TezContainer container = new TezContainer(amContainer.getContainer());

ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
ta.container = container;
ta.containerId = event.getContainerId();
ta.containerNodeId = container.getNodeId();
ta.nodeHttpAddress = StringInterner.intern(container.getNodeHttpAddress());
ta.nodeRackName = StringInterner.intern(RackResolver.resolve(ta.containerNodeId.getHost())
.getNetworkLocation());
ta.container = new TezContainer(container);
ta.lastNotifyProgressTimestamp = ta.clock.getTime();

ta.setLaunchTime();

// TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr = NetUtils
.createSocketAddr(ta.nodeHttpAddress); // TODO: Costly?
InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(container.getNodeHttpAddress()); // TODO: Costly?
ta.trackerName = StringInterner.intern(nodeHttpInetAddr.getHostName());
ta.httpPort = nodeHttpInetAddr.getPort();
ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));

LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted."
+ " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
+ ta.containerNodeId + "]");
+ " Is using containerId: [" + ta.container.getId() + "]" + " on NM: ["
+ ta.container.getNodeId() + "]");

// JobHistoryEvent.
// The started event represents when the attempt was submitted to the executor.
ta.logJobHistoryAttemptStarted();

// TODO Remove after HDFS-5098
// Compute LOCALITY counter for this task.
if (ta.taskHosts.contains(ta.containerNodeId.getHost())) {
if (ta.taskHosts.contains(ta.container.getNodeId().getHost())) {
ta.localityCounter = DAGCounter.DATA_LOCAL_TASKS;
} else if (ta.taskRacks.contains(ta.nodeRackName)) {
} else if (ta.taskRacks.contains(container.getRackName())) {
ta.localityCounter = DAGCounter.RACK_LOCAL_TASKS;
} else {
// Not computing this if the task does not have locality information.
Expand Down Expand Up @@ -1531,9 +1514,9 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
super.transition(ta, event);
// Inform the scheduler
if (sendSchedulerEvent()) {
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
.getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
ta instanceof DiagnosableEvent ? ((DiagnosableEvent)ta).getDiagnosticInfo() : null,
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), helper.getTaskAttemptState(),
TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
ta instanceof DiagnosableEvent ? ((DiagnosableEvent) ta).getDiagnosticInfo() : null,
ta.getVertex().getTaskSchedulerIdentifier()));
}
}
Expand Down Expand Up @@ -1648,7 +1631,7 @@ protected static class SucceededTransition implements
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {

// If TaskAttempt is recovered to SUCCEEDED, send events generated by this TaskAttempt to vertex
// for its downstream consumers. For normal dag execution, the events are sent by TaskAttmeptListener
// for its downstream consumers. For normal dag execution, the events are sent by TaskAttemptListener
// for performance consideration.
if (ta.recoveryData != null && ta.recoveryData.isTaskAttemptSucceeded()) {
TaskAttemptFinishedEvent taFinishedEvent = ta.recoveryData
Expand All @@ -1671,8 +1654,8 @@ public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
TaskAttemptState.SUCCEEDED));

// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier()));
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.container.getId(), TaskAttemptState.SUCCEEDED, null, null,
ta.getVertex().getTaskSchedulerIdentifier()));

// Inform the task.
ta.sendEvent(new TaskEventTASucceeded(ta.attemptId));
Expand Down
Loading