Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private[spark] class PythonRDD(
this.interrupt()
}

override def run(): Unit = Utils.logUncaughtExceptions {
override def run(): Unit = Utils.tryLog {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing a big tryLog (which is sort of dangerous because of the potential of users adding a return statement that would break this code), can we add an explicit try around the two worker.shutdownOutput() calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The run can not have return value. The InterruptedException could happen in any place (when the task is canceled), so I think it's better to do it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can still call return even if the value is Unit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw this happened a few times in the past in Spark. somebody added a return in a giant block that is wrapped in a closure. Code reviewers couldn't catch it because it didn't show up in the diff.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin, is the concern that a user might write something like

def foo(): Unit = Utils.tryLog {
   return someCallThatThrowsAnException()
}

and thereby skip the exception-handling logic?

try {
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
Expand Down