Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2d762b4
plan exists subquery
AngersZhuuuu Nov 8, 2019
1c577bc
Update subquery.scala
AngersZhuuuu Nov 8, 2019
5fa971b
format import
AngersZhuuuu Nov 8, 2019
1401349
don;t collect executed rdd
AngersZhuuuu Nov 8, 2019
7b943aa
format code
AngersZhuuuu Nov 8, 2019
95e446d
Update predicates.scala
AngersZhuuuu Nov 10, 2019
20cda42
Update subquery.scala
AngersZhuuuu Nov 10, 2019
8e3ce4f
remove ExistsSubquery
AngersZhuuuu Nov 11, 2019
c290411
minimize cost
AngersZhuuuu Nov 11, 2019
866ddc7
follow comment
AngersZhuuuu Nov 11, 2019
3de0ecc
update import
AngersZhuuuu Nov 11, 2019
32f85c3
follow comment
AngersZhuuuu Nov 12, 2019
e47a757
Merge branch 'master' into SPARK-29800
AngersZhuuuu Nov 12, 2019
4c86605
remove broadcaset
AngersZhuuuu Nov 12, 2019
626e41f
Update subquery.scala
AngersZhuuuu Nov 13, 2019
ce76e0c
remove unused import
AngersZhuuuu Nov 13, 2019
4a4ca9b
Update subquery.scala
AngersZhuuuu Nov 13, 2019
88f804d
Merge branch 'master' into SPARK-29800
AngersZhuuuu Nov 21, 2019
7668bd6
ExistsSExec -> ExistsSubqueryExec
AngersZhuuuu Nov 25, 2019
a6b8485
Revert "Merge branch 'master' into SPARK-29800"
AngersZhuuuu Nov 25, 2019
34046be
follow comment
AngersZhuuuu Jan 2, 2020
4c6c04d
follow comment
AngersZhuuuu Jan 2, 2020
ac6a4d2
Update subquery.scala
AngersZhuuuu Jan 2, 2020
59162c6
Update finishAnalysis.scala
AngersZhuuuu Jan 2, 2020
89a1721
Update finishAnalysis.scala
AngersZhuuuu Jan 2, 2020
fb98b54
update
AngersZhuuuu Jan 2, 2020
67b4281
Update finishAnalysis.scala
AngersZhuuuu Jan 3, 2020
821ed40
Update finishAnalysis.scala
AngersZhuuuu Jan 3, 2020
e319fee
fix ut
AngersZhuuuu Jan 3, 2020
2c387f2
Update SubquerySuite.scala
AngersZhuuuu Jan 3, 2020
2aff8eb
Update SubquerySuite.scala
AngersZhuuuu Jan 3, 2020
2b7b417
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
88fcdbf
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
9f084ee
Merge branch 'master' into SPARK-29800
AngersZhuuuu Jan 4, 2020
8c6060a
Update CachedTableSuite.scala
AngersZhuuuu Jan 4, 2020
9a9d9d1
fix comment error
AngersZhuuuu Jan 5, 2020
173942d
follow comment
AngersZhuuuu Jan 6, 2020
26258b0
Update finishAnalysis.scala
AngersZhuuuu Jan 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,36 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with
}
}

case class ExistsSubquery(child: Expression,
subQuery: String,
result: Boolean) extends UnaryExpression with Predicate {

override def toString: String = s"Exists ${subQuery}"

override def nullable: Boolean = child.nullable

protected override def nullSafeEval(value: Any): Any = {
true
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
genCodeWithSet(ctx, ev)
}

private def genCodeWithSet(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
val setTerm = ctx.addReferenceObj("result", result)
s"""
|${ev.value} = $setTerm;
""".stripMargin
})
}

override def sql: String = {
s"(EXISTS (${subQuery}))"
}
}

@ExpressionDescription(
usage = "expr1 _FUNC_ expr2 - Logical AND.")
case class And(left: Expression, right: Expression) extends BinaryOperator with Predicate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,20 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {

// Filter the plan by applying left semi and left anti joins.
withSubquery.foldLeft(newFilter) {
case (p, Exists(sub, conditions, _)) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftSemi, joinCond)
case (p, Not(Exists(sub, conditions, _))) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftAnti, joinCond)
case (p, exists @ Exists(sub, conditions, _)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we change the beginning instead?

val (withSubquery, withoutSubquery) = splitConjunctivePredicates(condition).partition { cond =>
  SubqueryExpression.hasInOrExistsSubquery(cond) && !isNonCorrelatedExists
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we change hasInOrExistsSubquery to hasInOrCorrelatedExistsSubquery

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasInOrCorrelatedExistsSubquery

Done

if (SubqueryExpression.hasCorrelatedSubquery(exists)) {
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftSemi, joinCond)
} else {
Filter(exists, newFilter)
}
case (p, Not(exists @ Exists(sub, conditions, _))) =>
if (SubqueryExpression.hasCorrelatedSubquery(exists)) {
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
buildJoin(outerPlan, sub, LeftAnti, joinCond)
} else {
Filter(Not(exists), newFilter)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan @dilipbiswal
Change here to support non-correct exists subquery run like mentioned in this comment https://github.com/apache/spark/pull/26437/files#r344203937

Copy link
Contributor

@dilipbiswal dilipbiswal Nov 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu I discussed this with Wenchen briefly. Do you think we can safely inject a "LIMIT 1" into our subplan to expedite its execution ? Pl. lets us know what you think ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu I discussed this with Wenchen briefly. Do you think we can safely inject a "LIMIT 1" into our subplan to expedite its execution ? Pl. lets us know what you think ?

I am also thinking about reduce the execution cost of this sub query.
LIMIT 1 is ok .
My direction is making this execution like Spark Thrift Server's incremental collect.
Only execute one partition.

Discuss these two ways safety and cost?

case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) =>
// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(p, sub, Some(values))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, ExistsSubquery, Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -171,6 +171,63 @@ case class InSubqueryExec(
}
}

/**
* The physical node of exists-subquery. This is for support use exists in join's on condition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just say The physical node for non-correlated EXISTS subquery. This is pretty general and not only for join conditions

* since some join type we can't pushdown exists condition, we plan it here
*/
case class ExistsExec(child: Expression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

case class A(
    para1: T,
    para2: T): R ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

case class A(
    para1: T,
    para2: T): R ...

Done

subQuery: String,
plan: BaseSubqueryExec,
exprId: ExprId,
private var resultBroadcast: Broadcast[Boolean] = null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need broadcast? Can we follow the physical scalar subquery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need broadcast? Can we follow the physical scalar subquery?

Make it can be used by each partition, reduce return data size during compute or return result

extends ExecSubqueryExpression {

@transient private var result: Boolean = _
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should define the type as Option[Boolean]. The default value for boolean is false, not null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value for boolean is false,

Forgot this point, can explain strange things I meet.


override def dataType: DataType = BooleanType
override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = child.nullable
override def toString: String = s"EXISTS ${plan.name}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be EXISTS (${plan.simpleString}), to follow org.apache.spark.sql.execution.ScalarSubquery

override def withNewPlan(plan: BaseSubqueryExec): ExistsExec = copy(plan = plan)

override def semanticEquals(other: Expression): Boolean = other match {
case in: ExistsExec => child.semanticEquals(in.child) && plan.sameResult(in.plan)
case _ => false
}


def updateResult(): Unit = {
result = !plan.execute().isEmpty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this is better to execute a non-correlated EXISTS subquery. Maybe we should update RewritePredicateSubquery to only handle correlated EXISTS subquery. @dilipbiswal what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this is better to execute a non-correlated EXISTS subquery. Maybe we should update RewritePredicateSubquery to only handle correlated EXISTS subquery. @dilipbiswal what do you think?

Yeah, wait for his advise.

Copy link
Contributor

@dilipbiswal dilipbiswal Nov 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan @AngersZhuuuu Thanks for pinging me. Just for me to understand, since we refer to another pr in this pr.

So we are considering planning the Subqueries appearing inside ON clause as a Join, right ?

Assuming above, so if the query was :

SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)

We are considering to plan it as :

(T1 LeftSemi T3 ON T1.C1 = T3.C1) Join T2 ON T1.C1 = T2.C2

This Looks okay to me for inner joins. I am just not sure about outer joins.. What do you think Wenchen ?

Now, coming to the non-correlated subqueries, if we keep it as a PlanExpression and execute it, one thing we have to see is "what is the join strategy thats being picked". Its always going to be broadcast nested loop as it won't be a "equi-join" ? right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dilipbiswal

SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)

Is not correct .

You mean below ?

SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1)

For this type sql we need to change RewritePredicateSubquery as cloud-fan said.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AngersZhuuuu You r right. Sorry.. i had written it as IN initially and forgot to adjust to exists :-)

Yeah, we need to change RewritePredicateSubquery which handles correlated subquery rewrites. The only thing i am not sure is about the outer joins.

Copy link
Contributor Author

@AngersZhuuuu AngersZhuuuu Nov 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we need to change RewritePredicateSubquery which handles correlated subquery rewrites. The only thing i am not sure is about the outer joins.

Yes, outer join is complex, if we do this, we need to add end to end test case cover each case to make sure the final plans are as expected.

resultBroadcast = plan.sqlContext.sparkContext.broadcast[Boolean](result)
}

def values(): Option[Boolean] = Option(resultBroadcast).map(_.value)

private def prepareResult(): Unit = {
require(resultBroadcast != null, s"$this has not finished")
result = resultBroadcast.value
}

override def eval(input: InternalRow): Any = {
prepareResult()
result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be result.get

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be result.get

Yea..updated.

}

override lazy val canonicalized: ExistsExec = {
copy(
child = child.canonicalized,
subQuery = subQuery,
plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
exprId = ExprId(0),
resultBroadcast = null)
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, can we follow org.apache.spark.sql.execution.ScalarSubquery? The codegen is well implemented there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as well as eval

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, can we follow org.apache.spark.sql.execution.ScalarSubquery? The codegen is well implemented there.

Done

prepareResult()
ExistsSubquery(child, subQuery, result).doGenCode(ctx, ev)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we create ExistsSubquery to only do codegen? can we put the codegen logic in ExistsExec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we create ExistsSubquery to only do codegen? can we put the codegen logic in ExistsExec?

There are conflicts between ExecSubqueryExpression and UnaryExpression,
They are both abstract class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to extend UnaryExpression and we can still implement codegen, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to extend UnaryExpression and we can still implement codegen, right?

Done

}
}

/**
* Plans subqueries that are present in the given [[SparkPlan]].
*/
Expand All @@ -194,6 +251,19 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
}
val executedPlan = new QueryExecution(sparkSession, query).executedPlan
InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
case expressions.Exists(sub, children, exprId) =>
val expr = if (children.length == 1) {
children.head
} else {
CreateNamedStruct(
children.zipWithIndex.flatMap { case (v, index) =>
Seq(Literal(s"col_$index"), v)
}
)
}
val executedPlan = new QueryExecution(sparkSession, sub).executedPlan
ExistsExec(expr, sub.treeString,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ExistsExec needs a expr parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ExistsExec needs a expr parameter?

Remove expr and subQuery

SubqueryExec(s"subquery#${exprId.id}", executedPlan), exprId)
}
}
}
Expand Down