Skip to content

Commit 4428edd

Browse files
committed
Revert "revert scary parts"
This reverts commit 95cec7d.
1 parent 59a5013 commit 4428edd

4 files changed

Lines changed: 51 additions & 6 deletions

File tree

repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,14 +1221,24 @@ import org.apache.spark.annotation.DeveloperApi
12211221
)
12221222
}
12231223

1224-
val preamble = """
1225-
|class %s extends Serializable {
1226-
| %s%s%s
1227-
""".stripMargin.format(lineRep.readName, envLines.map(" " + _ + ";\n").mkString, importsPreamble, indentCode(toCompute))
1224+
val preamble = s"""
1225+
|class ${lineRep.readName} extends Serializable {
1226+
| ${envLines.map(" " + _ + ";\n").mkString}
1227+
| $importsPreamble
1228+
|
1229+
| // If we need to construct any objects defined in the REPL on an executor we will need
1230+
| // to pass the outer scope to the appropriate encoder.
1231+
| org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
1232+
| ${indentCode(toCompute)}
1233+
""".stripMargin
1234+
12281235
val postamble = importsTrailer + "\n}" + "\n" +
12291236
"object " + lineRep.readName + " {\n" +
12301237
" val INSTANCE = new " + lineRep.readName + "();\n" +
12311238
"}\n"
1239+
1240+
System.out.println(preamble + postamble)
1241+
12321242
val generate = (m: MemberHandler) => m extraCodeToEvaluate Request.this
12331243

12341244
/*

repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,9 @@ class ReplSuite extends SparkFunSuite {
262262
|import sqlContext.implicits._
263263
|case class TestCaseClass(value: Int)
264264
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
265+
|
266+
|// Test Dataset Serialization in the REPL
267+
|Seq(TestCaseClass(1)).toDS().collect()
265268
""".stripMargin)
266269
assertDoesNotContain("error:", output)
267270
assertDoesNotContain("Exception", output)
@@ -278,6 +281,27 @@ class ReplSuite extends SparkFunSuite {
278281
assertDoesNotContain("java.lang.ClassNotFoundException", output)
279282
}
280283

284+
test("Datasets and encoders") {
285+
val output = runInterpreter("local",
286+
"""
287+
|import org.apache.spark.sql.functions._
288+
|import org.apache.spark.sql.Encoder
289+
|import org.apache.spark.sql.expressions.Aggregator
290+
|import org.apache.spark.sql.TypedColumn
291+
|val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
292+
| def zero: Int = 0 // The initial value.
293+
| def reduce(b: Int, a: Int) = b + a // Add an element to the running total
294+
| def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
295+
| def finish(b: Int) = b // Return the final result.
296+
|}.toColumn
297+
|
298+
|val ds = Seq(1, 2, 3, 4).toDS()
299+
|ds.select(simpleSum).collect
300+
""".stripMargin)
301+
assertDoesNotContain("error:", output)
302+
assertDoesNotContain("Exception", output)
303+
}
304+
281305
test("SPARK-2632 importing a method from non serializable class and not using it.") {
282306
val output = runInterpreter("local",
283307
"""

repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.spark.repl
2020
import java.io.{IOException, ByteArrayOutputStream, InputStream}
2121
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
2222

23+
import org.apache.spark.unsafe.Platform
24+
2325
import scala.util.control.NonFatal
2426

2527
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -56,6 +58,10 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
5658
}
5759

5860
override def findClass(name: String): Class[_] = {
61+
// This is a horrible hack to workaround an issue that Janino has when operating on a
62+
// REPL classloader :(.
63+
if (name == "Platform") return classOf[Platform]
64+
5965
userClassPathFirst match {
6066
case true => findClassLocally(name).getOrElse(parentLoader.loadClass(name))
6167
case false => {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen
1919

20+
import org.apache.spark.util.Utils
21+
2022
import scala.collection.mutable
2123
import scala.collection.mutable.ArrayBuffer
2224
import scala.language.existentials
@@ -204,7 +206,7 @@ class CodeGenContext {
204206
case udt: UserDefinedType[_] => javaType(udt.sqlType)
205207
case ObjectType(cls) if cls.isArray => s"${javaType(ObjectType(cls.getComponentType))}[]"
206208
case ObjectType(cls) => cls.getName
207-
case _ => "Object"
209+
case _ => "java.lang.Object"
208210
}
209211

210212
/**
@@ -523,8 +525,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
523525
* Compile the Java source code into a Java class, using Janino.
524526
*/
525527
private[this] def doCompile(code: String): GeneratedClass = {
528+
assert(!code.contains(" Object ", s"java.lang.Object should be used instead in: \n$code"))
529+
assert(!code.contains(" Object[] ", s"java.lang.Object[] should be used instead in: \n$code"))
530+
526531
val evaluator = new ClassBodyEvaluator()
527-
evaluator.setParentClassLoader(getClass.getClassLoader)
532+
evaluator.setParentClassLoader(Utils.getContextOrSparkClassLoader)
528533
// Cannot be under package codegen, or fail with java.lang.InstantiationException
529534
evaluator.setClassName("org.apache.spark.sql.catalyst.expressions.GeneratedClass")
530535
evaluator.setDefaultImports(Array(

0 commit comments

Comments
 (0)