Changes from 1 commit
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.catalog

import java.net.URI
import java.util.Locale
+import java.util.concurrent.Callable
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
@@ -126,13 +127,55 @@ class SessionCatalog(
}

/**
- * A cache of qualified table names to table relation plans.
- */
+ * A cache of qualified table names to table relation plans.
+ * Accessing tableRelationCache directly is not recommended,
+ * since it exposes the Guava library to callers.
+ */
val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
Reviewer (Contributor): This seems to have been introduced in 2.2, can you make it private instead, and change all call sites?

val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}
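
A sketch of the reviewer's suggestion above, assuming every external call site is migrated to the wrapper methods added below (a hypothetical follow-up, not part of this commit):

// Hypothetical follow-up: once callers only use the wrappers, the Guava
// cache no longer needs to be part of SessionCatalog's public API.
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
  val cacheSize = conf.tableRelationCacheSize
  CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
}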

Reviewer (Contributor): The "without exposing components to Guava" part of all these comments is unnecessary.

+/**
+ * This method provides a way to get a cached plan
+ * without exposing components to Guava library.
+ */
+def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+  tableRelationCache.get(t, c)
+}

+/**
+ * This method provides a way to get a cached plan if the key exists
+ * without exposing components to Guava library.
+ */
+def getCachedTableIfPresent(key: QualifiedTableName): LogicalPlan = {
+  tableRelationCache.getIfPresent(key)
+}

Reviewer (Contributor): getCachedTable instead. I'd even just use cachedTable, but this class at least seems to use verbs more than the rest of the code.

+/**
+ * This method provides a way to cache a plan
+ * without exposing components to Guava library.
+ */
+def putTableInCache(t: QualifiedTableName, l: LogicalPlan): Unit = {
+  tableRelationCache.put(t, l)
+}

Reviewer (Contributor): cacheTable

+/**
+ * This method provides a way to invalidate a cached plan
+ * without exposing components to Guava library.
+ */
+def invalidateCachedTable(key: QualifiedTableName): Unit = {
+  tableRelationCache.invalidate(key)
+}

+/**
+ * This method provides a way to invalidate all the cached plans
+ * without exposing components to Guava library.
+ */
+def invalidateAllCachedTables(): Unit = {
+  tableRelationCache.invalidateAll()
+}

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
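
Taken together, the new wrappers give callers a Guava-free surface. A minimal usage sketch, assuming a SessionCatalog instance named catalog and a hypothetical plan builder buildPlanFor (both illustrative, not from this PR):

import java.util.concurrent.Callable

val key = QualifiedTableName("default", "events")
// Compute-if-absent: builds and caches the plan on a miss.
val plan = catalog.getCachedPlan(key, new Callable[LogicalPlan] {
  override def call(): LogicalPlan = buildPlanFor(key) // hypothetical builder
})
// Peek without populating; returns null on a miss (Guava getIfPresent semantics).
val cached = catalog.getCachedTableIfPresent(key)
// Evict one entry, or everything.
catalog.invalidateCachedTable(key)
catalog.invalidateAllCachedTables()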
@@ -215,9 +215,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
val table = r.tableMeta
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-val cache = sparkSession.sessionState.catalog.tableRelationCache
+val catalogProxy = sparkSession.sessionState.catalog

-val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
override def call(): LogicalPlan = {
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val dataSource =
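(The rest of this hunk is truncated by the diff view.) As an aside, Callable has a single abstract method, so under SAM conversion in Scala 2.12 and later (which Spark did not yet target at the time of this PR) the anonymous class could shrink to a function literal. A hypothetical sketch, with buildPlan standing in for the elided cache-miss body:

// Hypothetical, not part of this PR: SAM conversion under Scala 2.12+.
val plan = catalogProxy.getCachedPlan(qualifiedTableName, () => {
  buildPlan() // stand-in for the plan-construction body shown above
})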
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
// these are def_s and not val/lazy val since the latter would introduce circular references
private def sessionState = sparkSession.sessionState
-private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
+private def catalogProxy = sparkSession.sessionState.catalog
import HiveMetastoreCatalog._

/** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
@@ -61,7 +61,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
val key = QualifiedTableName(
table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
table.table.toLowerCase)
-tableRelationCache.getIfPresent(key)
+catalogProxy.getCachedTableIfPresent(key)
}

private def getCached(
@@ -71,7 +71,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
expectedFileFormat: Class[_ <: FileFormat],
partitionSchema: Option[StructType]): Option[LogicalRelation] = {

-tableRelationCache.getIfPresent(tableIdentifier) match {
+catalogProxy.getCachedTableIfPresent(tableIdentifier) match {
case null => None // Cache miss
case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -92,21 +92,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
Some(logical)
} else {
// If the cached relation is not updated, we invalidate it right away.
-tableRelationCache.invalidate(tableIdentifier)
+catalogProxy.invalidateCachedTable(tableIdentifier)
None
}
case _ =>
logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
s"However, we are getting a ${relation.fileFormat} from the metastore cache. " +
"This cached entry will be invalidated.")
-tableRelationCache.invalidate(tableIdentifier)
+catalogProxy.invalidateCachedTable(tableIdentifier)
None
}
case other =>
logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " +
s"However, we are getting a $other from the metastore cache. " +
"This cached entry will be invalidated.")
-tableRelationCache.invalidate(tableIdentifier)
+catalogProxy.invalidateCachedTable(tableIdentifier)
None
}
}
@@ -176,7 +176,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
fileFormat = fileFormat,
options = options)(sparkSession = sparkSession)
val created = LogicalRelation(fsRelation, updatedTable)
-tableRelationCache.put(tableIdentifier, created)
+catalogProxy.putTableInCache(tableIdentifier, created)
created
}

@@ -205,7 +205,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
className = fileType).resolveRelation(),
table = updatedTable)

-tableRelationCache.put(tableIdentifier, created)
+catalogProxy.putTableInCache(tableIdentifier, created)
created
}

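The getCached changes above follow a validate-on-read pattern: a cached relation is returned only if it still matches the expected file format and schema; otherwise it is evicted immediately. A condensed sketch of that shape written against the new wrapper API (getValidCached and stillValid are illustrative names, not from this PR):

// Illustrative only: the shape of getCached's validate-on-read logic.
def getValidCached(key: QualifiedTableName)(stillValid: LogicalPlan => Boolean): Option[LogicalPlan] =
  Option(catalogProxy.getCachedTableIfPresent(key)) match {
    case Some(plan) if stillValid(plan) => Some(plan) // fresh hit
    case Some(_) =>
      catalogProxy.invalidateCachedTable(key) // stale entry: evict right away
      None
    case None => None // cache miss
  }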
@@ -46,7 +46,7 @@ class HiveSchemaInferenceSuite

override def afterEach(): Unit = {
super.afterEach()
-spark.sessionState.catalog.tableRelationCache.invalidateAll()
+spark.sessionState.catalog.invalidateAllCachedTables()
FileStatusCache.resetForTesting()
}
