Skip to content

Commit 7df932c

Browse files
committed
TEZ-4231: SimpleHistoryParser doesn't merge events correctly
1 parent 464d86d commit 7df932c

23 files changed

Lines changed: 430 additions & 137 deletions

tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ public boolean hasNext() throws IOException {
9898
return message != null;
9999
} catch (java.io.EOFException e) {
100100
reader.close();
101-
102101
if (!fileIt.hasNext()) {
103102
return false;
104103
} else {

tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,14 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
236236
// time etc).
237237
if (dagJson == null) {
238238
dagJson = jsonObject;
239-
} else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
240-
.optJSONObject(ATSConstants.DAG_PLAN) == null) {
241-
// if DAG_PLAN is not filled already, let's try to fetch it from other
242-
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
243-
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
239+
} else{
240+
if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
241+
.optJSONObject(ATSConstants.DAG_PLAN) == null) {
242+
// if DAG_PLAN is not filled already, let's try to fetch it from other
243+
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
244+
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
245+
}
246+
mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
244247
}
245248
JSONArray relatedEntities = dagJson.optJSONArray(Constants
246249
.RELATED_ENTITIES);
@@ -268,6 +271,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
268271
}
269272
if (!vertexJsonMap.containsKey(vertexName)) {
270273
vertexJsonMap.put(vertexName, jsonObject);
274+
} else {
275+
mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS);
271276
}
272277
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
273278
break;
@@ -281,6 +286,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
281286
}
282287
if (!taskJsonMap.containsKey(taskName)) {
283288
taskJsonMap.put(taskName, jsonObject);
289+
} else {
290+
mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS);
284291
}
285292
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
286293
break;
@@ -294,6 +301,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
294301
}
295302
if (!attemptJsonMap.containsKey(taskAttemptName)) {
296303
attemptJsonMap.put(taskAttemptName, jsonObject);
304+
} else {
305+
mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS);
297306
}
298307
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
299308
break;
@@ -311,4 +320,17 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
311320
"Please provide a valid/complete history log file containing " + dagId);
312321
}
313322
}
323+
324+
private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key)
325+
throws JSONException {
326+
if (source.optJSONArray(key) == null) {
327+
source.put(key, new JSONArray());
328+
}
329+
if (destination.optJSONArray(key) == null) {
330+
destination.put(key, new JSONArray());
331+
}
332+
for (int i = 0; i < source.getJSONArray(key).length(); i++) {
333+
destination.getJSONArray(key).put(source.getJSONArray(key).get(i));
334+
}
335+
}
314336
}

tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,12 @@ public static void parseEvents(JSONArray eventNodes, List<Event> eventList) thro
128128
JSONObject eventNode = eventNodes.optJSONObject(i);
129129
final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
130130
final String eventType = eventNode.optString(Constants.EVENT_TYPE);
131-
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
131+
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0
132+
? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP);
132133

133134
Event event = new Event(eventInfo, eventType, time);
134135

135136
eventList.add(event);
136-
137137
}
138138
}
139139

tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.tez.analyzer;
2020

21-
import org.apache.hadoop.conf.Configuration;
2221
import org.apache.tez.dag.api.TezException;
2322
import org.apache.tez.history.parser.datamodel.DagInfo;
2423

@@ -54,11 +53,4 @@ public interface Analyzer {
5453
* @return description of analyzer
5554
*/
5655
public String getDescription();
57-
58-
/**
59-
* Get config properties related to this analyzer
60-
*
61-
* @return config related to analyzer
62-
*/
63-
public Configuration getConfiguration();
6456
}

tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,18 @@ public static void main(String argv[]){
4848
"Print task-to-node assignment details of a DAG");
4949
pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class,
5050
"Print vertex:node:status level details of task attempt results");
51+
pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class,
52+
"Print INPUT_READ_ERROR sources");
5153
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
5254
"Print the task concurrency details in a DAG");
5355
pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
5456
"Find critical path at vertex level in a DAG");
5557
pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class,
5658
"Find out schedule misses in 1:1 edges in a DAG");
59+
pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class,
60+
"Print basic dag information (dag/vertex events)");
61+
pgd.addClass("TaskHangAnalyzer", TaskHangAnalyzer.class,
62+
"Print all vertices/tasks and their last attempts with status/duration/node");
5763
exitCode = pgd.run(argv);
5864
} catch(Throwable e){
5965
e.printStackTrace();

tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,13 @@
3939
*/
4040
public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {
4141

42-
private final Configuration config;
43-
4442
private static final String[] headers =
4543
{ "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };
4644

4745
private final CSVResult csvResult;
4846

4947
public ContainerReuseAnalyzer(Configuration config) {
50-
this.config = config;
48+
super(config);
5149
this.csvResult = new CSVResult(headers);
5250
}
5351

@@ -82,11 +80,6 @@ public String getDescription() {
8280
return "Get details on container reuse analysis";
8381
}
8482

85-
@Override
86-
public Configuration getConfiguration() {
87-
return config;
88-
}
89-
9083
public static void main(String[] args) throws Exception {
9184
Configuration config = new Configuration();
9285
ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);

tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,11 @@ public List<String> getNotes() {
113113
ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();
114114

115115
public CriticalPathAnalyzer() {
116+
super(new Configuration());
116117
}
117118

118119
public CriticalPathAnalyzer(Configuration conf) {
119-
setConf(conf);
120+
super(conf);
120121
}
121122

122123
@Override
@@ -643,13 +644,9 @@ public String getDescription() {
643644
return "Analyze critical path of the DAG";
644645
}
645646

646-
@Override
647-
public Configuration getConfiguration() {
648-
return getConf();
649-
}
650-
651647
public static void main(String[] args) throws Exception {
652-
int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
648+
Configuration config = new Configuration();
649+
int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
653650
System.exit(res);
654651
}
655652

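tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java

Lines changed: 127 additions & 0 deletions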
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p/>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p/>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.tez.analyzer.plugins;
19+
20+
import java.text.SimpleDateFormat;
21+
import java.util.Comparator;
22+
import java.util.Date;
23+
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.util.ToolRunner;
26+
import org.apache.tez.analyzer.Analyzer;
27+
import org.apache.tez.analyzer.CSVResult;
28+
import org.apache.tez.analyzer.Result;
29+
import org.apache.tez.dag.api.TezException;
30+
import org.apache.tez.history.parser.datamodel.DagInfo;
31+
import org.apache.tez.history.parser.datamodel.Event;
32+
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
33+
import org.apache.tez.history.parser.datamodel.TaskInfo;
34+
import org.apache.tez.history.parser.datamodel.VertexInfo;
35+
36+
public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer {
37+
private final String[] headers =
38+
{ "name", "id", "event_type", "status", "event_time", "event_time_str", "diagnostics" };
39+
private final CSVResult csvResult;
40+
private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
41+
42+
public DagOverviewAnalyzer(Configuration config) {
43+
super(config);
44+
csvResult = new CSVResult(headers);
45+
}
46+
47+
@Override
48+
public void analyze(DagInfo dagInfo) throws TezException {
49+
for (Event event : dagInfo.getEvents()) {
50+
csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(),
51+
dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "" });
52+
}
53+
for (VertexInfo vertex : dagInfo.getVertices()) {
54+
for (Event event : vertex.getEvents()) {
55+
String vertexFailureInfoIfAny = "";
56+
for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
57+
if (attempt.getStatus().contains("FAILED")) {
58+
vertexFailureInfoIfAny = attempt.getTaskAttemptId() + ": "
59+
+ attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ");
60+
break;
61+
}
62+
}
63+
csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(),
64+
event.getType(), vertex.getStatus(), Long.toString(event.getTime()),
65+
toDateStr(event.getTime()), vertexFailureInfoIfAny });
66+
}
67+
68+
// a failed task can lead to dag failure, so hopefully holds valuable information
69+
for (TaskInfo failedTask : vertex.getFailedTasks()) {
70+
for (Event failedTaskEvent : failedTask.getEvents()) {
71+
if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) {
72+
csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(),
73+
failedTaskEvent.getType(), failedTask.getStatus(),
74+
Long.toString(failedTaskEvent.getTime()), toDateStr(failedTaskEvent.getTime()),
75+
failedTask.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
76+
}
77+
}
78+
// if we already found a failing task, let's scan the failing attempts as well
79+
for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) {
80+
for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) {
81+
if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
82+
csvResult.addRecord(new String[] { vertex.getVertexName(),
83+
failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(),
84+
failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()),
85+
toDateStr(failedTaskAttemptEvent.getTime()),
86+
failedAttempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") });
87+
}
88+
}
89+
}
90+
}
91+
}
92+
93+
csvResult.sort(new Comparator<String[]>() {
94+
public int compare(String[] first, String[] second) {
95+
return (int) (Long.parseLong(first[4]) - Long.parseLong(second[4]));
96+
}
97+
});
98+
}
99+
100+
private static synchronized String toDateStr(long time) {
101+
return FORMAT.format(new Date(time));
102+
}
103+
104+
@Override
105+
public Result getResult() throws TezException {
106+
return csvResult;
107+
}
108+
109+
@Override
110+
public String getName() {
111+
return "Dag overview analyzer";
112+
}
113+
114+
@Override
115+
public String getDescription() {
116+
return "High level dag events overview (dag, vertex event summary)."
117+
+ " Helps understand the overall progress of a dag by simply listing the dag/vertex related events";
118+
}
119+
120+
public static void main(String[] args) throws Exception {
121+
Configuration config = new Configuration();
122+
DagOverviewAnalyzer analyzer = new DagOverviewAnalyzer(config);
123+
int res = ToolRunner.run(config, analyzer, args);
124+
analyzer.printResults();
125+
System.exit(res);
126+
}
127+
}

0 commit comments

Comments
 (0)