Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface TableCatalog extends CatalogPlugin {

/**
 * A reserved property to specify the location of the table. The files of the table
 * should be under this location. The location is a Hadoop Path string.
 */
String PROP_LOCATION = "location";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
import org.apache.spark.sql.connector.catalog.V1Table.addV2TableProperties
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, Transform}
Expand All @@ -38,7 +38,7 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
case Some(uri) =>
v1Table.storage.properties + ("path" -> uri.toString)
v1Table.storage.properties + ("path" -> CatalogUtils.URIToString(uri))
case _ =>
v1Table.storage.properties
}
Expand Down Expand Up @@ -81,7 +81,9 @@ private[sql] object V1Table {
TableCatalog.OPTION_PREFIX + key -> value } ++
v1Table.provider.map(TableCatalog.PROP_PROVIDER -> _) ++
v1Table.comment.map(TableCatalog.PROP_COMMENT -> _) ++
v1Table.storage.locationUri.map(TableCatalog.PROP_LOCATION -> _.toString) ++
v1Table.storage.locationUri.map { loc =>
TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc)
} ++
(if (managed) Some(TableCatalog.PROP_IS_MANAGED_LOCATION -> "true") else None) ++
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
Some(TableCatalog.PROP_OWNER -> v1Table.owner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
serdeProperties,
partitionSpec)

case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident, partitionSpec, location)
case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
AlterTableSetLocationCommand(ident, None, location)

// V2 catalog doesn't support setting partition location yet, we must use v1 command here.
case SetTableLocation(
ResolvedTable(catalog, _, t: V1Table, _),
Some(partitionSpec),
location) if isSessionCatalog(catalog) =>
AlterTableSetLocationCommand(t.v1Table.identifier, Some(partitionSpec), location)

case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
AlterViewAsCommand(ident, originalText, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -105,8 +106,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}

/**
 * Returns `tableSpec` with its location (if one is set) normalized for storage in the catalog.
 *
 * The location string is parsed into a URI and then handled in one of three ways:
 *  - a URI that already carries a scheme is kept as it is;
 *  - a scheme-less URI whose path is absolute is qualified via
 *    `CatalogUtils.makeQualifiedPath` using `hadoopConf`;
 *  - a relative path is left untouched so the catalog implementation can resolve it.
 *
 * The resulting URI is rendered back to a string with `CatalogUtils.URIToString`.
 * A spec without a location is returned with its location still empty.
 *
 * @param tableSpec the table spec whose optional location should be normalized
 * @return the spec with the normalized location
 */
private def qualifyLocInTableSpec(tableSpec: TableSpec): TableSpec = {
  val newLoc = tableSpec.location.map { loc =>
    val locationUri = CatalogUtils.stringToURI(loc)
    val qualified = if (locationUri.isAbsolute) {
      // java.net.URI#isAbsolute is true when a scheme is present: nothing to qualify.
      locationUri
    } else if (new Path(locationUri).isAbsolute) {
      // No scheme, but the path itself is absolute: qualify it using the Hadoop configuration.
      CatalogUtils.makeQualifiedPath(locationUri, hadoopConf)
    } else {
      // Leave it to the catalog implementation to qualify relative paths.
      locationUri
    }
    CatalogUtils.URIToString(qualified)
  }
  tableSpec.withNewLocation(newLoc)
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ CREATE TABLE spark_catalog.default.tbl (
b STRING,
c INT)
USING parquet
LOCATION 'file:///path/to/table'
LOCATION 'file:/path/to/table'


-- !query
Expand Down Expand Up @@ -108,7 +108,7 @@ CREATE TABLE spark_catalog.default.tbl (
b STRING,
c INT)
USING parquet
LOCATION 'file:///path/to/table'
LOCATION 'file:/path/to/table'


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
assert(tableInfo.properties().get("location") === "file:///abc")
assert(tableInfo.properties().get("location") === "file:/abc")
assert(tableInfo.properties().get("provider") === v2Format)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,25 @@ class DataSourceV2SQLSuiteV1Filter
}
}

test("SPARK-49152: CreateTable should store location as qualified") {
  val tbl = "testcat.table_name"

  // Each pair is (location given in CREATE TABLE, location expected in the table properties).
  val cases = Seq(
    // A scheme-less absolute path is expected to come back with the `file:` scheme.
    "/absolute/path" -> "file:/absolute/path",
    // A location that already has a scheme is expected back unchanged.
    "s3://host/full/path" -> "s3://host/full/path",
    // A relative path is expected back unchanged.
    "relative/path" -> "relative/path",
    // Special characters are expected back verbatim, without URI escaping.
    "/path/special+ char" -> "file:/path/special+ char"
  )

  cases.foreach { case (location, qualified) =>
    withTable(tbl) {
      sql(s"CREATE TABLE $tbl USING foo LOCATION '$location'")
      val table = catalog("testcat").asTableCatalog
        .loadTable(Identifier.of(Array.empty, "table_name"))
      val loc = table.properties().get(TableCatalog.PROP_LOCATION)
      assert(loc === qualified)
    }
  }
}

test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
val basicIdentifier = "testcat.table_name"
val atomicIdentifier = "testcat_atomic.table_name"
Expand All @@ -442,7 +461,7 @@ class DataSourceV2SQLSuiteV1Filter
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
.select("data_type").head().getString(0)
assert(location === "file:///tmp/foo")
assert(location === "file:/tmp/foo")
}
}
}
Expand All @@ -459,7 +478,7 @@ class DataSourceV2SQLSuiteV1Filter
val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
.filter("col_name = 'Location'")
.select("data_type").head().getString(0)
assert(location === "file:///tmp/foo")
assert(location === "file:/tmp/foo")
}
}
}
Expand Down Expand Up @@ -1357,8 +1376,7 @@ class DataSourceV2SQLSuiteV1Filter
val identifier = Identifier.of(Array(), "reservedTest")
val location = tableCatalog.loadTable(identifier).properties()
.get(TableCatalog.PROP_LOCATION)
assert(location.startsWith("file:") && location.endsWith("foo"),
"path as a table property should not have side effects")
assert(location == "foo", "path as a table property should not have side effects")
assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar",
"path as a table property should not have side effects")
assert(tableCatalog.loadTable(identifier).properties().get("Path") == "noop",
Expand Down Expand Up @@ -3148,7 +3166,7 @@ class DataSourceV2SQLSuiteV1Filter
val properties = table.properties
assert(properties.get(TableCatalog.PROP_PROVIDER) == "parquet")
assert(properties.get(TableCatalog.PROP_COMMENT) == "This is a comment")
assert(properties.get(TableCatalog.PROP_LOCATION) == "file:///tmp")
assert(properties.get(TableCatalog.PROP_LOCATION) == "file:/tmp")
assert(properties.containsKey(TableCatalog.PROP_OWNER))
assert(properties.get(TableCatalog.PROP_EXTERNAL) == "true")
assert(properties.get(s"${TableCatalog.OPTION_PREFIX}from") == "0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
"'via' = '2')",
"PARTITIONED BY (a)",
"COMMENT 'This is a comment'",
"LOCATION 'file:///tmp'",
"LOCATION 'file:/tmp'",
"TBLPROPERTIES (",
"'password' = '*********(redacted)',",
"'prop1' = '1',",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(table.properties().get("comment").equals(description))
assert(table.properties().get("path").equals(dir.getAbsolutePath))
assert(table.properties().get("external").equals("true"))
assert(table.properties().get("location").equals("file://" + dir.getAbsolutePath))
assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
}
}

Expand Down