Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 17 additions & 35 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -225,7 +224,6 @@ public class DAGAppMaster extends AbstractService {
* Priority of the DAGAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Joiner PATH_JOINER = Joiner.on('/');

@VisibleForTesting
static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
Expand Down Expand Up @@ -313,7 +311,6 @@ public class DAGAppMaster extends AbstractService {
/**
* set of already executed dag names.
*/
Set<String> dagNames = new HashSet<String>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No references to this variable anywhere

Set<String> dagIDs = new HashSet<String>();

protected boolean isLastAMRetry = false;
Expand Down Expand Up @@ -373,19 +370,17 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
this.containerID.toString(), this.appMasterUgi.getShortUserName());

LOG.info("Created DAGAppMaster for application " + applicationAttemptId
+ ", versionInfo=" + dagVersionInfo.toString());
+ ", versionInfo=" + dagVersionInfo);
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
}

// Pull this WebAppUtils function into Tez until YARN-4186
public static String getRunningLogURL(String nodeHttpAddress,
private static String getRunningLogURL(String nodeHttpAddress,
String containerId, String user) {
if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
|| containerId == null || containerId.isEmpty() || user == null
|| user.isEmpty()) {
if (containerId.isEmpty() || user == null | user.isEmpty()) {
return null;
}
return PATH_JOINER.join(nodeHttpAddress, "node", "containerlogs",
return String.format("%s/node/containerlogs/%s/%s", nodeHttpAddress,
containerId, user);
}

Expand Down Expand Up @@ -696,8 +691,7 @@ private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagE
state = DAGAppMasterState.ERROR;
if (currentDAG != null) {
_updateLoggers(currentDAG, "_post");
String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID();
LOG.info(errDiagnostics);
LOG.info(errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID());
// Inform the current DAG about the error
sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent));
} else {
Expand Down Expand Up @@ -759,22 +753,20 @@ protected synchronized void handle(DAGAppMasterEvent event) {
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
System.err.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId());
// Stop vertex services if any
stopVertexServices(currentDAG);
if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
this.taskSchedulerManager.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" +
finishEvt.getDAGId().toString());
LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId());
shutdownHandler.shutdown();
} else {
LOG.info("DAG completed, dagId="
+ finishEvt.getDAGId().toString()
+ ", dagState=" + finishEvt.getDAGState());
LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState="
+ finishEvt.getDAGState());
lastDAGCompletionTime = clock.getTime();
_updateLoggers(currentDAG, "_post");
if (this.historyEventHandler.hasRecoveryFailed()) {
Expand Down Expand Up @@ -1033,17 +1025,16 @@ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {

try {
if (LOG.isDebugEnabled()) {
LOG.debug("JSON dump for submitted DAG, dagId=" + dagId.toString()
+ ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB).toString());
LOG.debug("JSON dump for submitted DAG, dagId=" + dagId + ", json="
+ DAGUtils.generateSimpleJSONPlan(dagPB));
}
} catch (JSONException e) {
LOG.warn("Failed to generate json for DAG", e);
}

writeDebugArtifacts(dagPB, newDag);
return newDag;
} // end createDag()
}

private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) {
boolean debugArtifacts =
Expand All @@ -1057,7 +1048,7 @@ private void writeDebugArtifacts(DAGPlan dagPB, DAGImpl newDag) {

private void writePBTextFile(DAG dag) {
String logFile = logDirs[new Random().nextInt(logDirs.length)] + File.separatorChar
+ dag.getID().toString() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME;
+ dag.getID() + "-" + TezConstants.TEZ_PB_PLAN_TEXT_NAME;

LOG.info("Writing DAG plan to: " + logFile);
File outFile = new File(logFile);
Expand All @@ -1066,7 +1057,7 @@ private void writePBTextFile(DAG dag) {
printWriter.println(TezUtilsInternal.convertDagPlanToString(dag.getJobPlan()));
printWriter.close();
} catch (IOException e) {
LOG.warn("Failed to write TEZ_PLAN to " + outFile.toString(), e);
LOG.warn("Failed to write TEZ_PLAN to " + outFile, e);
}
}

Expand Down Expand Up @@ -2269,15 +2260,6 @@ public void handle(VertexEvent event) {
}
}

private static void validateInputParam(String value, String param)
Copy link
Contributor Author

@belugabehr belugabehr Jan 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used in one place. Limited value,

throws IOException {
if (value == null) {
String msg = param + " is null";
LOG.error(msg);
throw new IOException(msg);
}
}

private long checkAndHandleDAGClientTimeout() throws TezException {
if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.RECOVERING).contains(this.state)
|| sessionStopped.get()) {
Expand Down Expand Up @@ -2346,8 +2328,8 @@ public static void main(String[] args) {
clientVersion = VersionInfo.UNKNOWN;
}

validateInputParam(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV);
Objects.requireNonNull(appSubmitTimeStr,
ApplicationConstants.APP_SUBMIT_TIME_ENV + " is null");

ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
ApplicationAttemptId applicationAttemptId =
Expand Down