@@ -18,31 +18,37 @@
package org.apache.spark.sql.json

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.sources._

private[sql] class DefaultSource extends RelationProvider {
private[sql] class DefaultSource extends SchemaRelationProvider {
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
parameters: Map[String, String],
schema: Option[StructType]): BaseRelation = {
Contributor

We cannot change the function signature; otherwise we will break existing libraries. Instead, I think we need to create a new interface, maybe SchemaRelationProvider?

Contributor Author

Or use a default value for the schema: `schema: Option[StructType] = None`?

Contributor

Default values do not preserve binary compatibility, only source compatibility.

val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio)(sqlContext)
JSONRelation(fileName, samplingRatio, schema)(sqlContext)
}
}

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
private[sql] case class JSONRelation(
fileName: String,
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends TableScan {

private def baseRDD = sqlContext.sparkContext.textFile(fileName)

override val schema =
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)
override val schema = userSpecifiedSchema.getOrElse(
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)))

override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
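Taken together, the JSON changes above mean that a schema declared in the DDL flows into `JSONRelation` as `userSpecifiedSchema` and sampling-based inference is skipped; without a column list, inference runs exactly as before. A minimal usage sketch against this PR's grammar (the table name, path, and local `SparkContext` setup are illustrative only):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[2]", "schema-ddl-example")
val sqlContext = new SQLContext(sc)

// Explicit column list: userSpecifiedSchema is Some(StructType(...)), so
// JsonRDD.inferSchema is never called for this table.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE people (name STRING, age INT)
    |USING org.apache.spark.sql.json
    |OPTIONS (path "examples/src/main/resources/people.json")
  """.stripMargin)

sqlContext.sql("SELECT name, age FROM people").collect().foreach(println)
```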
@@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate

import parquet.hadoop.ParquetInputFormat
import parquet.hadoop.util.ContextUtil

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.{Partition => SparkPartition, Logging}
import org.apache.spark.rdd.{NewHadoopPartition, RDD}

import org.apache.spark.sql.{SQLConf, Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.{SQLConf, SQLContext}

import scala.collection.JavaConversions._


/**
* Allows creation of parquet based tables using the syntax
* `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
* required is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
class DefaultSource extends RelationProvider {
class DefaultSource extends SchemaRelationProvider {
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
parameters: Map[String, String],
schema: Option[StructType]): BaseRelation = {
val path =
parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))

ParquetRelation2(path)(sqlContext)
ParquetRelation2(path, schema)(sqlContext)
}
}
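The Parquet `DefaultSource` gets the same treatment, but the schema argument stays optional: path-only DDL keeps working and `ParquetRelation2` falls back to the footer-derived schema (see `userSpecifiedSchema.getOrElse` in the next hunk), while an explicit column list now takes precedence. A brief sketch, reusing the `sqlContext` from the JSON example above (the path is hypothetical):

```scala
// Path-only form: the schema Option is None, so it is read from the Parquet footers.
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE events
    |USING org.apache.spark.sql.parquet
    |OPTIONS (path "/data/warehouse/events.parquet")
  """.stripMargin)
```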

@@ -82,7 +82,9 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
* discovery.
*/
@DeveloperApi
case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
case class ParquetRelation2(
path: String,
userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext)
extends CatalystScan with Logging {

def sparkContext = sqlContext.sparkContext
@@ -133,12 +135,13 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)

override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum

val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
ParquetTypesConverter.readSchemaFromFile(
partitions.head.files.head.getPath,
Some(sparkContext.hadoopConfiguration),
sqlContext.isParquetBinaryAsString))

val dataSchema = userSpecifiedSchema.getOrElse(
StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
ParquetTypesConverter.readSchemaFromFile(
partitions.head.files.head.getPath,
Some(sparkContext.hadoopConfiguration),
sqlContext.isParquetBinaryAsString))
)
val dataIncludesKey =
partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)

125 changes: 112 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -17,16 +17,15 @@

package org.apache.spark.sql.sources

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils

import scala.language.implicitConversions
import scala.util.parsing.combinator.lexical.StdLexical
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical

@@ -44,6 +43,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
}
}

def parseType(input: String): DataType = {
phrase(dataType)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x =>
sys.error(s"Unsupported dataType: $x")
}
}

protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
@@ -55,6 +62,24 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")

// Data types.
protected val STRING = Keyword("STRING")
protected val BINARY = Keyword("BINARY")
protected val BOOLEAN = Keyword("BOOLEAN")
protected val TINYINT = Keyword("TINYINT")
protected val SMALLINT = Keyword("SMALLINT")
protected val INT = Keyword("INT")
protected val BIGINT = Keyword("BIGINT")
protected val FLOAT = Keyword("FLOAT")
protected val DOUBLE = Keyword("DOUBLE")
protected val DECIMAL = Keyword("DECIMAL")
protected val DATE = Keyword("DATE")
protected val TIMESTAMP = Keyword("TIMESTAMP")
protected val VARCHAR = Keyword("VARCHAR")
protected val ARRAY = Keyword("ARRAY")
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")

// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
this.getClass
@@ -67,26 +92,92 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* CREATE TEMPORARY TABLE avroTable
* `CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ provider ~ opts =>
CreateTableUsing(tableName, provider, opts)
(
CREATE ~ TEMPORARY ~ TABLE ~> ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
}
)

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}

protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) }

protected lazy val column: Parser[StructField] =
ident ~ dataType ^^ { case columnName ~ typ =>
StructField(columnName, typ)
}

protected lazy val primitiveType: Parser[DataType] =
STRING ^^^ StringType |
BINARY ^^^ BinaryType |
BOOLEAN ^^^ BooleanType |
TINYINT ^^^ ByteType |
SMALLINT ^^^ ShortType |
INT ^^^ IntegerType |
BIGINT ^^^ LongType |
FLOAT ^^^ FloatType |
DOUBLE ^^^ DoubleType |
fixedDecimalType | // decimal with precision/scale
DECIMAL ^^^ DecimalType.Unlimited | // decimal with no precision/scale
DATE ^^^ DateType |
TIMESTAMP ^^^ TimestampType |
VARCHAR ~ "(" ~ numericLit ~ ")" ^^^ StringType

protected lazy val fixedDecimalType: Parser[DataType] =
(DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ {
case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
}

protected lazy val arrayType: Parser[DataType] =
ARRAY ~> "<" ~> dataType <~ ">" ^^ {
case tpe => ArrayType(tpe)
}

protected lazy val mapType: Parser[DataType] =
MAP ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
ident ~ ":" ~ dataType ^^ {
case fieldName ~ _ ~ tpe => StructField(fieldName, tpe, nullable = true)
}

protected lazy val structType: Parser[DataType] =
(STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
case fields => new StructType(fields)
}) |
(STRUCT ~> "<>" ^^ {
case fields => new StructType(Nil)
})

private[sql] lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType
}
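Since `dataType` is recursive through `arrayType`, `mapType`, and `structType`, the column list of a CREATE TEMPORARY TABLE statement can mix primitives with nested types. A sketch of a statement this grammar is intended to accept, again via the `sqlContext` from the earlier example (table name, source, and path are illustrative):

```scala
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE nestedTable (
    |  id BIGINT,
    |  price DECIMAL(10, 2),
    |  tags ARRAY<STRING>,
    |  attributes MAP<STRING, INT>,
    |  address STRUCT<city: STRING, zip: INT>
    |)
    |USING org.apache.spark.sql.json
    |OPTIONS (path "nested.json")
  """.stripMargin)
```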

private[sql] case class CreateTableUsing(
Contributor

It would be nice to move this class CreateTableUsing to org.apache.spark.sql.sources.commands.scala.

Contributor Author

OK, I will move it.

tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

@@ -99,8 +190,16 @@ private[sql] case class CreateTableUsing(
sys.error(s"Failed to load class for data source: $provider")
}
}
val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
val relation = clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema)
}

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
Seq.empty
@@ -18,7 +18,7 @@ package org.apache.spark.sql.sources

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType}
import org.apache.spark.sql.{Row, SQLContext, StructType}
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}

/**
@@ -44,6 +44,33 @@ trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. This is
* used when Spark SQL is given a DDL operation with:
* 1. a USING clause, to specify the implementing SchemaRelationProvider, and
* 2. a user-defined schema, which users can optionally declare when creating a table.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class will be instantiated each time a DDL call is made.
*/
@DeveloperApi
trait SchemaRelationProvider {
/**
* Returns a new base relation with the given parameters and user defined schema.
* Note: the parameters' keywords are case insensitive and this insensitivity is enforced
* by the Map that is passed to the function.
*/
def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: Option[StructType]): BaseRelation
Contributor

Why is this an Option? We have two traits, and Option is not very friendly to Java.

Contributor Author

My initial idea was to stay compatible with the old trait; since we will have two traits, I will fix this.

}
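For third-party sources, implementing the new trait looks like implementing `RelationProvider` with one extra argument. A minimal sketch with entirely hypothetical class names, using a trivial single-column fallback in place of real schema inference:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, SchemaRelationProvider, TableScan}

// Hypothetical line-oriented text source; only the sources API itself is real.
class ExampleSource extends SchemaRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: Option[StructType]): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    ExampleRelation(path, schema)(sqlContext)
  }
}

case class ExampleRelation(
    path: String,
    userSpecifiedSchema: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  // Honor the schema declared in the DDL; otherwise fall back to one string column.
  override val schema: StructType =
    userSpecifiedSchema.getOrElse(StructType(StructField("value", StringType) :: Nil))

  // Naive scan: one Row per input line, regardless of how many columns were declared.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(line => Row(line))
}
```

A source that never needs a user-declared schema keeps implementing plain `RelationProvider`; the match in `CreateTableUsing` above dispatches on whichever trait the loaded class implements.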

/**
* ::DeveloperApi::
* Represents a collection of tuples with a known schema. Classes that extend BaseRelation must