Commits (28 total; the diff below shows changes from 3 commits)
e5ebdad
[SPARK-23922][SQL] Add arrays_overlap function
mgaido91 Apr 10, 2018
682bc73
fix python style
mgaido91 Apr 10, 2018
876cd93
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 Apr 17, 2018
88e09b3
review comments
mgaido91 Apr 20, 2018
c895707
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 Apr 20, 2018
65b7d6d
introduce BinaryArrayExpressionWithImplicitCast
mgaido91 Apr 27, 2018
f9a1ecf
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 Apr 27, 2018
1dbcd0c
fix type check
mgaido91 Apr 27, 2018
076fc69
fix scalastyle
mgaido91 Apr 27, 2018
eafca0f
fix build error
mgaido91 Apr 27, 2018
5925104
fix
mgaido91 Apr 27, 2018
2a1121c
address comments
mgaido91 May 3, 2018
bf81e4a
use sets instead of nested loops
mgaido91 May 3, 2018
4a18ba8
address review comments
mgaido91 May 4, 2018
566946a
address review comments
mgaido91 May 4, 2018
710433e
add test case for null
mgaido91 May 4, 2018
3cf410a
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 May 7, 2018
9d086f9
address comments
mgaido91 May 7, 2018
964f7af
use findTightestCommonType for type inference
mgaido91 May 8, 2018
41ef6c6
support binary and complex data types
mgaido91 May 9, 2018
3dd724b
review comments
mgaido91 May 11, 2018
f7089f5
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 May 11, 2018
e36a5d7
fix compilation error
mgaido91 May 11, 2018
49d9372
address comments
mgaido91 May 14, 2018
227437b
address comment
mgaido91 May 15, 2018
2e9e024
fix null handling with complex types
mgaido91 May 16, 2018
92730a1
Merge branch 'master' of github.com:apache/spark into SPARK-23922
mgaido91 May 17, 2018
56c59ae
fix build
mgaido91 May 17, 2018
14 changes: 14 additions & 0 deletions python/pyspark/sql/functions.py
@@ -1846,6 +1846,20 @@ def array_contains(col, value):
    return Column(sc._jvm.functions.array_contains(_to_java_column(col), value))


@since(2.4)
def arrays_overlap(a1, a2):
    """
    Collection function: returns true if the arrays contain any common non-null element; if not,
    returns null if any of the arrays contains a null element and false otherwise.

    >>> df = spark.createDataFrame([(["a", "b"], ["b", "c"]), (["a"], ["b", "c"])], ['x', 'y'])
    >>> df.select(arrays_overlap(df.x, df.y).alias("overlap")).collect()
    [Row(overlap=True), Row(overlap=False)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_overlap(_to_java_column(a1), _to_java_column(a2)))


@since(1.4)
def explode(col):
"""Returns a new row for each element in the given array or map.
@@ -403,6 +403,7 @@ object FunctionRegistry {
    // collection functions
    expression[CreateArray]("array"),
    expression[ArrayContains]("array_contains"),
    expression[ArraysOverlap]("arrays_overlap"),
    expression[CreateMap]("map"),
    expression[CreateNamedStruct]("named_struct"),
    expression[MapKeys]("map_keys"),
@@ -288,6 +288,114 @@ case class ArrayContains(left: Expression, right: Expression)
  override def prettyName: String = "array_contains"
}

/**
 * Checks if the two arrays contain at least one common element.
 */
@ExpressionDescription(
  usage = "_FUNC_(a1, a2) - Returns true if a1 contains at least one element also present in a2.",

Member: Can you add a note for null handling?

  examples = """
    Examples:
      > SELECT _FUNC_(array(1, 2, 3), array(3, 4, 5));
       true
  """, since = "2.4.0")
case class ArraysOverlap(left: Expression, right: Expression)
Contributor: Shouldn't you override prettyName to a value following the conventions?

    override def prettyName: String = "arrays_overlap"

Contributor Author: done, thanks!

  extends BinaryExpression with ImplicitCastInputTypes {

  private lazy val elementType = inputTypes.head.asInstanceOf[ArrayType].elementType

  override def dataType: DataType = BooleanType

  override def inputTypes: Seq[AbstractDataType] = left.dataType match {
Member: Seq(ArrayType, ArrayType)?

Contributor Author: no, because in that way we would lose the information about the elementType

Member: There are similar functions that handle two (or more) arrays with the same element type, such as array_union (#21061), array_intersect (#21102), array_except (#21103), and maybe concat (#20858), which is slightly different though. I think we should use the same way to specify and check input types.

I'd like to discuss the best way for it here or somewhere else.
cc @kiszk @mn-mikke Do you have any suggestions?
Also cc @gatorsmile @cloud-fan

Member: If common functions for a method which accepts two arrays with the same type are provided as a trait, it would be fine.

Contributor: We can create a new trait, which will first make sure all its children are of array type, and then make sure all its children are of the same type after implicit type cast (make sure other databases also do implicit type cast for these functions).

Then update the TypeCoercion rule to handle this trait.

Contributor: yea, if implicit type cast is not allowed for these functions.

Contributor: Just a quick note, there is a dedicated type coercion rule for concat functions. So if there was the trait you described, we could remove that rule.

Contributor Author: @mn-mikke I am not sure, since it is quite a strange case: it also allows string and byte. I am not sure we can do this with implicit type cast.

Contributor: @mgaido91 Sorry, I should have been more explicit. I've been referring to the case below, which I added into FunctionArgumentConversion to enable type coercion of array types:

    case c @ Concat(children) if children.forall(c => ArrayType.acceptsType(c.dataType)) &&
      !haveSameType(children) =>
      val types = children.map(_.dataType)
      findWiderCommonType(types) match {
        case Some(finalDataType) => Concat(children.map(Cast(_, finalDataType)))
        case None => c
      }

Contributor Author: implicit type cast is allowed in Presto. I am pushing here a proposal of a trait; let me know what you think about it. Thanks.
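For context, here is a hedged sketch of what such a shared trait could look like; the later commit "introduce BinaryArrayExpressionWithImplicitCast" goes in this direction, but the code below is an editorial approximation, not the merged implementation:

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ImplicitCastInputTypes}
import org.apache.spark.sql.types.{AbstractDataType, ArrayType, DataType}

// A trait for binary expressions over two arrays of the same element type,
// letting ImplicitCastInputTypes coerce both sides to a common array type.
trait BinaryArrayExpressionWithImplicitCast extends BinaryExpression
  with ImplicitCastInputTypes {

  // Element type after coercion; only meaningful once inputTypes resolves to two ArrayTypes.
  protected lazy val elementType: DataType =
    inputTypes.head.asInstanceOf[ArrayType].elementType

  override def inputTypes: Seq[AbstractDataType] = (left.dataType, right.dataType) match {
    case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
      // Ask the analyzer to cast both children to arrays of the tightest common element type.
      TypeCoercion.findTightestCommonType(e1, e2) match {
        case Some(dt) => Seq(ArrayType(dt, n1), ArrayType(dt, n2))
        case None => Seq.empty
      }
    case _ => Seq.empty
  }

  override def checkInputDataTypes(): TypeCheckResult = (left.dataType, right.dataType) match {
    case (ArrayType(e1, _), ArrayType(e2, _)) if e1.sameType(e2) =>
      TypeCheckResult.TypeCheckSuccess
    case _ =>
      TypeCheckResult.TypeCheckFailure(
        s"input to function $prettyName should be two arrays with the same element type, " +
          s"but it's [${left.dataType.catalogString}, ${right.dataType.catalogString}]")
  }
}

ArraysOverlap (and the other new binary array functions) could then extend such a trait and drop their own inputTypes/checkInputDataTypes overrides.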

    case la: ArrayType if la.sameType(right.dataType) =>
      Seq(la, la)
    case _ => Seq.empty
  }

  override def checkInputDataTypes(): TypeCheckResult = {
    if (!left.dataType.isInstanceOf[ArrayType] || !right.dataType.isInstanceOf[ArrayType] ||
        !left.dataType.sameType(right.dataType)) {
      TypeCheckResult.TypeCheckFailure("Arguments must be arrays with the same element type.")
    } else {
      TypeCheckResult.TypeCheckSuccess
    }

Member: nit: indent

  }

  override def nullable: Boolean = {
    left.nullable || right.nullable || left.dataType.asInstanceOf[ArrayType].containsNull ||
      right.dataType.asInstanceOf[ArrayType].containsNull
  }

  override def nullSafeEval(a1: Any, a2: Any): Any = {
    var hasNull = false
    val arr1 = a1.asInstanceOf[ArrayData]
    val arr2 = a2.asInstanceOf[ArrayData]
    if (arr1.numElements() > 0) {
Member: We should also compare the numElements here and check whether the smaller array is empty? Otherwise the result is different if arr1 is not empty and contains null while arr2 is empty?

Contributor Author: thanks, I also added some test cases for this

      arr1.foreach(elementType, (_, v1) =>
        if (v1 == null) {
          hasNull = true
        } else {
          arr2.foreach(elementType, (_, v2) =>
            if (v2 == null) {
              hasNull = true
            } else if (v1 == v2) {
              return true
            }
          )
        }
      )
    } else {
Member: We can skip this if the right array's containsNull == false?

      arr2.foreach(elementType, (_, v) =>
        if (v == null) {
          return null
        }
      )
    }
    if (hasNull) {
      null
    } else {
      false
    }
  }
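As a cross-check of the eval path above, a minimal sketch of the intended three-valued semantics on plain Scala collections (overlapRef and the Option encoding are illustrative names for this note, not part of the PR):

// None models a SQL NULL element; the Option[Boolean] result mirrors the nullable BooleanType output.
def overlapRef[T](a1: Seq[Option[T]], a2: Seq[Option[T]]): Option[Boolean] = {
  val common = a1.flatten.toSet.intersect(a2.flatten.toSet)
  if (common.nonEmpty) Some(true)                        // a shared non-null element exists
  else if (a1.contains(None) || a2.contains(None)) None  // a null element could hide a match
  else Some(false)                                       // provably no overlap
}

// Matches the expectations in the test suite below, for example:
// overlapRef(Seq(Some(1), Some(2), Some(3)), Seq(Some(4), Some(5), Some(3)))  == Some(true)
// overlapRef(Seq(None, Some(5), Some(6)),    Seq.empty)                       == None
// overlapRef(Seq(Some(1), Some(2), Some(3)), Seq.empty)                       == Some(false)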

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, (a1, a2) => {
      val i1 = ctx.freshName("i")
      val i2 = ctx.freshName("i")
      val getValue1 = CodeGenerator.getValue(a1, elementType, i1)
      val getValue2 = CodeGenerator.getValue(a2, elementType, i2)
      s"""
         |if ($a1.numElements() > 0) {
         |  for (int $i1 = 0; $i1 < $a1.numElements(); $i1++) {
         |    if ($a1.isNullAt($i1)) {
         |      ${ev.isNull} = true;
         |    } else {
         |      for (int $i2 = 0; $i2 < $a2.numElements(); $i2++) {
         |        if ($a2.isNullAt($i2)) {
         |          ${ev.isNull} = true;
         |        } else if (${ctx.genEqual(elementType, getValue1, getValue2)}) {
         |          ${ev.isNull} = false;
         |          ${ev.value} = true;
         |          break;
         |        }
         |      }
         |      if (${ev.value}) {
         |        break;
         |      }
         |    }
         |  }
         |} else {
Member: ditto.

         |  for (int $i2 = 0; $i2 < $a2.numElements(); $i2++) {
         |    if ($a2.isNullAt($i2)) {
         |      ${ev.isNull} = true;
         |      break;
         |    }
         |  }
         |}
         |""".stripMargin
    })
  }
}

/**
* Returns the minimum value in the array.
*/
@@ -106,6 +106,30 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
    checkEvaluation(ArrayContains(a3, Literal.create(null, StringType)), null)
  }

  test("ArraysOverlap") {
    val a0 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType))
    val a1 = Literal.create(Seq(4, 5, 3), ArrayType(IntegerType))
    val a2 = Literal.create(Seq(null, 5, 6), ArrayType(IntegerType))
    val a3 = Literal.create(Seq(7, 8), ArrayType(IntegerType))
    val a4 = Literal.create(Seq.empty[Int], ArrayType(IntegerType))

    val a5 = Literal.create(Seq[String](null, ""), ArrayType(StringType))
    val a6 = Literal.create(Seq[String]("", "abc"), ArrayType(StringType))
    val a7 = Literal.create(Seq[String]("def", "ghi"), ArrayType(StringType))

    checkEvaluation(ArraysOverlap(a0, a1), true)
    checkEvaluation(ArraysOverlap(a0, a2), null)
    checkEvaluation(ArraysOverlap(a1, a2), true)
    checkEvaluation(ArraysOverlap(a1, a3), false)
    checkEvaluation(ArraysOverlap(a0, a4), false)
    checkEvaluation(ArraysOverlap(a2, a4), null)
    checkEvaluation(ArraysOverlap(a4, a2), null)

    checkEvaluation(ArraysOverlap(a5, a6), true)
    checkEvaluation(ArraysOverlap(a5, a7), null)
    checkEvaluation(ArraysOverlap(a6, a7), false)
  }
Member: Can you add cases where one of the two arguments is null, and ArraysOverlap(Seq(null), Seq(null))?
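The commit "add test case for null" addresses this request; a hedged sketch of what such additions could look like, reusing a0 from the test above (expected values follow the documented semantics; these are not the exact assertions that were merged):

    val nullArray = Literal.create(null, ArrayType(IntegerType))
    val onlyNulls = Literal.create(Seq(null), ArrayType(IntegerType))

    // A null array argument propagates null (nullSafeEval is skipped for null inputs).
    checkEvaluation(ArraysOverlap(nullArray, a0), null)
    checkEvaluation(ArraysOverlap(a0, nullArray), null)
    // Arrays containing only nulls: no provable overlap, so the result is null.
    checkEvaluation(ArraysOverlap(onlyNulls, onlyNulls), null)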


test("Array Min") {
checkEvaluation(ArrayMin(Literal.create(Seq(-11, 10, 2), ArrayType(IntegerType))), -11)
checkEvaluation(
Expand Down
10 changes: 10 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3046,6 +3046,16 @@ object functions {
    ArrayContains(column.expr, Literal(value))
  }

  /**
   * Returns `true` if `a1` and `a2` have at least one non-null element in common. Otherwise, it
   * returns `null` if either array contains a `null` element, and `false` if neither does.
   *
   * @group collection_funcs
   * @since 2.4.0
   */
  def arrays_overlap(a1: Column, a2: Column): Column = withExpr {
    ArraysOverlap(a1.expr, a2.expr)
  }
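An end-to-end usage sketch of the new DataFrame function defined above, mirroring the PySpark doctest (assumes a local SparkSession; the output shown in comments is the expected result, not captured output):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.arrays_overlap

val spark = SparkSession.builder().appName("arrays_overlap-demo").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  (Seq("a", "b"), Seq("b", "c")),  // share "b"         -> true
  (Seq("a"), Seq("b", "c"))        // no common element -> false
).toDF("x", "y")

df.select(arrays_overlap($"x", $"y").alias("overlap")).show()
// +-------+
// |overlap|
// +-------+
// |   true|
// |  false|
// +-------+

spark.stop()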

  /**
   * Creates a new row for each element in the given array or map column.
   *
@@ -413,6 +413,23 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
    )
  }

  test("arrays_overlap function") {
    val df = Seq(
      (Seq[Option[Int]](Some(1), Some(2)), Seq[Option[Int]](Some(-1), Some(10))),
      (Seq.empty[Option[Int]], Seq[Option[Int]](Some(-1), None)),
      (Seq[Option[Int]](Some(3), Some(2)), Seq[Option[Int]](Some(1), Some(2)))
    ).toDF("a", "b")

    val answer = Seq(Row(false), Row(null), Row(true))

    checkAnswer(df.select(arrays_overlap(df("a"), df("b"))), answer)
    checkAnswer(df.selectExpr("arrays_overlap(a, b)"), answer)

    intercept[AnalysisException] {
      df.selectExpr("arrays_overlap(array(1, 2, 3), array('a', 'b', 'c'))")
    }
  }

test("array_min function") {
val df = Seq(
Seq[Option[Int]](Some(1), Some(3), Some(2)),
Expand Down