Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1221,10 +1221,16 @@ import org.apache.spark.annotation.DeveloperApi
)
}

val preamble = """
|class %s extends Serializable {
| %s%s%s
""".stripMargin.format(lineRep.readName, envLines.map(" " + _ + ";\n").mkString, importsPreamble, indentCode(toCompute))
val preamble = s"""
|class ${lineRep.readName} extends Serializable {
| ${envLines.map(" " + _ + ";\n").mkString}
| $importsPreamble
|
| // If we need to construct any objects defined in the REPL on an executor we will need
| // to pass the outer scope to the appropriate encoder.
| org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marmbrus Can you explain more about this change. This change will be difficult to port to 2.11, I can take a stab.

I just wanted to understand if there is an alternate way to solve this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a handle to any outer class that defines an inner class that is going to used in a Spark Dataset so that we can construct new instances on the executors. It might be helpful to also look at the changes in #9602.

@dragos was saying maybe we don't have this problem in 2.11, but I have not investigated at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @dragos, that is why I am concerned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in 2.11 you probably don't have this problem, do you? All enclosing entities are objects, and they don't need an outer pointer.

| ${indentCode(toCompute)}
""".stripMargin
val postamble = importsTrailer + "\n}" + "\n" +
"object " + lineRep.readName + " {\n" +
" val INSTANCE = new " + lineRep.readName + "();\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ class ReplSuite extends SparkFunSuite {
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
|
|// Test Dataset Serialization in the REPL
|Seq(TestCaseClass(1)).toDS().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
Expand All @@ -278,6 +281,27 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("java.lang.ClassNotFoundException", output)
}

test("Datasets and encoders") {
val output = runInterpreter("local",
"""
|import org.apache.spark.sql.functions._
|import org.apache.spark.sql.Encoder
|import org.apache.spark.sql.expressions.Aggregator
|import org.apache.spark.sql.TypedColumn
|val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
| def zero: Int = 0 // The initial value.
| def reduce(b: Int, a: Int) = b + a // Add an element to the running total
| def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
| def finish(b: Int) = b // Return the final result.
|}.toColumn
|
|val ds = Seq(1, 2, 3, 4).toDS()
|ds.select(simpleSum).collect
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could not get this test to fail without this patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case only fails on partial applications of this patch. In particular with the classloader change here without the clasloader hack here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can see it failing now both in scala 2.10 and 2.11.

I will spend more time to understand, if it is possible to fix it in some other way.

}

test("SPARK-2632 importing a method from non serializable class and not using it.") {
val output = runInterpreter("local",
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
case e: ClassNotFoundException => {
val classOption = findClassLocally(name)
classOption match {
case None => throw new ClassNotFoundException(name, e)
case None =>
// If this class has a cause, it will break the internal assumption of Janino
// (the compiler used for Spark SQL code-gen).
// See org.codehaus.janino.ClassLoaderIClassLoader's findIClass, you will see
// its behavior will be changed if there is a cause and the compilation
// of generated class will fail.
throw new ClassNotFoundException(name)
case Some(a) => a
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.{MapData, ArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types._

import org.apache.spark.util.Utils

/**
* Java source for evaluating an [[Expression]] given a [[InternalRow]] of input.
Expand Down Expand Up @@ -524,7 +524,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
*/
private[this] def doCompile(code: String): GeneratedClass = {
val evaluator = new ClassBodyEvaluator()
evaluator.setParentClassLoader(getClass.getClassLoader)
evaluator.setParentClassLoader(Utils.getContextOrSparkClassLoader)
// Cannot be under package codegen, or fail with java.lang.InstantiationException
evaluator.setClassName("org.apache.spark.sql.catalyst.expressions.GeneratedClass")
evaluator.setDefaultImports(Array(
Expand Down