ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog, SupportsNamespaces, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
@@ -284,10 +284,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+    // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
+    case RepairTable(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        addPartitions,
+        dropPartitions) =>
       RepairTableCommand(ident, addPartitions, dropPartitions)
 
-    case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
+    // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
+    case LoadData(
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
+        path,
+        isLocal,
+        isOverwrite,
+        partition) =>
       LoadDataCommand(
         ident,
         path,
@@ -336,7 +346,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
       ShowColumnsCommand(db, v1TableName, output)
 
-    case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
+    // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
+    case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
       RepairTableCommand(
         ident,
         enableAddPartitions = true,
@@ -364,8 +375,9 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         purge,
         retainData = false)
 
+    // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
     case SetTableSerDeProperties(
-        ResolvedV1TableIdentifier(ident),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         serdeClassName,
         serdeProperties,
         partitionSpec) =>
@@ -380,10 +392,10 @@
 
     // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
     case SetTableLocation(
-        ResolvedTable(catalog, _, t: V1Table, _),
+        ResolvedV1TableIdentifierInSessionCatalog(ident),
         Some(partitionSpec),
-        location) if isSessionCatalog(catalog) =>
-      AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)
+        location) =>
+      AlterTableSetLocationCommand(ident, Some(partitionSpec), location)
 
     case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
       AlterViewAsCommand(ident, originalText, query)
@@ -600,6 +612,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     }
   }
 
+  object ResolvedV1TableIdentifierInSessionCatalog {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+        Some(t.catalogTable.identifier)
+      case _ => None
+    }
+  }
+
   object ResolvedV1TableOrViewIdentifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedV1TableIdentifier(ident) => Some(ident)
@@ -684,7 +704,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
-    isSessionCatalog(catalog) &&
-      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty
+    isSessionCatalog(catalog) && (
+      SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
+      catalog.isInstanceOf[DelegatingCatalogExtension])
   }
 }
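
The supportsV1Command change above is the core of this diff: a session catalog that merely extends DelegatingCatalogExtension no longer disables the v1 code path. A minimal sketch of such a catalog, with an assumed class name (PassthroughSessionCatalog is illustrative, not from this PR):

import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension

// Hypothetical example: an empty subclass compiles because
// DelegatingCatalogExtension forwards every catalog call to the
// built-in session catalog; subclasses override only what they customize.
class PassthroughSessionCatalog extends DelegatingCatalogExtension

Registered via spark.sql.catalog.spark_catalog (the key behind SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION), such a catalog previously made supportsV1Command return false, routing commands like REPAIR TABLE, LOAD DATA, and RECOVER PARTITIONS away from the v1 commands that the new ResolvedV1TableIdentifierInSessionCatalog extractor now keeps reachable.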
DataSourceV2SQLSessionCatalogSuite.scala
@@ -71,4 +71,12 @@ class DataSourceV2SQLSessionCatalogSuite
       sql(s"CREATE EXTERNAL TABLE t (i INT) USING $v2Format TBLPROPERTIES($prop)")
     }
   }
+
+  test("SPARK-49152: partition columns should be put at the end") {
[Review thread on this test]

dongjoon-hyun (Member), Sep 5, 2024:
The test case name sounds a little mismatched. Is this a correct reproducer for the DelegatingCatalogExtension issue, @amaliujia?

cloud-fan (Contributor):
This is the Spark behavior with the built-in catalog. A DelegatingCatalogExtension shouldn't change it. Strictly speaking, this shouldn't be the only issue, as there might be other subtle differences between the v1 and v2 CREATE TABLE commands, or other commands.

amaliujia (Contributor, Author):
Yeah, as @cloud-fan mentioned, this verifies a specific case so we make sure the behavior is not changed. There might be a list of cases to verify, though.

dongjoon-hyun (Member):
Got it. It makes sense. Thank you for the explanation.
+    withTable("t") {
+      sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
+      // partition columns should be put at the end.
+      assert(getTableMetadata("default.t").columns().map(_.name()) === Seq("c2", "c1"))
+    }
+  }
 }
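
As the thread above notes, this test pins down existing built-in behavior rather than anything specific to DelegatingCatalogExtension. A hedged REPL sketch of that behavior, assuming a SparkSession named spark with the default session catalog:

spark.sql("CREATE TABLE t (c1 INT, c2 INT) USING json PARTITIONED BY (c1)")
// The table schema lists data columns first and partition columns last.
assert(spark.table("t").schema.fieldNames.toSeq == Seq("c2", "c1"))
spark.sql("DROP TABLE t")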
DataSourceV2SQLSuite.scala
@@ -2125,10 +2125,18 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("REPLACE TABLE: v1 table") {
-    sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
-    val v2Catalog = catalog("spark_catalog").asTableCatalog
-    val table = v2Catalog.loadTable(Identifier.of(Array("default"), "tbl"))
-    assert(table.properties().get(TableCatalog.PROP_PROVIDER) == classOf[SimpleScanSource].getName)
+    val e = intercept[AnalysisException] {
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
}
checkError(
exception = e,
errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
sqlState = "0A000",
parameters = Map(
"tableName" -> "`spark_catalog`.`default`.`tbl`",
"operation" -> "REPLACE TABLE"
)
)
}

test("DeleteFrom: - delete with invalid predicate") {
Expand Down
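
Why this expectation changed (an inference from the diff, not text in the PR): this suite installs a testing session catalog that extends DelegatingCatalogExtension, so supportsV1Command now returns true for it and CREATE OR REPLACE TABLE resolves through the v1 session-catalog path, which has no REPLACE TABLE command. A minimal sketch of the same assertion, assuming a ScalaTest suite whose spark_catalog is such a catalog (see the sketch after the first file) and a SparkSession named spark:

// Hedged sketch; intercept comes from ScalaTest's Assertions.
val e = intercept[org.apache.spark.sql.AnalysisException] {
  spark.sql("CREATE OR REPLACE TABLE tbl (a INT) USING parquet")
}
// getErrorClass surfaces the error class that checkError asserts above.
assert(e.getErrorClass == "UNSUPPORTED_FEATURE.TABLE_OPERATION")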