
Commit 7b1fade

[SPARK-54157][SQL] Fix refresh of DSv2 tables between Dataset executions
1 parent 8c76795 commit 7b1fade

10 files changed: +557 -7 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 25 additions & 0 deletions
@@ -2165,6 +2165,31 @@
     ],
     "sqlState" : "42000"
   },
+  "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS" : {
+    "message" : [
+      "Detected incompatible changes to table <tableName> after DataFrame/Dataset has been resolved and analyzed, meaning the underlying plan is out of sync. Please, re-create DataFrame/Dataset before attempting to execute the query again."
+    ],
+    "subClass" : {
+      "COLUMNS_MISMATCH" : {
+        "message" : [
+          "Data columns have changed:",
+          "<errors>"
+        ]
+      },
+      "METADATA_COLUMNS_MISMATCH" : {
+        "message" : [
+          "Metadata columns have changed:",
+          "<errors>"
+        ]
+      },
+      "TABLE_ID_MISMATCH" : {
+        "message" : [
+          "Table ID has changed from <capturedTableId> to <detectedTableId>."
+        ]
+      }
+    },
+    "sqlState" : "51024"
+  },
   "INCOMPATIBLE_VIEW_SCHEMA_CHANGE" : {
     "message" : [
       "The SQL query of view <viewName> has an incompatible schema change and column <colName> cannot be resolved. Expected <expectedNum> columns named <colName> but got <actualCols>.",

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java

Lines changed: 9 additions & 0 deletions
@@ -50,6 +50,15 @@ public interface Table {
    */
   String name();
 
+  /**
+   * An ID of the table that can be used to reliably check if two table objects refer to the same
+   * metastore entity. If a table is dropped and recreated again with the same name, the new table ID
+   * must be different. This method must return null if connectors don't support the notion of table ID.
+   */
+  default String id() {
+    return null;
+  }
+
   /**
    * Returns the schema of this table. If the table is not readable and doesn't have a schema, an
    * empty schema can be returned here.
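
For connectors that do track table identity, a minimal Scala sketch of this contract could look as follows; ExampleTable and its constructor are illustrative, not part of this commit or of the Spark API.

import java.util.Collections

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical connector table. A real connector would return the ID minted when the
// metastore entry was created, so the same entity always reports the same ID and a
// drop-and-recreate under the same name reports a different one.
class ExampleTable(tableName: String, tableSchema: StructType, tableId: String) extends Table {
  override def name(): String = tableName
  override def id(): String = tableId
  override def schema(): StructType = tableSchema
  override def capabilities(): java.util.Set[TableCapability] =
    Collections.emptySet[TableCapability]()
}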
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.ArrayImplicits._
+
+private[sql] object V2TableUtil extends SQLConfHelper {
+
+  def validateCapturedColumns(table: Table, relation: DataSourceV2Relation): Seq[String] = {
+    validateCapturedColumns(table, relation.table.columns.toImmutableArraySeq)
+  }
+
+  def validateCapturedColumns(table: Table, originCols: Seq[Column]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+
+    val colsByNormalizedName = indexByNormalizedName(table.columns.toImmutableArraySeq)
+    val originColsByNormalizedName = indexByNormalizedName(originCols)
+
+    originColsByNormalizedName.foreach { case (normalizedName, originCol) =>
+      colsByNormalizedName.get(normalizedName) match {
+        case Some(col) =>
+          if (originCol.dataType != col.dataType || originCol.nullable != col.nullable) {
+            val oldType = formatType(originCol.dataType, originCol.nullable)
+            val newType = formatType(col.dataType, col.nullable)
+            errors += s"`${originCol.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatColumn(originCol)} is missing"
+      }
+    }
+
+    colsByNormalizedName.foreach { case (normalizedName, col) =>
+      if (!originColsByNormalizedName.contains(normalizedName)) {
+        errors += s"${formatColumn(col)} has been added"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  def validateCapturedMetadataColumns(
+      table: Table,
+      metaAttrs: Seq[AttributeReference]): Seq[String] = {
+    val errors = mutable.ArrayBuffer[String]()
+    val metaColsByNormalizedName = metadataColumnsByNormalizedName(table)
+
+    metaAttrs.foreach { metaAttr =>
+      val normalizedName = normalize(metaAttr.name)
+      metaColsByNormalizedName.get(normalizedName) match {
+        case Some(metaCol) =>
+          if (metaAttr.dataType != metaCol.dataType || metaAttr.nullable != metaCol.isNullable) {
+            val oldType = formatType(metaAttr.dataType, metaAttr.nullable)
+            val newType = formatType(metaCol.dataType, metaCol.isNullable)
+            errors += s"`${metaAttr.name}` type has changed from $oldType to $newType"
+          }
+        case None =>
+          errors += s"${formatAttr(metaAttr)} is missing"
+      }
+    }
+
+    errors.toSeq
+  }
+
+  private def metadataColumnsByNormalizedName(table: Table): Map[String, MetadataColumn] = {
+    table match {
+      case hasMeta: SupportsMetadataColumns =>
+        hasMeta.metadataColumns.map(col => normalize(col.name) -> col).toMap
+      case _ =>
+        Map.empty
+    }
+  }
+
+  private def formatColumn(col: Column): String = {
+    s"`${col.name}` ${formatType(col.dataType, col.nullable)}"
+  }
+
+  private def formatAttr(attr: AttributeReference): String = {
+    s"`${attr.name}` ${formatType(attr.dataType, attr.nullable)}"
+  }
+
+  private def formatType(dataType: DataType, nullable: Boolean): String = {
+    if (nullable) dataType.sql else s"${dataType.sql} NOT NULL"
+  }
+
+  private def indexByNormalizedName(cols: Seq[Column]): Map[String, Column] = {
+    cols.map(col => normalize(col.name) -> col).toMap
+  }
+
+  private def normalize(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+  }
+}
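
As a hedged sketch of how code inside Spark might consume these helpers together with the error builders added to QueryCompilationErrors below; the wrapper method is hypothetical, and how the fresh table is loaded is left out.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.{Table, V2TableUtil}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Hypothetical validation step: compare a freshly loaded table against the columns and
// metadata columns captured at analysis time, and fail fast on any mismatch.
def assertCompatible(
    freshTable: Table,
    capturedRelation: DataSourceV2Relation,
    capturedMetaAttrs: Seq[AttributeReference]): Unit = {
  val columnErrors = V2TableUtil.validateCapturedColumns(freshTable, capturedRelation)
  if (columnErrors.nonEmpty) {
    throw QueryCompilationErrors.columnsChangedAfterAnalysis(freshTable.name(), columnErrors)
  }
  val metaErrors = V2TableUtil.validateCapturedMetadataColumns(freshTable, capturedMetaAttrs)
  if (metaErrors.nonEmpty) {
    throw QueryCompilationErrors.metadataColumnsChangedAfterAnalysis(freshTable.name(), metaErrors)
  }
}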

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 32 additions & 0 deletions
@@ -2113,6 +2113,38 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     }
   }
 
+  def tableIdChangedAfterAnalysis(
+      tableName: String,
+      capturedTableId: String,
+      detectedTableId: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "capturedTableId" -> capturedTableId,
+        "detectedTableId" -> detectedTableId))
+  }
+
+  def columnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
+  def metadataColumnsChangedAfterAnalysis(
+      tableName: String,
+      errors: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.METADATA_COLUMNS_MISMATCH",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "errors" -> errors.mkString("\n- ", "\n- ", "")))
+  }
+
   def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError(): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_WRITE_DISTRIBUTION.PARTITION_NUM_WITH_UNSPECIFIED_DISTRIBUTION",

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 4 additions & 3 deletions
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -259,10 +260,10 @@
 }
 
 object ExtractV2CatalogAndIdentifier {
-  def unapply(relation: DataSourceV2Relation): Option[(CatalogPlugin, Identifier)] = {
+  def unapply(relation: DataSourceV2Relation): Option[(TableCatalog, Identifier)] = {
     relation match {
       case DataSourceV2Relation(_, _, Some(catalog), Some(identifier), _, _) =>
-        Some((catalog, identifier))
+        Some((catalog.asTableCatalog, identifier))
       case _ =>
         None
     }
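
The extractor now yields a TableCatalog rather than a CatalogPlugin because the refresh path needs to re-load the table by identifier. A minimal sketch of such a caller; the helper itself is hypothetical:

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier}

// Hypothetical helper: load the current state of the table behind a captured relation,
// when the relation still knows its catalog and identifier.
def reloadTable(relation: DataSourceV2Relation): Option[Table] = relation match {
  case ExtractV2CatalogAndIdentifier(catalog, ident) =>
    // TableCatalog exposes loadTable(Identifier); CatalogPlugin alone does not.
    Some(catalog.loadTable(ident))
  case _ =>
    None
}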

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala

Lines changed: 15 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.catalog
 
 import java.util
+import java.util.{Objects, UUID}
 
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
@@ -42,7 +43,8 @@ class InMemoryTable(
     numPartitions: Option[Int] = None,
     advisoryPartitionSize: Option[Long] = None,
     isDistributionStrictlyRequired: Boolean = true,
-    override val numRowsPerSplit: Int = Int.MaxValue)
+    override val numRowsPerSplit: Int = Int.MaxValue,
+    override val id: String = UUID.randomUUID().toString)
   extends InMemoryBaseTable(name, columns, partitioning, properties, constraints, distribution,
     ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired,
     numRowsPerSplit) with SupportsDelete {
@@ -137,7 +139,8 @@
       numPartitions,
       advisoryPartitionSize,
       isDistributionStrictlyRequired,
-      numRowsPerSplit)
+      numRowsPerSplit,
+      id)
 
     dataMap.synchronized {
       dataMap.foreach { case (key, splits) =>
@@ -160,6 +163,16 @@
     copiedTable
   }
 
+  override def equals(other: Any): Boolean = other match {
+    case that: InMemoryTable =>
+      this.id == that.id && this.currentVersion() == that.currentVersion()
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hash(id, currentVersion())
+  }
+
   class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)
     extends InMemoryWriterBuilder(info) with SupportsOverwrite {
 

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala

Lines changed: 17 additions & 1 deletion
@@ -57,7 +57,22 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     tables.keySet.asScala.filter(_.namespace.sameElements(namespace)).toArray
   }
 
+  // load table for scans
   override def loadTable(ident: Identifier): Table = {
+    Option(tables.get(ident)) match {
+      case Some(table: InMemoryTable) =>
+        table.copy() // copy to validate logical table equality
+      case Some(table) =>
+        table
+      case _ =>
+        throw new NoSuchTableException(ident.asMultipartIdentifier)
+    }
+  }
+
+  // load table for writes
+  override def loadTable(
+      ident: Identifier,
+      writePrivileges: util.Set[TableWritePrivilege]): Table = {
     Option(tables.get(ident)) match {
       case Some(table) =>
         table
@@ -169,7 +184,8 @@
       columns = CatalogV2Util.structTypeToV2Columns(schema),
       partitioning = finalPartitioning,
       properties = properties,
-      constraints = constraints)
+      constraints = constraints,
+      id = table.id)
       .alterTableWithData(table.data, schema)
     newTable.setCurrentVersion(currentVersion)
     changes.foreach {
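
A test-style sketch of the equality semantics introduced by the last two files, assuming the in-memory test catalog is on the classpath; the catalog, identifier, and column names are illustrative:

import java.util.Collections

import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val catalog = new InMemoryTableCatalog
catalog.initialize("testcat", CaseInsensitiveStringMap.empty())

val ident = Identifier.of(Array("ns"), "t")
val columns = Array(Column.create("i", IntegerType))
val noPartitions = Array.empty[Transform]
catalog.createTable(ident, columns, noPartitions, Collections.emptyMap[String, String])

// loadTable now returns a copy, so two loads are distinct objects that still compare
// equal: same table ID, same current version.
assert(catalog.loadTable(ident) == catalog.loadTable(ident))

// Dropping and recreating under the same name mints a fresh ID, so a table object
// captured before the drop no longer equals what loadTable returns afterwards.
val captured = catalog.loadTable(ident)
catalog.dropTable(ident)
catalog.createTable(ident, columns, noPartitions, Collections.emptyMap[String, String])
assert(captured != catalog.loadTable(ident))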

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 9 additions & 1 deletion
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
+import org.apache.spark.sql.execution.analysis.RefreshTableVersions
 import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
@@ -203,8 +204,15 @@
     }
   }
 
+  // refresh table versions before looking up cache
+  private val lazyTableVersionsPinned = LazyTry {
+    RefreshTableVersions(commandExecuted)
+  }
+
+  private[sql] def tableVersionsPinned: LogicalPlan = lazyTableVersionsPinned.get
+
   private val lazyNormalized = LazyTry {
-    QueryExecution.normalize(sparkSession, commandExecuted, Some(tracker))
+    QueryExecution.normalize(sparkSession, tableVersionsPinned, Some(tracker))
   }
 
   // The plan that has been normalized by custom rules, so that it's more likely to hit cache.
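
Taken together, the intended behavior between executions of the same Dataset is roughly the following, assuming a SparkSession `spark` with a DSv2 catalog registered as `testcat`; the table and column names are made up:

// Analysis captures testcat.ns.t at some version.
val df = spark.table("testcat.ns.t")
df.collect()

// Compatible change: a new version of the same table. Before execution, QueryExecution
// now pins fresh table versions via RefreshTableVersions, so the re-run sees the new data.
spark.sql("INSERT INTO testcat.ns.t VALUES (1)")
df.collect()

// Incompatible change: the captured plan no longer matches the table, so re-execution
// fails with INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS instead of silently using a stale plan.
spark.sql("ALTER TABLE testcat.ns.t DROP COLUMN i")
df.collect()  // throws AnalysisException (COLUMNS_MISMATCH)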
