Skip to content

Commit f135b70

Browse files
cloud-fanliancheng
authored and committed
[SPARK-18251][SQL] the type of Dataset can't be Option of non-flat type
## What changes were proposed in this pull request? For an input object of non-flat type, we can't encode it to a row if it's null, as Spark SQL doesn't allow the entire row to be null; only its columns can be null. That's the reason we forbid users to use top level null objects in #13469. However, if users wrap a non-flat type with `Option`, then we may still encode a top level null object to a row, which is not allowed. This PR fixes this case, and suggests that users wrap their type with `Tuple1` if they do want top level null objects. ## How was this patch tested? new test Author: Wenchen Fan <wenchen@databricks.com> Closes #15979 from cloud-fan/option.
1 parent 60022bf commit f135b70

4 files changed

Lines changed: 37 additions & 5 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -605,6 +605,19 @@ object ScalaReflection extends ScalaReflection {
605605

606606
}
607607

608+
/**
609+
* Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
610+
* we also treat [[DefinedByConstructorParams]] as product type.
611+
*/
612+
def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
613+
tpe match {
614+
case t if t <:< localTypeOf[Option[_]] =>
615+
val TypeRef(_, _, Seq(optType)) = t
616+
definedByConstructorParams(optType)
617+
case _ => false
618+
}
619+
}
620+
608621
/**
609622
* Returns the parameter names and types for the primary constructor of this class.
610623
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -47,16 +47,26 @@ object ExpressionEncoder {
4747
// We convert the not-serializable TypeTag into StructType and ClassTag.
4848
val mirror = typeTag[T].mirror
4949
val tpe = typeTag[T].tpe
50+
51+
if (ScalaReflection.optionOfProductType(tpe)) {
52+
throw new UnsupportedOperationException(
53+
"Cannot create encoder for Option of Product type, because Product type is represented " +
54+
"as a row, and the entire row can not be null in Spark SQL like normal databases. " +
55+
"You can wrap your type with Tuple1 if you do want top level null Product objects, " +
56+
"e.g. instead of creating `Dataset[Option[MyClass]]`, you can do something like " +
57+
"`val ds: Dataset[Tuple1[MyClass]] = Seq(Tuple1(MyClass(...)), Tuple1(null)).toDS`")
58+
}
59+
5060
val cls = mirror.runtimeClass(tpe)
5161
val flat = !ScalaReflection.definedByConstructorParams(tpe)
5262

5363
val inputObject = BoundReference(0, ScalaReflection.dataTypeFor[T], nullable = true)
5464
val nullSafeInput = if (flat) {
5565
inputObject
5666
} else {
57-
// For input object of non-flat type, we can't encode it to row if it's null, as Spark SQL
67+
// For input object of Product type, we can't encode it to row if it's null, as Spark SQL
5868
// doesn't allow top-level row to be null, only its columns can be null.
59-
AssertNotNull(inputObject, Seq("top level non-flat input object"))
69+
AssertNotNull(inputObject, Seq("top level Product input object"))
6070
}
6171
val serializer = ScalaReflection.serializerFor[T](nullSafeInput)
6272
val deserializer = ScalaReflection.deserializerFor[T]

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -867,10 +867,10 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
867867
checkDataset(Seq("a", null).toDS(), "a", null)
868868
}
869869

870-
test("Dataset should throw RuntimeException if non-flat input object is null") {
870+
test("Dataset should throw RuntimeException if top-level product input object is null") {
871871
val e = intercept[RuntimeException](Seq(ClassData("a", 1), null).toDS())
872872
assert(e.getMessage.contains("Null value appeared in non-nullable field"))
873-
assert(e.getMessage.contains("top level non-flat input object"))
873+
assert(e.getMessage.contains("top level Product input object"))
874874
}
875875

876876
test("dropDuplicates") {
@@ -1051,6 +1051,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
10511051
checkDataset(dsDouble, arrayDouble)
10521052
checkDataset(dsString, arrayString)
10531053
}
1054+
1055+
test("SPARK-18251: the type of Dataset can't be Option of Product type") {
1056+
checkDataset(Seq(Some(1), None).toDS(), Some(1), None)
1057+
1058+
val e = intercept[UnsupportedOperationException] {
1059+
Seq(Some(1 -> "a"), None).toDS()
1060+
}
1061+
assert(e.getMessage.contains("Cannot create encoder for Option of Product type"))
1062+
}
10541063
}
10551064

10561065
case class Generic[T](id: T, value: Double)

sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -143,7 +143,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
143143
}
144144

145145
test("roundtrip in to_json and from_json") {
146-
val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct")
146+
val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct")
147147
val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
148148
val readBackOne = dfOne.select(to_json($"struct").as("json"))
149149
.select(from_json($"json", schemaOne).as("struct"))

0 commit comments

Comments
 (0)