@@ -202,7 +202,7 @@ class InMemoryCatalog(
       tableDefinition.storage.locationUri.isEmpty

     val tableWithLocation = if (needDefaultTableLocation) {
-      val defaultTableLocation = new Path(catalog(db).db.locationUri, table)
+      val defaultTableLocation = new Path(new Path(catalog(db).db.locationUri), table)
       try {
         val fs = defaultTableLocation.getFileSystem(hadoopConfig)
         fs.mkdirs(defaultTableLocation)
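For context on the extra wrapping above (not part of the diff): locationUri is now a java.net.URI, and Hadoop's Path has no (URI, String) constructor, only (Path, String) and (String, String), so the URI parent must first become a Path before a child segment can be appended. A minimal sketch:

import java.net.URI
import org.apache.hadoop.fs.Path

val dbLocation: URI = new URI("file:/warehouse/db1.db")  // hypothetical database location
val parent = new Path(dbLocation)           // Path(URI): preserves the URI exactly
val tableDir = new Path(parent, "my_table") // Path(Path, String): appends the table directory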
@@ -211,7 +211,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to create table $table as failed " +
             s"to create its directory $defaultTableLocation", e)
       }
-      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
     } else {
       tableDefinition
     }
@@ -274,7 +274,7 @@ class InMemoryCatalog(
         "Managed table should always have table location, as we will assign a default location " +
           "to it if it doesn't have one.")
       val oldDir = new Path(oldDesc.table.location)
-      val newDir = new Path(catalog(db).db.locationUri, newName)
+      val newDir = new Path(new Path(catalog(db).db.locationUri), newName)
       try {
         val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
@@ -283,7 +283,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
     }

     catalog(db).tables.put(newName, oldDesc)
@@ -389,7 +389,7 @@ class InMemoryCatalog(

         existingParts.put(
           p.spec,
-          p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+          p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
       }
     }
@@ -462,7 +462,7 @@ class InMemoryCatalog(
         }
         oldPartition.copy(
           spec = newSpec,
-          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
+          storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
       } else {
         oldPartition.copy(spec = newSpec)
       }
@@ -170,7 +170,7 @@ class SessionCatalog(
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
+    val qualifiedPath = makeQualifiedPath(new Path(dbDefinition.locationUri).toString).toUri
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)

Contributor: makeQualifiedPath should accept URI now.
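A sketch of a URI-accepting overload, per the comment above (hypothetical signature; hadoopConf is assumed to be the SessionCatalog's Hadoop configuration):

import java.net.URI
import org.apache.hadoop.fs.Path

private def makeQualifiedPath(locationUri: URI): URI = {
  val path = new Path(locationUri)
  val fs = path.getFileSystem(hadoopConf)  // resolve the filesystem for the location's scheme
  fs.makeQualified(path).toUri             // fill in default scheme/authority when missing
}

The call site would then shrink to makeQualifiedPath(dbDefinition.locationUri), with no String round-trip.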
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalyst.catalog

+import java.net.URI
 import java.util.Date

 import com.google.common.base.Objects
@@ -48,10 +49,7 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
-    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
-    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
-    // path.toUri respectively before use as a filesystem path due to URI char escaping.
-    locationUri: Option[String],
+    locationUri: Option[URI],
     inputFormat: Option[String],
     outputFormat: Option[String],
     serde: Option[String],
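A short illustration of the escaping hazard the removed TODO describes: a location must round-trip through new Path(uri) rather than through its string form (a sketch of Hadoop Path semantics, not code from this PR):

import org.apache.hadoop.fs.Path

val p = new Path("/warehouse/my table")  // directory name contains a space
val u = p.toUri                          // prints escaped: /warehouse/my%20table
new Path(u) == p                         // true:  Path(URI) round-trips exactly
new Path(u.toString) == p                // false: the "%20" gets re-escaped literally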
@@ -105,7 +103,7 @@ case class CatalogTablePartition(
   }

   /** Return the partition location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
   }
@@ -210,7 +208,7 @@ case class CatalogTable(
   }

   /** Return the table location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: URI = storage.locationUri.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify locationUri")
   }
@@ -241,7 +239,7 @@ case class CatalogTable(

   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
-      locationUri: Option[String] = storage.locationUri,
+      locationUri: Option[URI] = storage.locationUri,
       inputFormat: Option[String] = storage.inputFormat,
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
@@ -337,7 +335,7 @@ object CatalogTableType {
 case class CatalogDatabase(
     name: String,
     description: String,
-    locationUri: String,
+    locationUri: URI,
     properties: Map[String, String])
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.catalog

+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -340,8 +342,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       "db1",
       "tbl",
       Map("partCol1" -> "1", "partCol2" -> "2")).location
-    val tableLocation = catalog.getTable("db1", "tbl").location
-    val defaultPartitionLocation = new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2")
+    val tableLocationPath = new Path(catalog.getTable("db1", "tbl").location)
+    val defaultPartitionLocation = new Path(new Path(tableLocationPath, "partCol1=1"), "partCol2=2")
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }

Contributor: tableLocationPath sounds weird, let's keep the previous name.
@@ -365,10 +367,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac

     val partition1 =
       CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
-        storageFormat.copy(locationUri = Some(newLocationPart1)))
+        storageFormat.copy(locationUri = Some(new Path(newLocationPart1).toUri)))
     val partition2 =
       CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
-        storageFormat.copy(locationUri = Some(newLocationPart2)))
+        storageFormat.copy(locationUri = Some(new Path(newLocationPart2).toUri)))
     catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
     catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)

Contributor: isn't newLocationPart1 already a URI?
@@ -508,7 +510,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       partitionColumnNames = Seq("partCol1", "partCol2"))
     catalog.createTable(table, ignoreIfExists = false)

-    val tableLocation = catalog.getTable("db1", "tbl").location
+    val tableLocationPath = new Path(catalog.getTable("db1", "tbl").location)

     val mixedCasePart1 = CatalogTablePartition(
       Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat)

Contributor: let's keep the previous name.
@@ -518,12 +520,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.createPartitions("db1", "tbl", Seq(mixedCasePart1), ignoreIfExists = false)
     assert(
       new Path(catalog.getPartition("db1", "tbl", mixedCasePart1.spec).location) ==
-        new Path(new Path(tableLocation, "partCol1=1"), "partCol2=2"))
+        new Path(new Path(tableLocationPath, "partCol1=1"), "partCol2=2"))

     catalog.renamePartitions("db1", "tbl", Seq(mixedCasePart1.spec), Seq(mixedCasePart2.spec))
     assert(
       new Path(catalog.getPartition("db1", "tbl", mixedCasePart2.spec).location) ==
-        new Path(new Path(tableLocation, "partCol1=3"), "partCol2=4"))
+        new Path(new Path(tableLocationPath, "partCol1=3"), "partCol2=4"))

     // For external tables, RENAME PARTITION should not update the partition location.
     val existingPartLoc = catalog.getPartition("db2", "tbl2", part1.spec).location
@@ -553,21 +555,21 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   test("alter partitions") {
     val catalog = newBasicCatalog()
     try {
-      val newLocation = newUriForDatabase()
+      val newLocationUri = new Path(newUriForDatabase()).toUri
       val newSerde = "com.sparkbricks.text.EasySerde"
       val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
       // alter but keep spec the same
       val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
       val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
       catalog.alterPartitions("db2", "tbl2", Seq(
-        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
-        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+        oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocationUri))),
+        oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocationUri)))))
       val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
       val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
-      assert(newPart1.storage.locationUri == Some(newLocation))
-      assert(newPart2.storage.locationUri == Some(newLocation))
-      assert(oldPart1.storage.locationUri != Some(newLocation))
-      assert(oldPart2.storage.locationUri != Some(newLocation))
+      assert(newPart1.storage.locationUri == Some(newLocationUri))
+      assert(newPart2.storage.locationUri == Some(newLocationUri))
+      assert(oldPart1.storage.locationUri != Some(newLocationUri))
+      assert(oldPart2.storage.locationUri != Some(newLocationUri))
       // alter other storage information
       catalog.alterPartitions("db2", "tbl2", Seq(
         oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))),

Contributor: keep the previous name.
@@ -699,7 +701,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   // File System operations
   // --------------------------------------------------------------------------

-  private def exists(uri: String, children: String*): Boolean = {
+  private def exists(uri: URI, children: String*): Boolean = {
     val base = new Path(uri)
     val finalPath = children.foldLeft(base) {
       case (parent, child) => new Path(parent, child)
@@ -742,7 +744,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("external_table", Some("db1")),
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
+        Some(Utils.createTempDir().toURI),
         None, None, None, false, Map.empty),
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some(defaultProvider)
@@ -790,7 +792,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val partWithExistingDir = CatalogTablePartition(
       Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)
@@ -799,7 +801,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val partWithNonExistingDir = CatalogTablePartition(
       Map("partCol1" -> "9", "partCol2" -> "10"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
     assert(tempPath.exists())
@@ -883,7 +885,7 @@ abstract class CatalogTestUtils {

   def newFunc(): CatalogFunction = newFunc("funcName")

-  def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
+  def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))

   def newDb(name: String): CatalogDatabase = {
     CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)

Contributor: we can simplify this and write Utils.createTempDir().toURI.

Contributor (author): Utils.createTempDir().toURI has a trailing '/', so we have to strip it here.

Contributor (author): @cloud-fan after trying to use Path for the comparison, I think stripSuffix is the simpler way here.

Two Paths built with the String constructor are equal even when one has a trailing / and the other does not:

scala> val x = new Path("/ab/c/")
x: org.apache.hadoop.fs.Path = /ab/c

scala> val y = new Path("/ab/c")
y: org.apache.hadoop.fs.Path = /ab/c

scala> x == y
res0: Boolean = true

Two Paths built with the URI constructor are not equal when one has a trailing / and the other does not:

scala> val x = new URI("/a/b/c/")
x: java.net.URI = /a/b/c/

scala> val y = new URI("/a/b/c")
y: java.net.URI = /a/b/c

scala> x == y
res1: Boolean = false

scala> val x1 = new Path(x)
x1: org.apache.hadoop.fs.Path = /a/b/c/

scala> val y1 = new Path(y)
y1: org.apache.hadoop.fs.Path = /a/b/c

scala> x1 == y1
res2: Boolean = false

So only a Path built from a String ignores the trailing /. If we have a URI in hand and want to compare it with another URI while ignoring the trailing /, we would first have to convert both to Strings and build Paths from those Strings before comparing. That is more complicated; normalizing the URI with stripSuffix at the source and then comparing the two URIs directly is simpler. Do we really have to convert to Path to compare?

Contributor: I think it's ok if we always compare URI with Path, instead of converting it to string.
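A minimal sketch of the comparison style suggested above: build Paths from the URIs' string forms, so a trailing slash on either side is normalized away (helper name is illustrative, not from this PR):

import java.net.URI
import org.apache.hadoop.fs.Path

// Equal even when exactly one side carries a trailing slash,
// because Path's String constructor strips it during normalization.
def sameLocation(a: URI, b: URI): Boolean =
  new Path(a.toString) == new Path(b.toString)

With such a helper, newUriForDatabase() would not need the stripSuffix normalization at all.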
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
+      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalog

+import java.net.URI
 import javax.annotation.Nullable

 import org.apache.spark.annotation.InterfaceStability

Contributor: unnecessary import?
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._

 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
+import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}

Contributor: we can remove this.
@@ -397,7 +398,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val tableDesc = CatalogTable(
       identifier = table,
       tableType = tableType,
-      storage = storage.copy(locationUri = customLocation),
+      storage = storage.copy(locationUri = customLocation.map{ loc =>
+        new Path(loc.toString).toUri}),
       schema = schema.getOrElse(new StructType),
       provider = Some(provider),
       partitionColumnNames = partitionColumnNames,

Contributor: nit:

storage.copy(
  locationUri = xxx)

Contributor: also add comments to explain why we don't create the URI directly.

Contributor: this pattern appears many times in this PR, how about we create a util method toURI in CatalogUtils and add comments there?
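A sketch of the proposed utility (placement in CatalogUtils and the method name follow the reviewer's suggestion, not code from this commit):

import java.net.URI
import org.apache.hadoop.fs.Path

object CatalogUtils {
  // Build the URI via Hadoop's Path instead of calling new URI(str) directly:
  // a raw location string such as "/tmp/my table" contains characters that
  // URI's single-argument constructor rejects, while Path escapes them safely.
  def toURI(str: String): URI = new Path(str).toUri
}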
@@ -1080,8 +1082,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
     }
+
+    val locUri = location.map{ loc => new Path(loc).toUri }
     val storage = CatalogStorageFormat(
-      locationUri = location,
+      locationUri = locUri,
       inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
       outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
       serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
@@ -1132,7 +1136,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
       // are empty Maps.
       val newTableDesc = tableDesc.copy(
-        storage = CatalogStorageFormat.empty.copy(locationUri = location),
+        storage = CatalogStorageFormat.empty.copy(locationUri = locUri),
         provider = Some(conf.defaultDataSourceName))
       CreateTable(newTableDesc, mode, Some(q))
     } else {
@@ -17,6 +17,10 @@

 package org.apache.spark.sql.execution.command

+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +58,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo

     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
-    val pathOption = table.storage.locationUri.map("path" -> _)
+    val pathOption = table.storage.locationUri.map("path" -> new Path(_).toString)
     // Fill in some default table options from the session conf
     val tableWithDefaultOptions = table.copy(
       identifier = table.identifier.copy(
@@ -146,7 +150,7 @@ case class CreateDataSourceTableAsSelectCommand(
     assert(table.schema.isEmpty)

     val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-      Some(sessionState.catalog.defaultTablePath(table.identifier))
+      Some(new Path(sessionState.catalog.defaultTablePath(table.identifier)).toUri)
     } else {
       table.storage.locationUri
     }

Contributor: the defaultTablePath should also return URI now.
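If defaultTablePath is changed to return a URI as suggested, it could look roughly like this (a sketch; formatDatabaseName, formatTableName, and getDatabaseMetadata are assumed SessionCatalog internals):

def defaultTablePath(tableIdent: TableIdentifier): URI = {
  val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
  val dbLocation = getDatabaseMetadata(dbName).locationUri
  new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
}

The call site above would then simplify back to Some(sessionState.catalog.defaultTablePath(table.identifier)).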
@@ -175,12 +179,12 @@ case class CreateDataSourceTableAsSelectCommand(
   private def saveDataIntoTable(
       session: SparkSession,
       table: CatalogTable,
-      tableLocation: Option[String],
+      tableLocation: Option[URI],
       data: LogicalPlan,
       mode: SaveMode,
       tableExists: Boolean): BaseRelation = {
     // Create the relation based on the input logical plan: `data`.
-    val pathOption = tableLocation.map("path" -> _)
+    val pathOption = tableLocation.map("path" -> new Path(_).toString)
     val dataSource = DataSource(
       session,
       className = table.provider.get,