-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19257][SQL]location for table/partition/database should be java.net.URI #17149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
38436e8
69a1646
890327a
e792cb6
f2b9bd8
109e2b5
dc0a37b
b6bc466
abfb6f5
80f2c40
681db88
5b92620
5b423f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.catalog | ||
|
|
||
| import java.net.URI | ||
|
|
||
| import org.apache.hadoop.fs.Path | ||
| import org.apache.hadoop.util.Shell | ||
|
|
||
|
|
@@ -162,6 +164,28 @@ object CatalogUtils { | |
| BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols) | ||
| } | ||
|
|
||
| /** | ||
| * Convert URI to String. | ||
| * Since URI.toString does not decode the URI string, we need to use | ||
| * Path(uri).toString to decode it. | ||
| * @param uri the URI of the path | ||
| * @return the String of the path | ||
| */ | ||
| def URIToString(uri: Option[URI]): Option[String] = { | ||
| uri.map(new Path(_).toString) | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Convert String to URI. | ||
| * Since new URI(string) does not encode the path string, we need to use | ||
| * Path(string).toURI to encode it. | ||
| * @param str the String of the path | ||
| * @return the URI of the path | ||
| */ | ||
| def stringToURI(str: Option[String]): Option[URI] = { | ||
| str.map(new Path(_).toUri) | ||
| } | ||
|
|
||
| private def normalizeColumnName( | ||
| tableName: String, | ||
| tableCols: Seq[String], | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.catalog | ||
|
|
||
| import java.net.URI | ||
| import javax.annotation.concurrent.GuardedBy | ||
|
|
||
| import scala.collection.mutable | ||
|
|
@@ -131,7 +132,7 @@ class SessionCatalog( | |
| * does not contain a scheme, this path will not be changed after the default | ||
| * FileSystem is changed. | ||
| */ | ||
| private def makeQualifiedPath(path: String): Path = { | ||
| private def makeQualifiedPath(path: URI): Path = { | ||
|
||
| val hadoopPath = new Path(path) | ||
| val fs = hadoopPath.getFileSystem(hadoopConf) | ||
| fs.makeQualified(hadoopPath) | ||
|
|
@@ -170,7 +171,7 @@ class SessionCatalog( | |
| "you cannot create a database with this name.") | ||
| } | ||
| validateName(dbName) | ||
| val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString | ||
| val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toUri | ||
| externalCatalog.createDatabase( | ||
| dbDefinition.copy(name = dbName, locationUri = qualifiedPath), | ||
| ignoreIfExists) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,8 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.catalog | ||
|
|
||
| import java.net.URI | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.Path | ||
| import org.scalatest.BeforeAndAfterEach | ||
|
|
@@ -340,7 +342,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| "db1", | ||
| "tbl", | ||
| Map("partCol1" -> "1", "partCol2" -> "2")).location | ||
| val tableLocation = catalog.getTable("db1", "tbl").location | ||
| val tableLocation = new Path(catalog.getTable("db1", "tbl").location) | ||
| val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2") | ||
| assert(new Path(partitionLocation) == defaultPartitionLocation) | ||
| } | ||
|
|
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| partitionColumnNames = Seq("partCol1", "partCol2")) | ||
| catalog.createTable(table, ignoreIfExists = false) | ||
|
|
||
| val tableLocation = catalog.getTable("db1", "tbl").location | ||
| val tableLocation = new Path(catalog.getTable("db1", "tbl").location) | ||
|
|
||
| val mixedCasePart1 = CatalogTablePartition( | ||
| Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat) | ||
|
|
@@ -553,7 +555,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| test("alter partitions") { | ||
| val catalog = newBasicCatalog() | ||
| try { | ||
| val newLocation = newUriForDatabase() | ||
| val newLocation = new Path(newUriForDatabase()).toUri | ||
|
||
| val newSerde = "com.sparkbricks.text.EasySerde" | ||
| val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false") | ||
| // alter but keep spec the same | ||
|
|
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| // File System operations | ||
| // -------------------------------------------------------------------------- | ||
|
|
||
| private def exists(uri: String, children: String*): Boolean = { | ||
| private def exists(uri: URI, children: String*): Boolean = { | ||
| val base = new Path(uri) | ||
| val finalPath = children.foldLeft(base) { | ||
| case (parent, child) => new Path(parent, child) | ||
|
|
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| identifier = TableIdentifier("external_table", Some("db1")), | ||
| tableType = CatalogTableType.EXTERNAL, | ||
| storage = CatalogStorageFormat( | ||
| Some(Utils.createTempDir().getAbsolutePath), | ||
| Some(Utils.createTempDir().toURI), | ||
| None, None, None, false, Map.empty), | ||
| schema = new StructType().add("a", "int").add("b", "string"), | ||
| provider = Some(defaultProvider) | ||
|
|
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| val partWithExistingDir = CatalogTablePartition( | ||
| Map("partCol1" -> "7", "partCol2" -> "8"), | ||
| CatalogStorageFormat( | ||
| Some(tempPath.toURI.toString), | ||
| Some(tempPath.toURI), | ||
| None, None, None, false, Map.empty)) | ||
| catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false) | ||
|
|
||
|
|
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac | |
| val partWithNonExistingDir = CatalogTablePartition( | ||
| Map("partCol1" -> "9", "partCol2" -> "10"), | ||
| CatalogStorageFormat( | ||
| Some(tempPath.toURI.toString), | ||
| Some(tempPath.toURI), | ||
| None, None, None, false, Map.empty)) | ||
| catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false) | ||
| assert(tempPath.exists()) | ||
|
|
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils { | |
|
|
||
| def newFunc(): CatalogFunction = newFunc("funcName") | ||
|
|
||
| def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/") | ||
| def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/")) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can simplify it and write
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cloud-fan after trying to use Path for the comparison, I think stripSuffix here is the simpler way. (The original code snippets comparing Path values were lost from this comment.) I find the Path-based comparison more complicated: here we normalize the URI with stripSuffix against the original and then compare the two URIs directly, which is simpler. Must we really convert to Path in order to compare?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's ok if we always compare URI with Path, instead of converting it to string. |
||
|
|
||
| def newDb(name: String): CatalogDatabase = { | ||
| CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) | ||
|
|
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils { | |
| CatalogTable( | ||
| identifier = TableIdentifier(name, database), | ||
| tableType = CatalogTableType.EXTERNAL, | ||
| storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)), | ||
| storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)), | ||
| schema = new StructType() | ||
| .add("col1", "int") | ||
| .add("col2", "string") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ | |
|
|
||
| import org.antlr.v4.runtime.{ParserRuleContext, Token} | ||
| import org.antlr.v4.runtime.tree.TerminalNode | ||
| import org.apache.hadoop.fs.Path | ||
|
||
|
|
||
| import org.apache.spark.sql.SaveMode | ||
| import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
|
|
@@ -386,7 +387,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { | |
| "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + | ||
| "you can only specify one of them.", ctx) | ||
| } | ||
| val customLocation = storage.locationUri.orElse(location) | ||
| val customLocation = storage.locationUri.orElse(CatalogUtils.stringToURI(location)) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this has a double-de/encoding problem when the input path is an URI. scala> new org.apache.hadoop.fs.Path(new java.io.File("a b").toURI.toString).toUri.toString
res1: String = file:/.../a%2520b

A space character in a URI is encoded as %20; encoding it a second time turns the % into %25, producing %2520.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, it seems we have a similar util with @windpiger could you double check my comments and open a followup if I was not wrong?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
FYI.. cc @sarutak
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you reproduce it as a bug, please submit a PR to fix it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how could the input path be a URI string?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm.. is the input path always expected to be a path? I thought we support both URI and path forms.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's ambiguous to support both — what if users do want to create a path whose name looks like a URI?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Up to my knowledge, HDFS's fully qualified path is a URI form. If we do not support this, that virtually means we are going to disallow the fully qualified path. I understand it sounds ambiguous, but disallowing does not look like a good solution. Also, users might want to access local files or S3 when the default scheme is HDFS.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In addition, I guess we already have a lot of tests with URI input paths, and I wrote many of them to pass the tests on Windows, which I guess implies some committers do not disagree with this. IMHO, URIs should be supported correctly first, because the local path form is an abbreviation of the fully qualified path.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
How about external tables on S3? |
||
| val tableType = if (customLocation.isDefined) { | ||
| CatalogTableType.EXTERNAL | ||
|
|
@@ -1080,8 +1081,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { | |
| if (external && location.isEmpty) { | ||
| operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) | ||
| } | ||
|
|
||
| val locUri = CatalogUtils.stringToURI(location) | ||
| val storage = CatalogStorageFormat( | ||
| locationUri = location, | ||
| locationUri = locUri, | ||
| inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), | ||
| outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), | ||
| serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), | ||
|
|
@@ -1132,7 +1135,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { | |
| // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties | ||
| // are empty Maps. | ||
| val newTableDesc = tableDesc.copy( | ||
| storage = CatalogStorageFormat.empty.copy(locationUri = location), | ||
| storage = CatalogStorageFormat.empty.copy(locationUri = locUri), | ||
| provider = Some(conf.defaultDataSourceName)) | ||
| CreateTable(newTableDesc, mode, Some(q)) | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,10 @@ | |
|
|
||
| package org.apache.spark.sql.execution.command | ||
|
|
||
| import java.net.URI | ||
|
|
||
| import org.apache.hadoop.fs.Path | ||
|
|
||
| import org.apache.spark.sql._ | ||
| import org.apache.spark.sql.catalyst.catalog._ | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
|
|
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo | |
|
|
||
| // Create the relation to validate the arguments before writing the metadata to the metastore, | ||
| // and infer the table schema and partition if users didn't specify schema in CREATE TABLE. | ||
| val pathOption = table.storage.locationUri.map("path" -> _) | ||
| val pathOption = CatalogUtils.URIToString(table.storage.locationUri).map("path" -> _) | ||
| // Fill in some default table options from the session conf | ||
| val tableWithDefaultOptions = table.copy( | ||
| identifier = table.identifier.copy( | ||
|
|
@@ -146,7 +150,7 @@ case class CreateDataSourceTableAsSelectCommand( | |
| assert(table.schema.isEmpty) | ||
|
|
||
| val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { | ||
| Some(sessionState.catalog.defaultTablePath(table.identifier)) | ||
| Some(new Path(sessionState.catalog.defaultTablePath(table.identifier)).toUri) | ||
|
||
| } else { | ||
| table.storage.locationUri | ||
| } | ||
|
|
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand( | |
| private def saveDataIntoTable( | ||
| session: SparkSession, | ||
| table: CatalogTable, | ||
| tableLocation: Option[String], | ||
| tableLocation: Option[URI], | ||
| data: LogicalPlan, | ||
| mode: SaveMode, | ||
| tableExists: Boolean): BaseRelation = { | ||
| // Create the relation based on the input logical plan: `data`. | ||
| val pathOption = tableLocation.map("path" -> _) | ||
| val pathOption = CatalogUtils.URIToString(tableLocation).map("path" -> _) | ||
| val dataSource = DataSource( | ||
| session, | ||
| className = table.provider.get, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.