@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -131,6 +132,12 @@ class CacheManager extends Logging {

   /** Replaces segments of the given logical plan with cached versions where possible. */
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
+    useCachedDataInternal(plan) transformAllExpressions {
+      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
+    }
+  }
+
+  private def useCachedDataInternal(plan: LogicalPlan): LogicalPlan = {

gatorsmile (Member) commented:
After rethinking it, we do not need to add a new function. We can combine them into a single function, like:

  /** Replaces segments of the given logical plan with cached versions where possible. */
  def useCachedData(plan: LogicalPlan): LogicalPlan = {
    val newPlan = plan transformDown {
      case currentFragment =>
        lookupCachedData(currentFragment)
          .map(_.cachedRepresentation.withOutput(currentFragment.output))
          .getOrElse(currentFragment)
    }
    
    newPlan transformAllExpressions {
      case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
    }
  }

dilipbiswal (Contributor, author) replied:
@gatorsmile Thank you very much. I have addressed your comments.

     plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
@@ -24,6 +24,8 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.CleanerListener
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
@@ -565,4 +567,82 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         case i: InMemoryRelation => i
       }.size == 1)
   }
+
+  test("SPARK-19093 Caching in side subquery") {
+    withTempView("t1") {
+      Seq(1).toDF("c1").createOrReplaceTempView("t1")
+      spark.catalog.cacheTable("t1")
+      val cachedPlan =
+        sql(
+          """
+            |SELECT * FROM t1
+            |WHERE
+            |NOT EXISTS (SELECT * FROM t1)
+          """.stripMargin).queryExecution.optimizedPlan
+      assert(
+        cachedPlan.collect {
+          case i: InMemoryRelation => i
+        }.size == 2)

gatorsmile (Member) commented:
The same here.

+      spark.catalog.uncacheTable("t1")
+    }
+  }

test("SPARK-19093 scalar and nested predicate query") {
def getCachedPlans(plan: LogicalPlan): Seq[LogicalPlan] = {
plan collect {
case i: InMemoryRelation => i
}
}
withTempView("t1", "t2", "t3", "t4") {
Seq(1).toDF("c1").createOrReplaceTempView("t1")
Seq(2).toDF("c1").createOrReplaceTempView("t2")
Seq(1).toDF("c1").createOrReplaceTempView("t3")
Seq(1).toDF("c1").createOrReplaceTempView("t4")
spark.catalog.cacheTable("t1")
spark.catalog.cacheTable("t2")
spark.catalog.cacheTable("t3")
spark.catalog.cacheTable("t4")

// Nested predicate subquery
val cachedPlan =
sql(
"""
|SELECT * FROM t1
|WHERE
|c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
""".stripMargin).queryExecution.optimizedPlan

assert(
cachedPlan.collect {
case i: InMemoryRelation => i
}.size == 3)

gatorsmile (Member) commented on Jan 7, 2017:

Then, this can be simplified to

  assert (getNumInMemoryRelations(cachedPlan) == 3)
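
getNumInMemoryRelations is referenced here but not shown in this diff; as a hedged sketch only (name and placement assumed, the suite's eventual implementation may differ), such a helper would count cached relations both in the plan itself and inside its subquery expressions:

  // Hypothetical sketch of the suggested helper, not code from this PR:
  // count InMemoryRelation nodes in the plan and, recursively, in the plans
  // carried by its SubqueryExpressions.
  private def getNumInMemoryRelations(plan: LogicalPlan): Int = {
    var count = plan.collect { case _: InMemoryRelation => 1 }.sum
    plan transformAllExpressions {
      case e: SubqueryExpression =>
        count += getNumInMemoryRelations(e.plan)
        e
    }
    count
  }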


+      // Scalar subquery and predicate subquery
+      val cachedPlan2 =
+        sql(
+          """
+            |SELECT * FROM (SELECT max(c1) FROM t1 GROUP BY c1)
+            |WHERE
+            |c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
+            |OR
+            |EXISTS (SELECT c1 FROM t3)
+            |OR
+            |c1 IN (SELECT c1 FROM t4)
+          """.stripMargin).queryExecution.optimizedPlan
+
+      val cachedRelations = scala.collection.mutable.MutableList.empty[Seq[LogicalPlan]]
+      cachedRelations += getCachedPlans(cachedPlan2)
+      cachedPlan2 transformAllExpressions {
+        case e: SubqueryExpression => cachedRelations += getCachedPlans(e.plan)
+          e
+      }
+      assert(cachedRelations.flatten.size == 4)

gatorsmile (Member) commented:

Then, this can be simplified to

  assert (getNumInMemoryRelations(cachedPlan2) == 4)

dilipbiswal (Contributor, author) replied:
@gatorsmile Thanks... I will make the change


+      spark.catalog.uncacheTable("t1")
+      spark.catalog.uncacheTable("t2")
+      spark.catalog.uncacheTable("t3")
+      spark.catalog.uncacheTable("t4")

gatorsmile (Member) commented:
You can call clearCache() and then no need to uncache each table.

gatorsmile (Member) commented:
  override def afterEach(): Unit = {
    try {
      clearCache()
    } finally {
      super.afterEach()
    }
  }
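
For context, clearCache() itself is not shown in this diff; a minimal sketch of how such a helper could be wired up, assuming it simply delegates to the public Catalog API (the suite's actual definition may differ):

  // Hypothetical helper, assumed to delegate to the public Catalog API;
  // drops every cached table and query in a single call.
  protected def clearCache(): Unit = spark.catalog.clearCache()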

gatorsmile (Member) commented:
How about this? @dilipbiswal

dilipbiswal (Contributor, author) replied:
@gatorsmile Sorry, I missed this one. Will make the change.

+    }
+  }
 }