Skip to content

Commit 92119cb

Browse files
TEZ-4666: Migrate tez-tools python scripts to python3
1 parent 1c8af3e commit 92119cb

7 files changed

Lines changed: 106 additions & 81 deletions

File tree

tez-tools/counter-diff/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ Before using the tool, make sure to install texttable using `pip install texttab
2626

2727
To use the tool, run
2828

29-
`python counter-diff.py dag_1.zip dag_2.zip [--detail]`
29+
`python3 counter-diff.py dag_1.zip dag_2.zip [--detail]`
3030

3131
This will print counter output difference between the specified DAGs like this
3232

33-
Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip`
33+
Example: `python3 counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip`
3434

3535
```
3636
+-------------------+-------------------------------------+----------------------------+----------------------------+-------------+
@@ -81,4 +81,4 @@ Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_149997851061
8181
| | KILLED_TASKS | 0 | 0 | 0 |
8282
| | TIME_TAKEN | 250198 | 68981 | -181217 |
8383
+-------------------+-------------------------------------+----------------------------+----------------------------+-------------+
84-
```
84+
```

tez-tools/counter-diff/counter-diff.py

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
# under the License.
1818
#
1919

20-
import imp, json, os, shutil, sys, tempfile, zipfile
20+
import json
21+
import os
22+
import shutil
23+
import sys
24+
import tempfile
25+
import zipfile
26+
2127
try:
22-
imp.find_module('texttable')
2328
from texttable import Texttable
2429
except ImportError:
25-
sys.stderr.write("Could not import Texttable\nRetry after 'pip install texttable'\n")
26-
exit()
30+
print("Could not import Texttable. Retry after 'pip install texttable'", file=sys.stderr)
31+
sys.exit(1)
2732

2833
tmpdir = tempfile.mkdtemp()
2934

@@ -47,28 +52,33 @@ def diff(file1, file2):
4752
# also in dag.json data is inside "dag" root node
4853
file1_using_dag_json = True
4954
dag_json_file1 = os.path.join(file1_dir, "dag.json")
50-
if os.path.isfile(dag_json_file1) == False:
55+
if not os.path.isfile(dag_json_file1):
5156
file1_using_dag_json = False
5257
dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG")
53-
if os.path.isfile(dag_json_file1) == False:
54-
print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1
55-
exit()
58+
if not os.path.isfile(dag_json_file1):
59+
print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
60+
sys.exit()
5661

5762
file2_using_dag_json = True
5863
dag_json_file2 = os.path.join(file2_dir, "dag.json")
59-
if os.path.isfile(dag_json_file2) == False:
64+
if not os.path.isfile(dag_json_file2):
6065
file2_using_dag_json = False
6166
dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG")
62-
if os.path.isfile(dag_json_file2) == False:
63-
print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1
64-
exit()
67+
if not os.path.isfile(dag_json_file2):
68+
print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1)
69+
sys.exit()
6570

6671
# populate diff table
6772
difftable = {}
6873
with open(dag_json_file1) as data_file:
6974
file1_dag_json = json.load(data_file)["dag"] if file1_using_dag_json else json.load(data_file)
70-
counters = file1_dag_json['otherinfo']['counters']
71-
for group in counters['counterGroups']:
75+
76+
# Safe access to otherinfo and counters
77+
otherinfo = file1_dag_json.get('otherinfo', {})
78+
counters = otherinfo.get('counters', {})
79+
80+
# Iterate only if counterGroups exists
81+
for group in counters.get('counterGroups', []):
7282
countertable = {}
7383
for counter in group['counters']:
7484
counterName = counter['counterName']
@@ -78,22 +88,24 @@ def diff(file1, file2):
7888
groupName = group['counterGroupName']
7989
difftable[groupName] = countertable
8090

81-
# add other info
82-
otherinfo = file1_dag_json['otherinfo']
91+
# add other info safely
8392
countertable = {}
84-
countertable["TIME_TAKEN"] = [otherinfo['timeTaken']]
85-
countertable["COMPLETED_TASKS"] = [otherinfo['numCompletedTasks']]
86-
countertable["SUCCEEDED_TASKS"] = [otherinfo['numSucceededTasks']]
87-
countertable["FAILED_TASKS"] = [otherinfo['numFailedTasks']]
88-
countertable["KILLED_TASKS"] = [otherinfo['numKilledTasks']]
89-
countertable["FAILED_TASK_ATTEMPTS"] = [otherinfo['numFailedTaskAttempts']]
90-
countertable["KILLED_TASK_ATTEMPTS"] = [otherinfo['numKilledTaskAttempts']]
93+
countertable["TIME_TAKEN"] = [otherinfo.get('timeTaken', 0)]
94+
countertable["COMPLETED_TASKS"] = [otherinfo.get('numCompletedTasks', 0)]
95+
countertable["SUCCEEDED_TASKS"] = [otherinfo.get('numSucceededTasks', 0)]
96+
countertable["FAILED_TASKS"] = [otherinfo.get('numFailedTasks', 0)]
97+
countertable["KILLED_TASKS"] = [otherinfo.get('numKilledTasks', 0)]
98+
countertable["FAILED_TASK_ATTEMPTS"] = [otherinfo.get('numFailedTaskAttempts', 0)]
99+
countertable["KILLED_TASK_ATTEMPTS"] = [otherinfo.get('numKilledTaskAttempts', 0)]
91100
difftable['otherinfo'] = countertable
92101

93102
with open(dag_json_file2) as data_file:
94103
file2_dag_json = json.load(data_file)["dag"] if file2_using_dag_json else json.load(data_file)
95-
counters = file2_dag_json['otherinfo']['counters']
96-
for group in counters['counterGroups']:
104+
105+
otherinfo = file2_dag_json.get('otherinfo', {})
106+
counters = otherinfo.get('counters', {})
107+
108+
for group in counters.get('counterGroups', []):
97109
groupName = group['counterGroupName']
98110
if groupName not in difftable:
99111
difftable[groupName] = {}
@@ -105,16 +117,15 @@ def diff(file1, file2):
105117
countertable[counterName] = [0]
106118
countertable[counterName].append(counter['counterValue'])
107119

108-
# append other info
109-
otherinfo = file2_dag_json['otherinfo']
120+
# append other info safely
110121
countertable = difftable['otherinfo']
111-
countertable["TIME_TAKEN"].append(otherinfo['timeTaken'])
112-
countertable["COMPLETED_TASKS"].append(otherinfo['numCompletedTasks'])
113-
countertable["SUCCEEDED_TASKS"].append(otherinfo['numSucceededTasks'])
114-
countertable["FAILED_TASKS"].append(otherinfo['numFailedTasks'])
115-
countertable["KILLED_TASKS"].append(otherinfo['numKilledTasks'])
116-
countertable["FAILED_TASK_ATTEMPTS"].append(otherinfo['numFailedTaskAttempts'])
117-
countertable["KILLED_TASK_ATTEMPTS"].append(otherinfo['numKilledTaskAttempts'])
122+
countertable["TIME_TAKEN"].append(otherinfo.get('timeTaken', 0))
123+
countertable["COMPLETED_TASKS"].append(otherinfo.get('numCompletedTasks', 0))
124+
countertable["SUCCEEDED_TASKS"].append(otherinfo.get('numSucceededTasks', 0))
125+
countertable["FAILED_TASKS"].append(otherinfo.get('numFailedTasks', 0))
126+
countertable["KILLED_TASKS"].append(otherinfo.get('numKilledTasks', 0))
127+
countertable["FAILED_TASK_ATTEMPTS"].append(otherinfo.get('numFailedTaskAttempts', 0))
128+
countertable["KILLED_TASK_ATTEMPTS"].append(otherinfo.get('numKilledTaskAttempts', 0))
118129
difftable['otherinfo'] = countertable
119130

120131
# if some counters are missing, consider it as 0 and compute delta difference
@@ -134,7 +145,7 @@ def print_table(difftable, name1, name2, detailed=False):
134145
table = Texttable(max_width=0)
135146
table.set_cols_align(["l", "l", "l", "l", "l"])
136147
table.set_cols_valign(["m", "m", "m", "m", "m"])
137-
table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"]);
148+
table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"])
138149
for k in sorted(difftable):
139150
# ignore task specific counters in default output
140151
if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k):
@@ -177,13 +188,13 @@ def print_table(difftable, name1, name2, detailed=False):
177188

178189
table.add_row(row)
179190

180-
print table.draw() + "\n"
191+
print(table.draw() + "\n")
181192

182193

183194
def main(argv):
184195
sysargs = len(argv)
185196
if sysargs < 2:
186-
print "Usage: python counter-diff.py dag_file1.zip dag_file2.zip [--detail]"
197+
print("Usage: python3 counter-diff.py dag_file1.zip dag_file2.zip [--detail]")
187198
return -1
188199

189200
file1 = argv[0]
@@ -200,4 +211,4 @@ def main(argv):
200211
try:
201212
sys.exit(main(sys.argv[1:]))
202213
finally:
203-
shutil.rmtree(tmpdir)
214+
shutil.rmtree(tmpdir)

tez-tools/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
texttable==1.7.0

tez-tools/swimlanes/amlogparser.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,12 @@
1717
# under the License.
1818
#
1919

20-
import sys,re
20+
import sys
21+
import re
2122
from itertools import groupby
22-
from bz2 import BZ2File
23-
from gzip import GzipFile as GZFile
24-
try:
25-
from urllib.request import urlopen
26-
except:
27-
from urllib2 import urlopen as urlopen
23+
import bz2
24+
import gzip
25+
from urllib.request import urlopen
2826

2927
class AMRawEvent(object):
3028
def __init__(self, ts, dag, event, args):
@@ -39,7 +37,7 @@ def first(l):
3937
return (l[:1] or [None])[0]
4038

4139
def kv_add(d, k, v):
42-
if(d.has_key(k)):
40+
if(k in d):
4341
oldv = d[k]
4442
if(type(oldv) is list):
4543
oldv.append(v)
@@ -50,7 +48,7 @@ def kv_add(d, k, v):
5048
d[k] = v
5149

5250
def csv_kv(args):
53-
kvs = {};
51+
kvs = {}
5452
pairs = [p.strip() for p in args.split(",")]
5553
for kv in pairs:
5654
if(kv.find("=") == -1):
@@ -160,10 +158,11 @@ def __repr__(self):
160158

161159
class Attempt(object):
162160
def __init__(self, pair):
163-
start = first(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair))
164-
finish = first(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair))
161+
# Consuming iterators immediately with list() for Py3 compatibility
162+
start = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair)))
163+
finish = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair)))
165164
if start is None or finish is None:
166-
print [start, finish];
165+
print([start, finish])
167166
self.raw = finish
168167
self.kvs = csv_kv(start.args)
169168
if finish is not None:
@@ -186,26 +185,28 @@ def __repr__(self):
186185

187186
def open_file(f):
188187
if(f.endswith(".gz")):
189-
return GZFile(f)
188+
return gzip.open(f, "rt")
190189
elif(f.endswith(".bz2")):
191-
return BZ2File(f)
190+
return bz2.open(f, "rt")
192191
elif(f.startswith("http://")):
193192
return urlopen(f)
194-
return open(f)
193+
return open(f, "r")
195194

196195
class AMLog(object):
197196
def init(self):
198197
ID=r'[^\]]*'
199198
TS=r'[0-9:\-, ]*'
200199
MAIN_RE=r'^(?P<ts>%(ts)s) [?INFO]? [(?P<thread>%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P<dag>%(id)s)][Event:(?P<event>%(id)s)]: (?P<args>.*)'
201-
MAIN_RE = MAIN_RE.replace('[','\[').replace(']','\]')
200+
# Fix for SyntaxWarning: using raw strings
201+
MAIN_RE = MAIN_RE.replace('[', r'\[').replace(']', r'\]')
202202
MAIN_RE = MAIN_RE % {'ts' : TS, 'id' : ID}
203203
self.MAIN_RE = re.compile(MAIN_RE)
204204

205205
def __init__(self, f):
206206
fp = open_file(f)
207207
self.init()
208-
self.events = filter(lambda a:a, [self.parse(l.strip()) for l in fp])
208+
# Filter returns iterator in Py3, list() ensures immediate execution
209+
self.events = list(filter(lambda a:a, [self.parse(l.strip()) for l in fp]))
209210

210211
def structure(self):
211212
am = self.appmaster() # this is a copy
@@ -221,7 +222,7 @@ def structure(self):
221222
for d in dags:
222223
d.structure(vertexes)
223224
for a in attempts:
224-
if containers.has_key(a.container):
225+
if a.container in containers:
225226
c = containers[a.container]
226227
c.node = a.node
227228
else:
@@ -242,7 +243,7 @@ def containers(self):
242243
for ev in self.events:
243244
if ev.event == "CONTAINER_STOPPED":
244245
kvs = csv_kv(ev.args)
245-
if containermap.has_key(kvs["containerId"]):
246+
if kvs["containerId"] in containermap:
246247
containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"])
247248
containermap[kvs["containerId"]].status = int(kvs["exitStatus"])
248249
return containers
@@ -265,26 +266,31 @@ def attempts(self):
265266
key = lambda a:a[0]
266267
value = lambda a:a[1]
267268
raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED"]
268-
pairs = groupby(sorted(raw), key = key)
269-
attempts = [Attempt(map(value,p)) for (k,p) in pairs]
269+
# FIX: explicitly pass key to sorted() to avoid comparing AMRawEvent objects
270+
# which causes TypeError in Python 3
271+
pairs = groupby(sorted(raw, key=key), key = key)
272+
# Map returns iterator in Py3, list() creates the necessary list
273+
attempts = [Attempt(list(map(value,p))) for (k,p) in pairs]
270274
return attempts
271275

272276
def parse(self, l):
273277
if(l.find("[HISTORY]") != -1):
274278
m = self.MAIN_RE.match(l)
275-
ts = m.group("ts")
276-
dag = m.group("dag")
277-
event = m.group("event")
278-
args = m.group("args")
279-
return AMRawEvent(ts, dag, event, args)
279+
if m:
280+
ts = m.group("ts")
281+
dag = m.group("dag")
282+
event = m.group("event")
283+
args = m.group("args")
284+
return AMRawEvent(ts, dag, event, args)
285+
return None
280286

281287
def main(argv):
282288
tree = AMLog(argv[0]).structure()
283289
# AM -> dag -> vertex -> task -> attempt
284290
# AM -> container
285291
for d in tree.dags:
286292
for a in d.attempts():
287-
print [a.vertex, a.name, a.container, a.start, a.finish]
293+
print([a.vertex, a.name, a.container, a.start, a.finish])
288294

289295
if __name__ == "__main__":
290296
main(sys.argv[1:])

0 commit comments

Comments
 (0)