Commits (38)

2d762b4  plan exists subquery (AngersZhuuuu, Nov 8, 2019)
1c577bc  Update subquery.scala (AngersZhuuuu, Nov 8, 2019)
5fa971b  format import (AngersZhuuuu, Nov 8, 2019)
1401349  don;t collect executed rdd (AngersZhuuuu, Nov 8, 2019)
7b943aa  format code (AngersZhuuuu, Nov 8, 2019)
95e446d  Update predicates.scala (AngersZhuuuu, Nov 10, 2019)
20cda42  Update subquery.scala (AngersZhuuuu, Nov 10, 2019)
8e3ce4f  remove ExistsSubquery (AngersZhuuuu, Nov 11, 2019)
c290411  minimize cost (AngersZhuuuu, Nov 11, 2019)
866ddc7  follow comment (AngersZhuuuu, Nov 11, 2019)
3de0ecc  update import (AngersZhuuuu, Nov 11, 2019)
32f85c3  follow comment (AngersZhuuuu, Nov 12, 2019)
e47a757  Merge branch 'master' into SPARK-29800 (AngersZhuuuu, Nov 12, 2019)
4c86605  remove broadcaset (AngersZhuuuu, Nov 12, 2019)
626e41f  Update subquery.scala (AngersZhuuuu, Nov 13, 2019)
ce76e0c  remove unused import (AngersZhuuuu, Nov 13, 2019)
4a4ca9b  Update subquery.scala (AngersZhuuuu, Nov 13, 2019)
88f804d  Merge branch 'master' into SPARK-29800 (AngersZhuuuu, Nov 21, 2019)
7668bd6  ExistsSExec -> ExistsSubqueryExec (AngersZhuuuu, Nov 25, 2019)
a6b8485  Revert "Merge branch 'master' into SPARK-29800" (AngersZhuuuu, Nov 25, 2019)
34046be  follow comment (AngersZhuuuu, Jan 2, 2020)
4c6c04d  follow comment (AngersZhuuuu, Jan 2, 2020)
ac6a4d2  Update subquery.scala (AngersZhuuuu, Jan 2, 2020)
59162c6  Update finishAnalysis.scala (AngersZhuuuu, Jan 2, 2020)
89a1721  Update finishAnalysis.scala (AngersZhuuuu, Jan 2, 2020)
fb98b54  update (AngersZhuuuu, Jan 2, 2020)
67b4281  Update finishAnalysis.scala (AngersZhuuuu, Jan 3, 2020)
821ed40  Update finishAnalysis.scala (AngersZhuuuu, Jan 3, 2020)
e319fee  fix ut (AngersZhuuuu, Jan 3, 2020)
2c387f2  Update SubquerySuite.scala (AngersZhuuuu, Jan 3, 2020)
2aff8eb  Update SubquerySuite.scala (AngersZhuuuu, Jan 3, 2020)
2b7b417  Update CachedTableSuite.scala (AngersZhuuuu, Jan 4, 2020)
88fcdbf  Update CachedTableSuite.scala (AngersZhuuuu, Jan 4, 2020)
9f084ee  Merge branch 'master' into SPARK-29800 (AngersZhuuuu, Jan 4, 2020)
8c6060a  Update CachedTableSuite.scala (AngersZhuuuu, Jan 4, 2020)
9a9d9d1  fix comment error (AngersZhuuuu, Jan 5, 2020)
173942d  follow comment (AngersZhuuuu, Jan 6, 2020)
26258b0  Update finishAnalysis.scala (AngersZhuuuu, Jan 6, 2020)
predicates.scala

@@ -487,6 +487,50 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
}
}

case class ExistsSubquery(child: Expression,
    subQuery: String,
    hset: Set[Any]) extends UnaryExpression with Predicate {

  require(hset != null, "hset could not be null")

  override def toString: String = s"Exists ${subQuery}"

  override def nullable: Boolean = child.nullable

  protected override def nullSafeEval(value: Any): Any = {
    if (set.contains(value)) {
      true
    } else {
      false
    }
  }

  @transient lazy val set: Set[Any] = child.dataType match {
    case t: AtomicType if !t.isInstanceOf[BinaryType] => hset
    case _: NullType => hset
    case _ =>
      // for structs use interpreted ordering to be able to compare UnsafeRows with non-UnsafeRows
      TreeSet.empty(TypeUtils.getInterpretedOrdering(child.dataType)) ++ (hset - null)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    genCodeWithSet(ctx, ev)
  }

  private def genCodeWithSet(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, c => {
      val setTerm = ctx.addReferenceObj("set", set)
      s"""
         |${ev.value} = $setTerm.size() > 0;
       """.stripMargin
    })
  }

  override def sql: String = {
    s"(EXISTS (${subQuery}))"
  }
}
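
A minimal illustration of evaluating the new predicate directly (my example, not part of the diff; it assumes a build with this patch applied):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{ExistsSubquery, Literal}

    // Like InSet, nullSafeEval reduces to a lookup in the precomputed set.
    val hits = ExistsSubquery(Literal(1), "SELECT id FROM t", Set[Any](1, 2, 3))
    assert(hits.eval(InternalRow.empty) == true)    // 1 is in the set
    val misses = ExistsSubquery(Literal(9), "SELECT id FROM t", Set[Any](1, 2, 3))
    assert(misses.eval(InternalRow.empty) == false) // 9 is not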

@ExpressionDescription(
usage = "expr1 _FUNC_ expr2 - Logical AND.")
case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate {
subquery.scala

@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, ExistsSubquery, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -171,6 +171,69 @@ case class InSubqueryExec(
}
}

/**
 * The physical node of an EXISTS subquery. This is to support using EXISTS in a join's ON
 * condition: for some join types we can't push the EXISTS condition down, so we plan it here.
 */

Review comment (Contributor): We can just say "The physical node for non-correlated EXISTS subquery". This is pretty general and not only for join conditions.
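
For reference, a query of the shape this node handles (a sketch with illustrative table and column names, not taken from the PR):

    // A non-correlated EXISTS inside a join's ON condition: the subquery does not
    // reference the outer tables, so it cannot be turned into a semi join on a key.
    spark.sql("""
      SELECT t1.id
      FROM t1 JOIN t2
        ON t1.id = t2.id AND EXISTS (SELECT 1 FROM t3 WHERE t3.flag = true)
    """)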
case class ExistsExec(child: Expression,
    subQuery: String,
    plan: BaseSubqueryExec,
    exprId: ExprId,
    private var resultBroadcast: Broadcast[Array[Any]] = null)
  extends ExecSubqueryExpression {

Review comment (Contributor): nit:

    case class A(
        para1: T,
        para2: T): R ...

Reply (Author): Done.

  @transient private var result: Array[Any] = _

  override def dataType: DataType = BooleanType
  override def children: Seq[Expression] = child :: Nil
  override def nullable: Boolean = child.nullable
  override def toString: String = s"EXISTS ${plan.name}"
Review comment (Contributor): This should be `EXISTS (${plan.simpleString})`, to follow org.apache.spark.sql.execution.ScalarSubquery.
  override def withNewPlan(plan: BaseSubqueryExec): ExistsExec = copy(plan = plan)

  override def semanticEquals(other: Expression): Boolean = other match {
    case in: ExistsExec => child.semanticEquals(in.child) && plan.sameResult(in.plan)
    case _ => false
  }

  def updateResult(): Unit = {
    val rows = plan.executeCollect()
    result = child.dataType match {
      case _: StructType => rows.toArray
      case _ => rows.map(_.get(0, child.dataType))
    }
    resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
  }

Review comment (Contributor): The reason why we don't have a physical plan for Exists is: it's not robust. Collecting the entire result of a query plan at the driver side is very likely to hit OOM. That's why we have to convert Exists to a join.

Reply (Author): We can make it just return rdd.isEmpty(), since EXISTS only needs to judge whether the result is empty.
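
A sketch of the emptiness-only variant the author suggests (my adaptation: executeTake(1) stands in for the rdd.isEmpty() idea and is not code from this diff):

    // EXISTS only needs to know whether the subquery produced any row, so pulling
    // at most one row to the driver bounds memory use, unlike executeCollect().
    def updateResult(): Unit = {
      val nonEmpty = plan.executeTake(1).nonEmpty
      result = if (nonEmpty) Array[Any](true) else Array.empty[Any]
      resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
    }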

  def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)

  private def prepareResult(): Unit = {
    require(resultBroadcast != null, s"$this has not finished")
    if (result == null) {
      result = resultBroadcast.value
    }
  }

  override def eval(input: InternalRow): Any = {
    prepareResult()
    !result.isEmpty
  }

  override lazy val canonicalized: ExistsExec = {
    copy(
      child = child.canonicalized,
      subQuery = subQuery,
      plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
      exprId = ExprId(0),
      resultBroadcast = null)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    prepareResult()
    ExistsSubquery(child, subQuery, result.toSet).doGenCode(ctx, ev)
  }
}

Review comment (Contributor): Again, can we follow org.apache.spark.sql.execution.ScalarSubquery? The codegen is well implemented there.

Review comment (Contributor): As well as eval.

Reply (Author): Done.
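
For reference, a sketch of the ScalarSubquery-style codegen the reviewer points to (my adaptation, not code from this diff): once the result is known on the driver, embed it as a constant instead of generating a set lookup.

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      // Follow org.apache.spark.sql.execution.ScalarSubquery: materialize the
      // driver-side result, then let Literal generate code for the constant.
      prepareResult()
      Literal.create(result.nonEmpty, BooleanType).doGenCode(ctx, ev)
    }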

/**
* Plans subqueries that are present in the given [[SparkPlan]].
*/
@@ -194,6 +257,19 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
        }
        val executedPlan = new QueryExecution(sparkSession, query).executedPlan
        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
      case expressions.Exists(sub, children, exprId) =>
        val expr = if (children.length == 1) {
          children.head
        } else {
          CreateNamedStruct(
            children.zipWithIndex.flatMap { case (v, index) =>
              Seq(Literal(s"col_$index"), v)
            }
          )
        }
        val executedPlan = new QueryExecution(sparkSession, sub).executedPlan
        ExistsExec(expr, sub.treeString,
          SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
    }
  }

Review comment (Contributor): Why does ExistsExec need an expr parameter?

Reply (Author): Removed expr and subQuery.
}
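
An illustration of what the multi-column branch builds (my example, names are hypothetical): for two outer-side expressions the planner packs them into one struct-valued expression, so the EXISTS machinery compares a single value.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateNamedStruct, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Two hypothetical outer-side expressions.
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    // Equivalent to what the flatMap above produces for children = Seq(a, b).
    val packed = CreateNamedStruct(Seq(Literal("col_0"), a, Literal("col_1"), b))
    // packed.dataType is struct<col_0:int,col_1:int>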