@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -307,6 +308,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case object CommandStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil

case CreateTableUsing(tableName, provider, true, options) =>
ExecutedCommand(
CreateTempTableUsing(tableName, provider, options)) :: Nil

case CreateTableUsing(tableName, provider, false, options) =>
sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")

case logical.SetCommand(kv) =>
Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
case logical.ExplainCommand(logicalPlan, extended) =>
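For illustration, a hedged sketch of what these two cases mean for a plain SQLContext (the provider class name and path below are hypothetical, not part of this patch): the TEMPORARY form is planned as CreateTempTableUsing, while the non-temporary form fails with the sys.error above.

    // Hypothetical provider class and path, used only to show the two branches.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE episodes
        |USING com.example.sources.AvroProvider
        |OPTIONS (path "/tmp/episodes.avro")""".stripMargin)  // planned as CreateTempTableUsing

    // Without TEMPORARY, planning on a plain SQLContext stops with:
    //   "Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead."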
39 changes: 29 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -72,9 +72,9 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
*/
protected lazy val createTable: Parser[LogicalPlan] =
CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ provider ~ opts =>
CreateTableUsing(tableName, provider, opts)
(CREATE ~> TEMPORARY.? <~ TABLE) ~ ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case temp ~ tableName ~ provider ~ opts =>
CreateTableUsing(tableName, provider, temp.isDefined, opts)
}

protected lazy val options: Parser[Map[String, String]] =
@@ -85,12 +85,11 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) }
}

private[sql] case class CreateTableUsing(
tableName: String,
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
object ResolvedDataSource {
def apply(
sqlContext: SQLContext,
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch {
case cnf: java.lang.ClassNotFoundException =>
@@ -102,7 +101,27 @@ private[sql] case class CreateTableUsing(
val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
new ResolvedDataSource(clazz, relation)
}
}

private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
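A hedged sketch of how the extracted ResolvedDataSource helper is used (the provider class and options here are hypothetical): apply loads the provider class via the context/Spark class loader, asks it for a relation, and returns both wrapped together.

    // Hypothetical provider class and options.
    val resolved = ResolvedDataSource(
      sqlContext,
      "com.example.sources.AvroProvider",
      Map("path" -> "/tmp/episodes.avro"))

    // resolved.provider is the loaded Class[_]; resolved.relation is the BaseRelation
    // produced by the provider's createRelation and can be registered directly:
    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable("episodes")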

private[sql] case class CreateTableUsing(
tableName: String,
provider: String,
temporary: Boolean,
options: Map[String, String]) extends Command

private [sql] case class CreateTempTableUsing(
tableName: String,
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, provider, options)

sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
Seq.empty
}
}
@@ -116,6 +116,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

def refreshTable(tableName: String): Unit = {
// TODO: Database support...
catalog.refreshTable("default", tableName)
}

protected[hive] def invalidateTable(tableName: String): Unit = {
// TODO: Database support...
catalog.invalidateTable("default", tableName)
}
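A hedged usage note for the new hook (the table name is hypothetical, and hiveContext stands for an existing HiveContext instance): refreshTable forces the data source table cache added to HiveMetastoreCatalog later in this patch to re-resolve the relation, so queries pick up out-of-band changes to the underlying files.

    // After the files backing a persisted data source table change outside Spark:
    hiveContext.refreshTable("episodes")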

/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -340,8 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
DataSourceStrategy,
CommandStrategy,
HiveCommandStrategy(self),
CommandStrategy,
TakeOrdered,
ParquetOperations,
InMemoryScans,
@@ -20,15 +20,16 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.util.{List => JList}

import com.google.common.cache.{CacheLoader, CacheBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource, BaseRelation}

import scala.util.parsing.combinator.RegexParsers

import org.apache.hadoop.util.ReflectionUtils

import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, SerDeInfo, FieldSchema}
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
import org.apache.hadoop.hive.ql.plan.CreateTableDesc
import org.apache.hadoop.hive.serde.serdeConstants
@@ -55,8 +56,60 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)

// TODO: Use this everywhere instead of tuples or separate databaseName, tableName parameters.
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class TableIdent(database: String, name: String) {
def toLowerCase = TableIdent(database.toLowerCase, name.toLowerCase)
}

/** A cache of Spark SQL data source tables that have been accessed. */
protected[hive] val cachedDataSourceTables = CacheBuilder.newBuilder()
.maximumSize(1000)
.build(
new CacheLoader[TableIdent, LogicalPlan]() {
override def load(in: TableIdent): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
val table = client.getTable(in.database, in.name)

// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val method = classOf[Table].getDeclaredMethod("getSerdeInfo")
Review comment (Contributor):
This can be used to retrieve all SerDe properties:

    table.getTTable.getSd.getSerdeInfo.getParameters

Review comment (Contributor):
Also, I think SerDeInfo might not be the proper place to put external data source table options. Semantically, these options are more like general table properties, so using table.putProperty might be better.

@yhuai Are there notable differences between SerDe properties and general table properties in Hive? Or, more specifically, between properties saved in metastore.Table.getTTable.getParameters and those in metastore.Table.getTTable.getSd.getSerdeInfo.getParameters?

Review comment (Contributor Author):
My reasoning here was that table properties might have other things that could conflict with the options that a data source requires. It seems like SerDe properties are scoped to hold just the options that describe how the serialization library should read the data (which seems analogous to our data sources).

Review comment (Contributor):
Thanks for the explanation; this sounds good to me.

method.setAccessible(true)
val serdeInfo = method.invoke(table).asInstanceOf[SerDeInfo]

val resolvedRelation =
ResolvedDataSource(
hive,
table.getProperty("spark.sql.sources.provider"),
serdeInfo.getParameters.toMap)

LogicalRelation(resolvedRelation.relation)
}
})

def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(TableIdent(databaseName, tableName).toLowerCase)
}

def invalidateTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.invalidate(TableIdent(databaseName, tableName).toLowerCase)
}
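
For reference, a sketch of how this Guava LoadingCache behaves (the table name is hypothetical): the first access runs load() above, refresh re-runs it in place, and invalidate drops the entry so the next access reloads from the metastore.

    val ident = TableIdent("default", "episodes").toLowerCase
    val plan = cachedDataSourceTables(ident)  // first access triggers load(): metastore lookup + provider resolution
    cachedDataSourceTables.refresh(ident)     // eagerly reloads the entry (what refreshTable uses)
    cachedDataSourceTables.invalidate(ident)  // drops the entry (what invalidateTable uses)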

val caseSensitive: Boolean = false

def createDataSourceTable(tableName: String, provider: String, options: Map[String, String]) = {
val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
val tbl = new Table(dbName, tblName)

tbl.setProperty("spark.sql.sources.provider", provider)
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
Review comment (Contributor):
We are using SerDe properties to store all parameters that will be passed to a relation provider (for creating a relation), right? Probably we can add a comment here.


// create the table
synchronized {
client.createTable(tbl, false)
Review comment (Contributor):
Should we mark tables from external data sources as external tables? We can do this with:

    tbl.putToParameters("EXTERNAL", "TRUE")
    tbl.setTableType(TableType.EXTERNAL_TABLE.toString())

Review comment (Contributor Author):
Yeah, that's a good idea, at least for the types of tables you can create now. We should think about options here and whether we want to support non-external tables.

Also, I assume you meant something like this?

    tbl.setProperty("EXTERNAL", "TRUE")
    tbl.setTableType(TableType.EXTERNAL_TABLE)

Review comment (Contributor):
Oh yes. I was reading part of the HCatalog code, which actually manipulates metastore.api.Table, whose setTableType method accepts a string.

}
}
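
A hedged end-to-end sketch of the metadata round trip (provider class and options hypothetical): the provider name is written as a table property, the options as SerDe parameters, and the CacheLoader above rebuilds a LogicalRelation from them on lookup.

    // Persist the definition in the Hive metastore:
    catalog.createDataSourceTable(
      "episodes",
      "com.example.sources.AvroProvider",
      Map("path" -> "/tmp/episodes.avro"))

    // A later lookupRelation sees the "spark.sql.sources.provider" table property and
    // resolves the table through cachedDataSourceTables rather than the Hive code path.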

def tableExists(db: Option[String], tableName: String): Boolean = {
val (databaseName, tblName) = processDatabaseAndTableName(
db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
@@ -70,7 +123,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val (databaseName, tblName) =
processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
val table = client.getTable(databaseName, tblName)
if (table.isView) {

if (table.getProperty("spark.sql.sources.provider") != null) {
cachedDataSourceTables(TableIdent(databaseName, tblName).toLowerCase)
} else if (table.isView) {
// if the unresolved relation is from hive view
// parse the text into logic node.
HiveQl.createPlanForView(table, alias)
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.CreateTableUsing
import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

import scala.collection.JavaConversions._
@@ -219,6 +220,10 @@ private[hive] trait HiveStrategies {
ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
}

case CreateTableUsing(tableName, provider, false, options) =>
ExecutedCommand(
CreateMetastoreDataSource(tableName, provider, options)) :: Nil

case _ => Nil
}
}
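
Complementing the SQLContext-only restriction earlier in this diff, a hedged usage sketch for the HiveContext path (provider class and path hypothetical): without TEMPORARY the plan is executed as CreateMetastoreDataSource, so the definition survives across sessions.

    hiveContext.sql(
      """CREATE TABLE episodes
        |USING com.example.sources.AvroProvider
        |OPTIONS (path "/tmp/episodes.avro")""".stripMargin)  // persisted via catalog.createDataSourceTable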
@@ -394,6 +394,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

clearCache()
loadedTables.clear()
catalog.cachedDataSourceTables.invalidateAll()
catalog.client.getAllTables("default").foreach { t =>
logDebug(s"Deleting table $t")
val table = catalog.client.getTable("default", t)
@@ -52,6 +52,10 @@ case class DropTable(
override def run(sqlContext: SQLContext) = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
val ifExistsClause = if (ifExists) "IF EXISTS " else ""
try hiveContext.tryUncacheQuery(hiveContext.table(tableName)) catch {
case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
hiveContext.catalog.unregisterTable(None, tableName)
Seq.empty[Row]
@@ -85,3 +89,16 @@ case class AddFile(path: String) extends RunnableCommand {
Seq.empty[Row]
}
}

case class CreateMetastoreDataSource(
tableName: String,
provider: String,
options: Map[String, String]) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
hiveContext.catalog.createDataSourceTable(tableName, provider, options)

Seq.empty[Row]
}
}