Skip to content

Commit 2693035

Browse files
davies authored and JoshRosen committed
[SPARK-2580] [PySpark] keep silent in worker if JVM close the socket
During rdd.take(n), the JVM will close the socket once it has received enough data; the Python worker should stay silent in this case. At the same time, the worker should not print the traceback to stderr if it has sent the traceback to the JVM successfully. Author: Davies Liu <davies.liu@gmail.com> Closes #1625 from davies/error and squashes the following commits: 4fbcc6d [Davies Liu] disable log4j during testing when exception is expected. cc14202 [Davies Liu] keep silent in worker if JVM close the socket (cherry picked from commit ccd5ab5) Signed-off-by: Josh Rosen <joshrosen@apache.org>
1 parent 1a0a2f8 commit 2693035

2 files changed

Lines changed: 19 additions & 8 deletions

File tree

python/pyspark/tests.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -111,11 +111,17 @@ class TestAddFile(PySparkTestCase):
111111
def test_add_py_file(self):
112112
# To ensure that we're actually testing addPyFile's effects, check that
113113
# this job fails due to `userlibrary` not being on the Python path:
114+
# disable logging in log4j temporarily
115+
log4j = self.sc._jvm.org.apache.log4j
116+
old_level = log4j.LogManager.getRootLogger().getLevel()
117+
log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
114118
def func(x):
115119
from userlibrary import UserClass
116120
return UserClass().hello()
117121
self.assertRaises(Exception,
118122
self.sc.parallelize(range(2)).map(func).first)
123+
log4j.LogManager.getRootLogger().setLevel(old_level)
124+
119125
# Add the file, so the job should now succeed:
120126
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
121127
self.sc.addPyFile(path)

python/pyspark/worker.py

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -75,14 +75,19 @@ def main(infile, outfile):
7575
init_time = time.time()
7676
iterator = deserializer.load_stream(infile)
7777
serializer.dump_stream(func(split_index, iterator), outfile)
78-
except Exception as e:
79-
# Write the error to stderr in addition to trying to pass it back to
80-
# Java, in case it happened while serializing a record
81-
print >> sys.stderr, "PySpark worker failed with exception:"
82-
print >> sys.stderr, traceback.format_exc()
83-
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
84-
write_with_length(traceback.format_exc(), outfile)
85-
sys.exit(-1)
78+
except Exception:
79+
try:
80+
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
81+
write_with_length(traceback.format_exc(), outfile)
82+
outfile.flush()
83+
except IOError:
84+
# JVM close the socket
85+
pass
86+
except Exception:
87+
# Write the error to stderr if it happened while serializing
88+
print >> sys.stderr, "PySpark worker failed with exception:"
89+
print >> sys.stderr, traceback.format_exc()
90+
exit(-1)
8691
finish_time = time.time()
8792
report_times(outfile, boot_time, init_time, finish_time)
8893
# Mark the beginning of the accumulators section of the output

0 commit comments

Comments
 (0)