
Commit d8ea7b6

1 parent 0ba3ab4 commit d8ea7b6

4 files changed: +48 additions, -26 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 14 additions & 7 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, Na
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -81,6 +81,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }

+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters,
         relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
@@ -164,10 +169,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(
-            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableExec(
-            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
       }

     case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
@@ -176,26 +183,26 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
-            session,
             staging,
             ident,
             parts,
             query,
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableAsSelectExec(
-            session,
             catalog,
             ident,
             parts,
             query,
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
       }

     case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), query, writeOptions,
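The structural change here: DataSourceV2Strategy now owns a single session-scoped invalidateCache callback and hands it to each replace exec node, so the nodes themselves no longer need a SparkSession. A minimal sketch of that injection pattern, using hypothetical Catalog, ReplaceExec, and Planner types rather than Spark's real classes:

// A sketch only: hypothetical Catalog/ReplaceExec/Planner types (not
// Spark's real classes) showing the callback-injection pattern above.
trait Catalog { def loadTable(ident: String): String }

// The exec node takes the cache-invalidation logic as a plain function,
// so it never needs to hold a session.
final case class ReplaceExec(
    catalog: Catalog,
    ident: String,
    invalidateCache: (Catalog, String, String) => Unit) {
  def run(): Unit = {
    val table = catalog.loadTable(ident)
    invalidateCache(catalog, table, ident) // planner-supplied callback
    // ... drop and recreate the table here ...
  }
}

// Session-dependent logic stays in the planner, mirroring how
// DataSourceV2Strategy closes over its `session`.
final class Planner(sessionName: String) {
  private def invalidateCache(c: Catalog, table: String, ident: String): Unit =
    println(s"[$sessionName] uncaching plans that read $ident")

  def plan(catalog: Catalog, ident: String): ReplaceExec =
    ReplaceExec(catalog, ident, invalidateCache) // eta-expands to a function value
}

This keeps cache-management policy in one place, the planner, while the exec nodes stay small and session-free.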

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala

Lines changed: 11 additions & 3 deletions
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -33,10 +33,13 @@ case class ReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {

   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
+      val table = catalog.loadTable(ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)
@@ -54,9 +57,14 @@ case class AtomicReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {

   override protected def run(): Seq[InternalRow] = {
+    if (catalog.tableExists(identifier)) {
+      val table = catalog.loadTable(identifier)
+      invalidateCache(catalog, table, identifier)
+    }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
         identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
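One detail worth calling out: in the non-atomic path the cache is invalidated before dropTable, and that ordering appears deliberate, since the relation used for the cache lookup is built from loadTable, which cannot succeed once the table is gone. A simplified, self-contained sketch of the flow, with a hypothetical Catalog trait standing in for Spark's API:

// Simplified sketch of the non-atomic replace flow; the Catalog trait
// below is a hypothetical stand-in, not Spark's TableCatalog.
trait Catalog {
  def tableExists(ident: String): Boolean
  def loadTable(ident: String): String
  def dropTable(ident: String): Unit
  def createTable(ident: String): Unit
}

object ReplaceFlow {
  def replace(
      catalog: Catalog,
      ident: String,
      orCreate: Boolean,
      invalidateCache: (Catalog, String, String) => Unit): Unit = {
    if (catalog.tableExists(ident)) {
      val table = catalog.loadTable(ident)   // the table is still loadable here
      invalidateCache(catalog, table, ident) // uncache dependent plans first
      catalog.dropTable(ident)               // only now is the drop safe
    } else if (!orCreate) {
      throw new IllegalStateException(s"cannot replace missing table $ident")
    }
    catalog.createTable(ident)               // recreate with the new definition
  }
}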

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 6 additions & 16 deletions
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -130,15 +129,15 @@ case class AtomicCreateTableAsSelectExec(
  * ReplaceTableAsSelectStagingExec.
  */
 case class ReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {

   override protected def run(): Seq[InternalRow] = {
     // Note that this operation is potentially unsafe, but these are the strict semantics of
@@ -151,7 +150,7 @@ case class ReplaceTableAsSelectExec(
     // 3. The table returned by catalog.createTable doesn't support writing.
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)
@@ -176,21 +175,21 @@ case class ReplaceTableAsSelectExec(
  * is left untouched.
  */
 case class AtomicReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
     plan: LogicalPlan,
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {

   override protected def run(): Seq[InternalRow] = {
     val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
     }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
@@ -364,15 +363,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {

     Nil
   }
-
-  protected def uncacheTable(
-      session: SparkSession,
-      catalog: TableCatalog,
-      table: Table,
-      ident: Identifier): Unit = {
-    val plan = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    session.sharedState.cacheManager.uncacheQuery(session, plan, cascade = true)
-  }
 }

 object DataWritingSparkTask extends Logging {

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -752,6 +752,23 @@ class DataSourceV2SQLSuite
     assert(t2.v1Table.provider == Some(conf.defaultDataSourceName))
   }

+  test("SPARK-34039: ReplaceTable (atomic or non-atomic) should invalidate cache") {
+    Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
+      val view = "view"
+      withTable(t) {
+        withTempView(view) {
+          sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
+          sql(s"CACHE TABLE $view AS SELECT id FROM $t")
+          checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
+          checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
+
+          sql(s"REPLACE TABLE $t (a bigint) USING foo")
+          assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+        }
+      }
+    }
+  }
+
   test("SPARK-33492: ReplaceTableAsSelect (atomic or non-atomic) should invalidate cache") {
     Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
       val view = "view"
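For context, here is the scenario the new test pins down, written as a spark-shell style walkthrough. The testcat catalog, the foo provider, and the source table are fixtures from the test suite and are assumed to be registered in the session; the rest uses only calls that appear in the test itself. Before this commit, the final assertion would fail because the cached view survived the REPLACE:

// spark-shell style; `testcat`, `foo`, and `source` are suite fixtures,
// assumed to be registered in the active session `spark`.
spark.sql("CREATE TABLE testcat.ns.t USING foo AS SELECT id, data FROM source")
spark.sql("CACHE TABLE view AS SELECT id FROM testcat.ns.t")

// REPLACE TABLE drops and recreates t; with this commit it also uncaches
// plans that read t, and cascade = true reaches the dependent view.
spark.sql("REPLACE TABLE testcat.ns.t (a bigint) USING foo")

// The cached entry for the view is gone:
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("view")).isEmpty)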
