Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public boolean hasNext() throws IOException {
return message != null;
} catch (java.io.EOFException e) {
reader.close();

if (!fileIt.hasNext()) {
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,14 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
// time etc).
if (dagJson == null) {
dagJson = jsonObject;
} else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
.optJSONObject(ATSConstants.DAG_PLAN) == null) {
// if DAG_PLAN is not filled already, let's try to fetch it from other
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
} else{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: fix indent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
.optJSONObject(ATSConstants.DAG_PLAN) == null) {
// if DAG_PLAN is not filled already, let's try to fetch it from other
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
}
mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
}
JSONArray relatedEntities = dagJson.optJSONArray(Constants
.RELATED_ENTITIES);
Expand Down Expand Up @@ -268,6 +271,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!vertexJsonMap.containsKey(vertexName)) {
vertexJsonMap.put(vertexName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
break;
Expand All @@ -281,6 +286,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!taskJsonMap.containsKey(taskName)) {
taskJsonMap.put(taskName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
break;
Expand All @@ -294,6 +301,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!attemptJsonMap.containsKey(taskAttemptName)) {
attemptJsonMap.put(taskAttemptName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
break;
Expand All @@ -311,4 +320,17 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
"Please provide a valid/complete history log file containing " + dagId);
}
}

/**
 * Appends every element of the JSON array stored under {@code key} in {@code source} to the
 * JSON array stored under the same key in {@code destination}.
 *
 * <p>If either object has no array under {@code key}, an empty array is stored there first,
 * so both objects carry an array under {@code key} when this method returns.
 *
 * @param source object whose array elements are copied
 * @param destination object whose array receives the copied elements
 * @param key name of the array entry to merge
 * @throws JSONException if reading from or writing to either JSON object fails
 */
private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key)
    throws JSONException {
  JSONArray incoming = source.optJSONArray(key);
  if (incoming == null) {
    incoming = new JSONArray();
    source.put(key, incoming);
  }

  JSONArray merged = destination.optJSONArray(key);
  if (merged == null) {
    merged = new JSONArray();
    destination.put(key, merged);
  }

  // length() is re-read on every iteration, exactly like the index-based original
  for (int idx = 0; idx < incoming.length(); idx++) {
    merged.put(incoming.get(idx));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ public static void parseEvents(JSONArray eventNodes, List<Event> eventList) thro
JSONObject eventNode = eventNodes.optJSONObject(i);
final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
final String eventType = eventNode.optString(Constants.EVENT_TYPE);
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0
? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP);

Event event = new Event(eventInfo, eventType, time);

eventList.add(event);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,12 +556,12 @@ private static JSONObject convertTaskFinishedEvent(HistoryEventProto event) thro
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);

long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);

JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, startTime);
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, there were corner cases where events would not be properly populated, maybe in cases where vertices were shut down due to errors or similar (need to check).

In such cases, this would have returned "-ve" value earlier.

The current patch seems to change the start_time depending on getEventTime. This could give the impression that the task/vertex existed only for a very short time.

Can you please share more info on the previous error? Were you getting negative values earlier, and is that why this is being modified?

Copy link
Contributor Author

@abstractdog abstractdog Aug 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I saw this for all protobuf history files, that's why I'm not suspecting corner case
I found that in the case of a TASK_FINISHED event, there is always an event time (which is obviously the end time) and a timeTaken entry in event_data, but there is no startTime there

this string is what the debugger writes for a HistoryLoggerProtos$HistoryEventProto instance while doing this conversion:

event_type: "TASK_FINISHED"
event_time: 1628149977709
app_id: "application_1628051798891_0030"
dag_id: "dag_1628051798891_0030_1"
vertex_id: "vertex_1628051798891_0030_1_00"
task_id: "task_1628051798891_0030_1_00_000001"
event_data {
  key: "timeTaken"
  value: "4193"
}
event_data {
  key: "status"
  value: "SUCCEEDED"
}
event_data {
  key: "numFailedTaskAttempts"
  value: "0"
}
event_data {
  key: "successfulAttemptId"
  value: "attempt_1628051798891_0030_1_00_000001_0"
}
event_data {
  key: "diagnostics"
  value: ""
}
event_data {
  key: "counters"
  value: "..."
}

the root cause of this behavior would be:
https://github.com/apache/tez/blob/master/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java#L392

  private HistoryEventProto convertTaskFinishedEvent(TaskFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        null, null, null, null, event.getTaskID(), null, null);

    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));

here I can see the builder consumes only event.getFinishTime() for the "time" parameter, and startTime is shipped indirectly...according to blame, this code part is unchanged since the introduction of proto history logger (TEZ-3915)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the note. Earlier code didn't populate START_TIME (& had only timeTaken) causing the issue.

otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);

otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
Expand Down Expand Up @@ -620,11 +620,13 @@ private static JSONObject convertVertexFinishedEvent(HistoryEventProto event)
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);

long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);

JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime));
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);

otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.tez.analyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;

Expand Down Expand Up @@ -54,11 +53,4 @@ public interface Analyzer {
* @return description of analyzer
*/
public String getDescription();

/**
* Get config properties related to this analyzer
*
* @return config related to analyzer
*/
public Configuration getConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ public static void main(String argv[]){
"Print task-to-node assignment details of a DAG");
pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class,
"Print vertex:node:status level details of task attempt results");
pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class,
"Print INPUT_READ_ERROR sources");
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
"Print the task concurrency details in a DAG");
pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
"Find critical path at vertex level in a DAG");
pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class,
"Find out schedule misses in 1:1 edges in a DAG");
pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class,
"Print basic dag information (dag/vertex events)");
pgd.addClass("TaskHangAnalyzer", TaskHangAnalyzer.class,
"Print all vertices/tasks and their last attempts with status/duration/node");
exitCode = pgd.run(argv);
} catch(Throwable e){
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@
*/
public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {

private final Configuration config;

private static final String[] headers =
{ "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };

private final CSVResult csvResult;

public ContainerReuseAnalyzer(Configuration config) {
this.config = config;
super(config);
this.csvResult = new CSVResult(headers);
}

Expand Down Expand Up @@ -82,11 +80,6 @@ public String getDescription() {
return "Get details on container reuse analysis";
}

@Override
public Configuration getConfiguration() {
return config;
}

public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ public List<String> getNotes() {
ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();

public CriticalPathAnalyzer() {
super(new Configuration());
}

public CriticalPathAnalyzer(Configuration conf) {
setConf(conf);
super(conf);
}

@Override
Expand Down Expand Up @@ -643,13 +644,9 @@ public String getDescription() {
return "Analyze critical path of the DAG";
}

@Override
public Configuration getConfiguration() {
return getConf();
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
Configuration config = new Configuration();
int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
System.exit(res);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.analyzer.plugins;

import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.Result;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.Event;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer {
private final String[] headers =
{ "name", "id", "event_type", "status", "event_time", "event_time_str", "diagnostics" };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to include the number of tasks assigned in the vertex as well (it can be added in another field called "comments" or "additional info", which can be populated optionally)?
e.g. "numTasks: " + vertex.getNumTasks() + ", failedTasks: " + vertex.getFailedTasks().size()
+ ", completedTasks: " + vertex.getCompletedTasksCount()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, I'm adding a "vertex_task_stats" before diagnostics column for better readability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final CSVResult csvResult;
private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

/**
 * Creates the analyzer and prepares an empty CSV result whose columns are {@code headers}.
 *
 * @param config configuration handed to the base analyzer
 */
public DagOverviewAnalyzer(Configuration config) {
  super(config);
  this.csvResult = new CSVResult(this.headers);
}

/**
 * Collects dag, vertex, failed-task and failed-attempt events of the given dag into the CSV
 * result, then sorts all records by event time (column 4, ascending).
 *
 * <p>Record layout follows {@code headers}: name, id, event_type, status, event_time,
 * event_time_str, diagnostics.
 *
 * @param dagInfo dag whose events are listed
 * @throws TezException declared by the {@link Analyzer} contract
 */
@Override
public void analyze(DagInfo dagInfo) throws TezException {
  for (Event event : dagInfo.getEvents()) {
    csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(),
        dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "" });
  }

  for (VertexInfo vertex : dagInfo.getVertices()) {
    // The failure info does not depend on the individual event, so compute it once per vertex.
    final String vertexFailureInfoIfAny = firstFailedAttemptInfo(vertex);
    for (Event event : vertex.getEvents()) {
      csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(),
          event.getType(), vertex.getStatus(), Long.toString(event.getTime()),
          toDateStr(event.getTime()), vertexFailureInfoIfAny });
    }

    // a failed task can lead to dag failure, so hopefully holds valuable information
    for (TaskInfo failedTask : vertex.getFailedTasks()) {
      for (Event failedTaskEvent : failedTask.getEvents()) {
        if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) {
          csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(),
              failedTaskEvent.getType(), failedTask.getStatus(),
              Long.toString(failedTaskEvent.getTime()), toDateStr(failedTaskEvent.getTime()),
              flattenForCsv(failedTask.getDiagnostics()) });
        }
      }
      // if we already found a failing task, let's scan the failing attempts as well
      for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) {
        for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) {
          if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
            csvResult.addRecord(new String[] { vertex.getVertexName(),
                failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(),
                failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()),
                toDateStr(failedTaskAttemptEvent.getTime()),
                flattenForCsv(failedAttempt.getDiagnostics()) });
          }
        }
      }
    }
  }

  csvResult.sort(new Comparator<String[]>() {
    @Override
    public int compare(String[] first, String[] second) {
      // Long.compare avoids the truncation/overflow of casting a long difference to int,
      // which yields a wrong ordering when two timestamps are more than ~24.8 days apart.
      return Long.compare(Long.parseLong(first[4]), Long.parseLong(second[4]));
    }
  });
}

/**
 * Returns "attemptId: diagnostics" for the first task attempt of the vertex whose status
 * contains "FAILED", or an empty string when there is no such attempt.
 */
private static String firstFailedAttemptInfo(VertexInfo vertex) {
  for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
    if (attempt.getStatus().contains("FAILED")) {
      return attempt.getTaskAttemptId() + ": " + flattenForCsv(attempt.getDiagnostics());
    }
  }
  return "";
}

/** Replaces commas and newlines with spaces so the text stays within a single CSV cell. */
private static String flattenForCsv(String text) {
  return text.replace(',', ' ').replace('\n', ' ');
}

/**
 * Renders a timestamp (milliseconds, as accepted by {@link Date#Date(long)}) with the shared
 * {@code FORMAT} pattern. The method is synchronized, so calls on the shared
 * {@link SimpleDateFormat} instance are serialized.
 *
 * @param time timestamp in milliseconds
 * @return the formatted timestamp
 */
private static synchronized String toDateStr(long time) {
  final Date moment = new Date(time);
  return FORMAT.format(moment);
}

/**
 * Returns the CSV result that {@code analyze} adds its records to.
 *
 * @return the CSV result held by this analyzer
 * @throws TezException declared by the {@link Analyzer} contract
 */
@Override
public Result getResult() throws TezException {
  return this.csvResult;
}

/**
 * Returns the display name of this analyzer.
 *
 * @return the analyzer name
 */
@Override
public String getName() {
  final String analyzerName = "Dag overview analyzer";
  return analyzerName;
}

/**
 * Returns a short description of what this analyzer reports.
 *
 * @return description of analyzer
 */
@Override
public String getDescription() {
  final String summary = "High level dag events overview (dag, vertex event summary).";
  final String purpose =
      " Helps understand the overall progress of a dag by simply listing the dag/vertex related events";
  return summary + purpose;
}

/**
 * Command-line entry point: runs the analyzer through {@link ToolRunner}, prints the
 * collected results and exits with the code returned by the run.
 *
 * @param args command-line arguments forwarded to {@link ToolRunner}
 * @throws Exception if the run or the result printing fails
 */
public static void main(String[] args) throws Exception {
  final Configuration conf = new Configuration();
  final DagOverviewAnalyzer overviewAnalyzer = new DagOverviewAnalyzer(conf);
  final int exitCode = ToolRunner.run(conf, overviewAnalyzer, args);
  overviewAnalyzer.printResults();
  System.exit(exitCode);
}
}
Loading