-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
3e532c8
f4ee301
a0687b3
f36bc59
b3fe647
4b2fba0
f232eba
8c0140c
9085189
0bdfcee
fc8a913
7ee6eb0
a5923ab
f22159c
c0e4f3e
47dc974
5e7227b
20b2474
b33d807
4c2d5e2
3c4a0cf
7f5a0b2
d0f49ef
ed1a6db
4e0e82f
911927d
7e788ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -417,9 +417,9 @@ case class DropTable( | |
| } | ||
|
|
||
| /** | ||
| * The logical plan for handling non-existing table for DROP TABLE command. | ||
| * The logical plan for no-op command handling non-existing table. | ||
| */ | ||
| case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command | ||
| case class NoopCommand(commandName: String, multipartIdentifier: Seq[String]) extends Command | ||
|
|
||
| /** | ||
| * The logical plan of the ALTER TABLE command. | ||
|
|
@@ -670,3 +670,29 @@ case class LoadData( | |
| case class ShowCreateTable(child: LogicalPlan, asSerde: Boolean = false) extends Command { | ||
| override def children: Seq[LogicalPlan] = child :: Nil | ||
| } | ||
|
|
||
| /** | ||
| * The logical plan of the CACHE TABLE command. | ||
| */ | ||
| case class CacheTable( | ||
| child: LogicalPlan, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After more thought, I think CACHE TABLE is not a DDL command that needs to interact with catalogs, and it doesn't need a v2 version. The current problem is that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I updated
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, since it's not resolving to catalogs, we should move it out of |
||
| isLazy: Boolean, | ||
| options: Map[String, String]) extends Command { | ||
| override def children: Seq[LogicalPlan] = child :: Nil | ||
| } | ||
|
|
||
| /** | ||
| * The logical plan of the CACHE TABLE ... AS SELECT command. | ||
| */ | ||
| case class CacheTableAsSelect( | ||
| tempViewName: String, | ||
| plan: LogicalPlan, | ||
| isLazy: Boolean, | ||
| options: Map[String, String]) extends Command | ||
|
|
||
| /** | ||
| * The logical plan of the UNCACHE TABLE command. | ||
| */ | ||
| case class UncacheTable(child: LogicalPlan, ifExists: Boolean) extends Command { | ||
| override def children: Seq[LogicalPlan] = child :: Nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -442,19 +442,15 @@ class ResolveSessionCatalog( | |
| ShowCreateTableCommand(ident.asTableIdentifier) | ||
| } | ||
|
|
||
| case CacheTableStatement(tbl, plan, isLazy, options) => | ||
| val name = if (plan.isDefined) { | ||
| // CACHE TABLE ... AS SELECT creates a temp view with the input query. | ||
| // Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name. | ||
| tbl | ||
| } else { | ||
| parseTempViewOrV1Table(tbl, "CACHE TABLE") | ||
| } | ||
| CacheTableCommand(name.asTableIdentifier, plan, isLazy, options) | ||
| // CACHE TABLE ... AS SELECT creates a temp view with the input query. | ||
|
||
| case CacheTableAsSelect(tempViewName, plan, isLazy, options) => | ||
| CacheTableCommand(TableIdentifier(tempViewName), Some(plan), isLazy, options) | ||
|
|
||
| case UncacheTableStatement(tbl, ifExists) => | ||
| val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE") | ||
| UncacheTableCommand(name.asTableIdentifier, ifExists) | ||
| case CacheTable(ResolvedV1TableOrViewIdentifier(ident), isLazy, options) => | ||
| CacheTableCommand(ident.asTableIdentifier, None, isLazy, options) | ||
|
|
||
| case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) => | ||
| UncacheTableCommand(ident.asTableIdentifier, ifExists) | ||
|
|
||
| case TruncateTableStatement(tbl, partitionSpec) => | ||
| val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE") | ||
|
|
@@ -570,12 +566,9 @@ class ResolveSessionCatalog( | |
| "SHOW VIEWS, only SessionCatalog supports this command.") | ||
| } | ||
|
|
||
| case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) => | ||
| case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) => | ||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey) | ||
|
|
||
| case ShowTableProperties(r: ResolvedView, propertyKey) => | ||
| ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey) | ||
|
|
||
| case DescribeFunction(ResolvedFunc(identifier), extended) => | ||
| DescribeFunctionCommand(identifier.asFunctionIdentifier, extended) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution | ||
|
|
||
| import java.util.Locale | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
|
|
||
| object CacheTableUtils extends Logging { | ||
| def getStorageLevel(options: Map[String, String]): Option[String] = { | ||
| val storageLevelKey = "storagelevel" | ||
| val storageLevelValue = | ||
| CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT)) | ||
| val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey) | ||
| if (withoutStorageLevel.nonEmpty) { | ||
| logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}") | ||
| } | ||
| storageLevelValue | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,11 @@ | |
|
|
||
| package org.apache.spark.sql.execution.command | ||
|
|
||
| import java.util.Locale | ||
|
|
||
| import org.apache.spark.sql.{Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
| import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan} | ||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
| import org.apache.spark.sql.execution.CacheTableUtils | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
| case class CacheTableCommand( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the next thing we can do is to refactor it using the v2 framework (not adding a v2 version). The benefits are: 1. moving the logical plan to catalyst. 2. resolve the table in the analyzer. e.g.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, will do.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One issue I am encountering by moving to the v2 framework (for v2 tables) is the following. When
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ah, one solution is to follow |
||
|
|
@@ -41,17 +39,10 @@ case class CacheTableCommand( | |
| Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString) | ||
| } | ||
|
|
||
| val storageLevelKey = "storagelevel" | ||
| val storageLevelValue = | ||
| CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT)) | ||
| val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey) | ||
| if (withoutStorageLevel.nonEmpty) { | ||
| logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}") | ||
| } | ||
|
|
||
| if (storageLevelValue.nonEmpty) { | ||
| val optStorageLevel = CacheTableUtils.getStorageLevel(options) | ||
| if (optStorageLevel.nonEmpty) { | ||
| sparkSession.catalog.cacheTable( | ||
| tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get)) | ||
| tableIdent.quotedString, StorageLevel.fromString(optStorageLevel.get)) | ||
| } else { | ||
| sparkSession.catalog.cacheTable(tableIdent.quotedString) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources.v2 | ||
|
|
||
| import org.apache.spark.sql.{Dataset, SparkSession} | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} | ||
| import org.apache.spark.sql.execution.CacheTableUtils | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
| /** | ||
| * Physical plan node for caching a table. | ||
| */ | ||
| case class CacheTableExec( | ||
| session: SparkSession, | ||
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| catalog: TableCatalog, | ||
| table: Table, | ||
| ident: Identifier, | ||
| isLazy: Boolean, | ||
| options: Map[String, String]) extends V2CommandExec { | ||
| override def run(): Seq[InternalRow] = { | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper | ||
|
|
||
| val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) | ||
| val df = Dataset.ofRows(session, v2Relation) | ||
| val tableName = Some(ident.quoted) | ||
| val optStorageLevel = CacheTableUtils.getStorageLevel(options) | ||
| if (optStorageLevel.nonEmpty) { | ||
| session.sharedState.cacheManager.cacheQuery( | ||
| df, tableName, StorageLevel.fromString(optStorageLevel.get)) | ||
|
||
| } else { | ||
| session.sharedState.cacheManager.cacheQuery(df, tableName) | ||
| } | ||
|
|
||
| if (!isLazy) { | ||
| // Performs eager caching. | ||
| df.count() | ||
| } | ||
|
|
||
| Seq.empty | ||
| } | ||
|
|
||
| override def output: Seq[Attribute] = Seq.empty | ||
| } | ||
|
|
||
| /** | ||
| * Physical plan node for uncaching a table. | ||
| */ | ||
| case class UncacheTableExec( | ||
| session: SparkSession, | ||
| catalog: TableCatalog, | ||
| table: Table, | ||
| ident: Identifier) extends V2CommandExec { | ||
| override def run(): Seq[InternalRow] = { | ||
| val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) | ||
| val df = Dataset.ofRows(session, v2Relation) | ||
| session.sharedState.cacheManager.uncacheQuery(df, cascade = true) | ||
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Seq.empty | ||
| } | ||
|
|
||
| override def output: Seq[Attribute] = Seq.empty | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.