Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ class CacheManager extends Logging {
logWarning("Asked to cache already cached data.")
} else {
val sparkSession = query.sparkSession
val qe = sparkSession.sessionState.executePlan(planToCache)
val inMemoryRelation = InMemoryRelation(
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize, storageLevel,
sparkSession.sessionState.executePlan(planToCache).executedPlan,
qe.executedPlan,
tableName,
planToCache)
optimizedPlan = qe.optimizedPlan)
this.synchronized {
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Data has already been cached.")
Expand Down Expand Up @@ -184,10 +185,10 @@ class CacheManager extends Logging {
}
needToRecache.map { cd =>
cd.cachedRepresentation.cacheBuilder.clearCache()
val plan = spark.sessionState.executePlan(cd.plan).executedPlan
val qe = spark.sessionState.executePlan(cd.plan)
val newCache = InMemoryRelation(
cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = plan),
logicalPlan = cd.plan)
cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = qe.executedPlan),
optimizedPlan = qe.optimizedPlan)
val recomputedPlan = cd.copy(cachedRepresentation = newCache)
this.synchronized {
if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,17 @@ object InMemoryRelation {
storageLevel: StorageLevel,
child: SparkPlan,
tableName: Option[String],
logicalPlan: LogicalPlan): InMemoryRelation = {
optimizedPlan: LogicalPlan): InMemoryRelation = {
val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)
val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)
relation.statsOfPlanToCache = logicalPlan.stats
val relation = new InMemoryRelation(child.output, cacheBuilder, optimizedPlan.outputOrdering)
relation.statsOfPlanToCache = optimizedPlan.stats
relation
}

def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
def apply(cacheBuilder: CachedRDDBuilder, optimizedPlan: LogicalPlan): InMemoryRelation = {
val relation = new InMemoryRelation(
cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)
relation.statsOfPlanToCache = logicalPlan.stats
cacheBuilder.cachedPlan.output, cacheBuilder, optimizedPlan.outputOrdering)
relation.statsOfPlanToCache = optimizedPlan.stats
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the stats of analyzed plan is the same as the optimized plan, please hold this PR until they become different.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They can be different if we do DS v2 operator pushdown in the optimizer, but AFAIK it's not done yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stats already differ in the v1 path because partition pruning happens in the optimizer for data source tables (PrunedInMemoryFileIndex).

I see no reason not to get this in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah that's a good point, I agree with it. @jzhuge can we write a test using file source partition pruning? I'm not comfortable merging a fix without tests. The change itself LGTM.

relation
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,24 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
assert(df2LimitInnerPlan.isDefined &&
df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
}

test("SPARK-27739 Save stats from optimized plan") {
withTable("a") {
spark.range(4)
.selectExpr("id", "id % 2 AS p")
.write
.partitionBy("p")
.saveAsTable("a")

val df = sql("SELECT * FROM a WHERE p = 0")
df.cache()
df.count()
df.queryExecution.withCachedData match {
case i: InMemoryRelation =>
// Optimized plan has non-default size in bytes
assert(i.statsOfPlanToCache.sizeInBytes !==
df.sparkSession.sessionState.conf.defaultSizeInBytes)
}
}
}
}