Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,23 @@ class SqlParser extends AbstractSparkSQLParser {
joinedRelation | relationFactor

protected lazy val relationFactor: Parser[LogicalPlan] =
( ident ~ (opt(AS) ~> opt(ident)) ^^ {
case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
(
ident ~ ("." ~> ident) ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
case reserveName1 ~ reserveName2 ~ dbName ~ tableName ~ alias =>
UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName2, reserveName1), alias)
}
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
| ident ~ ("." ~> ident) ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
case reserveName1 ~ dbName ~ tableName ~ alias =>
UnresolvedRelation(IndexedSeq(tableName, dbName, reserveName1), alias)
}
| ident ~ ("." ~> ident) ~ (opt(AS) ~> opt(ident)) ^^ {
case dbName ~ tableName ~ alias =>
UnresolvedRelation(IndexedSeq(tableName, dbName), alias)
}
| ident ~ (opt(AS) ~> opt(ident)) ^^ {
case tableName ~ alias => UnresolvedRelation(IndexedSeq(tableName), alias)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably just be a single rule, with something like repsep(ident, ",") for the table identifier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to rep1sep(ident, ".").

}
| ("(" ~> start <~ ")") ~ (AS.? ~> ident) ^^ { case s ~ a => Subquery(a, s) }
)

protected lazy val joinedRelation: Parser[LogicalPlan] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,11 @@ class Analyzer(catalog: Catalog,
*/
object ResolveRelations extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(UnresolvedRelation(databaseName, name, alias), _, _, _) =>
case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) =>
i.copy(
table = EliminateAnalysisOperators(catalog.lookupRelation(databaseName, name, alias)))
case UnresolvedRelation(databaseName, name, alias) =>
catalog.lookupRelation(databaseName, name, alias)
table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias)))
case UnresolvedRelation(tableIdentifier, alias) =>
catalog.lookupRelation(tableIdentifier, alias)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,77 +28,63 @@ trait Catalog {

def caseSensitive: Boolean

def tableExists(db: Option[String], tableName: String): Boolean
def tableExists(tableIdentifier: Seq[String]): Boolean

def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent wrapped arguments 4 spaces.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed


def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit

def unregisterTable(databaseName: Option[String], tableName: String): Unit
def unregisterTable(tableIdentifier: Seq[String]): Unit

def unregisterAllTables(): Unit

protected def processDatabaseAndTableName(
databaseName: Option[String],
tableName: String): (Option[String], String) = {
protected def processTableIdentifier(tableIdentifier: Seq[String]):
Seq[String] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be wrapped, does it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

if (!caseSensitive) {
(databaseName.map(_.toLowerCase), tableName.toLowerCase)
tableIdentifier.map(_.toLowerCase)
} else {
(databaseName, tableName)
tableIdentifier
}
}

protected def processDatabaseAndTableName(
databaseName: String,
tableName: String): (String, String) = {
if (!caseSensitive) {
(databaseName.toLowerCase, tableName.toLowerCase)
} else {
(databaseName, tableName)
}
}
}

class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()

override def registerTable(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
tables += ((tblName, plan))
val tableIdent = processTableIdentifier(tableIdentifier)
tables += ((tableIdent.mkString("."), plan))
}

override def unregisterTable(
databaseName: Option[String],
tableName: String) = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
tables -= tblName
override def unregisterTable(tableIdentifier: Seq[String]) = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables -= tableIdent.mkString(".")
}

override def unregisterAllTables() = {
tables.clear()
}

override def tableExists(db: Option[String], tableName: String): Boolean = {
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
tables.get(tblName) match {
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables.get(tableIdent.mkString(".")) match {
case Some(_) => true
case None => false
}
}

override def lookupRelation(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
val tableWithQualifiers = Subquery(tblName, table)
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = tableIdent.mkString(".")
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
val tableWithQualifiers = Subquery(tableIdent.head, table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableIdent.head? Should it be last?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I store the seq in reversed order, so the table name is at the head.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry that was not obvious to me, and seems pretty confusing. Why not store it in the order the user gave it to you?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the reversed order, we know the database name will be the second element, so it's easy to access it using lift(1).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That does not seem worth the confusion of having it in reverse order. Also, looking at that code more closely, we would silently discard any extra parts of the table identifier, right? Seems like we should explicitly handle the cases where there are 1, 2, and more elements in the table identifier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed


// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
Expand All @@ -115,43 +101,41 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
trait OverrideCatalog extends Catalog {

// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
val overrides = new mutable.HashMap[String, LogicalPlan]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why collapse this to a String? What about a table with a name like the following:

SELECT * FROM `database`.`table.name`
SELECT * FROM `database.table`.`name`

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will restore it to (Option[String], String).


abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
overrides.get((dbName, tblName)) match {
abstract override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
overrides.get(tableIdent) match {
case Some(_) => true
case None => super.tableExists(db, tableName)
case None => super.tableExists(tableIdentifier)
}
}

abstract override def lookupRelation(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
val overriddenTable = overrides.get((dbName, tblName))
val tableWithQualifers = overriddenTable.map(r => Subquery(tblName, r))
val tableIdent = processTableIdentifier(tableIdentifier)
val overriddenTable = overrides.get(tableIdent.mkString("."))
val tableWithQualifers = overriddenTable.map(r => Subquery(tableIdent.head, r))

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
// properly qualified with this alias.
val withAlias =
tableWithQualifers.map(r => alias.map(a => Subquery(a, r)).getOrElse(r))

withAlias.getOrElse(super.lookupRelation(dbName, tblName, alias))
withAlias.getOrElse(super.lookupRelation(tableIdentifier, alias))
}

override def registerTable(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
overrides.put((dbName, tblName), plan)
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
overrides.put(tableIdent, plan)
}

override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
overrides.remove((dbName, tblName))
override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier).mkString(".")
overrides.remove(tableIdent)
}

override def unregisterAllTables(): Unit = {
Expand All @@ -167,22 +151,21 @@ object EmptyCatalog extends Catalog {

val caseSensitive: Boolean = true

def tableExists(db: Option[String], tableName: String): Boolean = {
def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
}

def lookupRelation(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
alias: Option[String] = None) = {
throw new UnsupportedOperationException
}

def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit = {
throw new UnsupportedOperationException
}

def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
def unregisterTable(tableIdentifier: Seq[String]): Unit = {
throw new UnsupportedOperationException
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
*/
case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
tableIdentifier: Seq[String],
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ package object dsl {

def insertInto(tableName: String, overwrite: Boolean = false) =
InsertIntoTable(
analysis.UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)
analysis.UnresolvedRelation(IndexedSeq(tableName)), Map.empty, logicalPlan, overwrite)

def analyze = analysis.SimpleAnalyzer(logicalPlan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
AttributeReference("e", ShortType)())

before {
caseSensitiveCatalog.registerTable(None, "TaBlE", testRelation)
caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
caseSensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
caseInsensitiveCatalog.registerTable(IndexedSeq("TaBlE"), testRelation)
}

test("union project *") {
Expand All @@ -64,45 +64,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
assert(
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))

val e = intercept[TreeNodeException[_]] {
caseSensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
UnresolvedRelation(None, "TaBlE", Some("TbL"))))
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL"))))
}
assert(e.getMessage().toLowerCase.contains("unresolved"))

assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("TbL.a")),
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))

assert(
caseInsensitiveAnalyze(
Project(Seq(UnresolvedAttribute("tBl.a")),
UnresolvedRelation(None, "TaBlE", Some("TbL")))) ===
UnresolvedRelation(IndexedSeq("TaBlE"), Some("TbL")))) ===
Project(testRelation.output, testRelation))
}

test("resolve relations") {
val e = intercept[RuntimeException] {
caseSensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None))
caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None))
}
assert(e.getMessage == "Table Not Found: tAbLe")

assert(
caseSensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
caseSensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
testRelation)

assert(
caseInsensitiveAnalyze(UnresolvedRelation(None, "tAbLe", None)) ===
caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("tAbLe"), None)) ===
testRelation)

assert(
caseInsensitiveAnalyze(UnresolvedRelation(None, "TaBlE", None)) ===
caseInsensitiveAnalyze(UnresolvedRelation(IndexedSeq("TaBlE"), None)) ===
testRelation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
val f: Expression = UnresolvedAttribute("f")

before {
catalog.registerTable(None, "table", relation)
catalog.registerTable(IndexedSeq("table"), relation)
}

private def checkType(expression: Expression, expectedType: DataType): Unit = {
Expand Down
6 changes: 3 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
catalog.registerTable(None, tableName, rdd.queryExecution.logical)
catalog.registerTable(IndexedSeq(tableName), rdd.queryExecution.logical)
}

/**
Expand All @@ -289,7 +289,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def dropTempTable(tableName: String): Unit = {
tryUncacheQuery(table(tableName))
catalog.unregisterTable(None, tableName)
catalog.unregisterTable(IndexedSeq(tableName))
}

/**
Expand All @@ -308,7 +308,7 @@ class SQLContext(@transient val sparkContext: SparkContext)

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
new SchemaRDD(this, catalog.lookupRelation(IndexedSeq(tableName)))

/**
* :: DeveloperApi ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private[sql] trait SchemaRDDLike {
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit =
sqlContext.executePlan(
InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(IndexedSeq(tableName)),
Map.empty, logicalPlan, overwrite)).toRdd

/**
* :: Experimental ::
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
upperCaseData.where('N <= 4).registerTempTable("left")
upperCaseData.where('N >= 3).registerTempTable("right")

val left = UnresolvedRelation(None, "left", None)
val right = UnresolvedRelation(None, "right", None)
val left = UnresolvedRelation(IndexedSeq("left"), None)
val right = UnresolvedRelation(IndexedSeq("right"), None)

checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* in the Hive metastore.
*/
def analyze(tableName: String) {
val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
val relation = EliminateAnalysisOperators(catalog.lookupRelation(IndexedSeq(tableName)))

relation match {
case relation: MetastoreRelation =>
Expand Down
Loading