Commits (29 total; the diff below shows the changes from 6 of them)
c194d5e  add metadata field to StructField and Attribute  (mengxr, Sep 17, 2014)
367d237  add test  (mengxr, Oct 7, 2014)
d65072e  remove Map.empty  (mengxr, Oct 7, 2014)
67fdebb  add test on join  (mengxr, Oct 7, 2014)
d8af0ed  move tests to SQLQuerySuite  (mengxr, Oct 8, 2014)
c41a664  merge master  (mengxr, Oct 8, 2014)
7e5a322  do not output metadata in StructField.toString  (mengxr, Oct 8, 2014)
61b8e0f  merge master  (mengxr, Oct 9, 2014)
618e349  make tests work in scala  (mengxr, Oct 9, 2014)
905bb89  java conversions  (mengxr, Oct 9, 2014)
93518fb  support metadata in python  (mengxr, Oct 9, 2014)
e42c452  merge master  (mengxr, Oct 13, 2014)
60614c7  add metadata  (mengxr, Oct 14, 2014)
60cc131  add doc and header  (mengxr, Oct 14, 2014)
1fcbf13  change metadata type in StructField for Scala/Java  (mengxr, Oct 14, 2014)
c9d7301  organize imports  (mengxr, Oct 14, 2014)
473a7c5  merge master  (mengxr, Oct 14, 2014)
24a9f80  Merge remote-tracking branch 'apache/master' into structfield-metadata  (mengxr, Oct 15, 2014)
3f49aab  remove StructField.toString  (mengxr, Oct 15, 2014)
4266f4d  add StructField.toString back for backward compatibility  (mengxr, Oct 15, 2014)
a438440  Merge remote-tracking branch 'apache/master' into structfield-metadata  (mengxr, Oct 16, 2014)
ddfcfad  Merge remote-tracking branch 'apache/master' into structfield-metadata  (mengxr, Oct 21, 2014)
611d3c2  move metadata from Expr to NamedExpr  (mengxr, Oct 21, 2014)
1e2abcf  change default value of metadata to None in python  (mengxr, Oct 23, 2014)
589f314  Merge remote-tracking branch 'apache/master' into structfield-metadata  (mengxr, Oct 23, 2014)
886b85c  Expose Metadata and MetadataBuilder through the public scala and java…  (marmbrus, Oct 29, 2014)
c35203f  Merge pull request #1 from marmbrus/pr/2701  (mengxr, Oct 30, 2014)
5ef930a  Merge remote-tracking branch 'apache/master' into structfield-metadata  (mengxr, Nov 1, 2014)
dedda56  merge remote  (mengxr, Nov 1, 2014)
```diff
@@ -43,7 +43,7 @@ object ScalaReflection {
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) =>
-      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
   }
 
   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
```
```diff
@@ -43,6 +43,9 @@ abstract class Expression extends TreeNode[Expression] {
   def nullable: Boolean
   def references: AttributeSet = AttributeSet(children.flatMap(_.references.iterator))
 
+  /** Returns the metadata when an expression is a reference to another expression with metadata. */
+  def metadata: Map[String, Any] = Map.empty
+
   /** Returns the result of evaluating this expression on a given input Row */
   def eval(input: Row = null): EvaluatedType
 
```
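Every expression now exposes `metadata`, defaulting to an empty map; only references to something that carries metadata override it. A minimal sketch of that contract (the object name is illustrative, and the imports assume the Spark 1.1-era catalyst package layout):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Literal}
import org.apache.spark.sql.catalyst.types.IntegerType

object MetadataContractSketch extends App {
  val age = AttributeReference("age", IntegerType, nullable = true,
    metadata = Map("doc" -> "age in years"))()

  // A direct reference carries its metadata.
  assert(age.metadata == Map("doc" -> "age in years"))

  // A derived expression modifies the column's content, so it falls back to
  // the default Expression.metadata, i.e. Map.empty.
  assert(Add(age, Literal(1)).metadata == Map.empty)
}
```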
```diff
@@ -43,7 +43,7 @@ abstract class Generator extends Expression {
   override type EvaluatedType = TraversableOnce[Row]
 
   override lazy val dataType =
-    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))
 
   override def nullable = false
 
```
```diff
@@ -86,10 +86,11 @@ case class Alias(child: Expression, name: String)
 
   override def dataType = child.dataType
   override def nullable = child.nullable
+  override def metadata: Map[String, Any] = child.metadata
 
   override def toAttribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+      AttributeReference(name, child.dataType, child.nullable, child.metadata)(exprId, qualifiers)
     } else {
       UnresolvedAttribute(name)
     }
@@ -106,15 +107,20 @@ case class Alias(child: Expression, name: String)
  * @param name The name of this attribute, should only be used during analysis or for debugging.
  * @param dataType The [[DataType]] of this attribute.
  * @param nullable True if null is a valid value for this attribute.
+ * @param metadata The metadata of this attribute.
  * @param exprId A globally unique id used to check if different AttributeReferences refer to the
  *               same attribute.
  * @param qualifiers a list of strings that can be used to referred to this attribute in a fully
  *                   qualified way. Consider the examples tableName.name, subQueryAlias.name.
  *                   tableName and subQueryAlias are possible qualifiers.
  */
-case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
-    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
-  extends Attribute with trees.LeafNode[Expression] {
+case class AttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Map[String, Any] = Map.empty)(
+    val exprId: ExprId = NamedExpression.newExprId,
+    val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
   override def references = AttributeSet(this :: Nil)
 
@@ -131,7 +137,8 @@ case class AttributeReference
     h
   }
 
-  override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance() =
+    AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)
 
   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -140,7 +147,7 @@ case class AttributeReference
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
     }
   }
 
@@ -159,7 +166,7 @@ case class AttributeReference
     if (newQualifiers == qualifiers) {
      this
     } else {
-      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
    }
  }
 
```
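Because `Alias` forwards `child.metadata` and bakes it into the `AttributeReference` it produces, renaming a column no longer drops its metadata. A hedged sketch of that behavior (same package-layout assumption as above):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.types.StringType

object AliasMetadataSketch extends App {
  val name = AttributeReference("name", StringType, nullable = true,
    metadata = Map("doc" -> "first name"))()

  // Renaming wraps the reference in an Alias; both the alias and the
  // attribute it resolves back to report the child's metadata.
  val renamed = Alias(name, "firstName")()
  assert(renamed.metadata == Map("doc" -> "first name"))
  assert(renamed.toAttribute.metadata == Map("doc" -> "first name"))
}
```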
```diff
@@ -58,7 +58,8 @@ object DataType extends RegexParsers {
   protected lazy val structField: Parser[StructField] =
     ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
       case name ~ tpe ~ nullable =>
-        StructField(name, tpe, nullable = nullable)
+        // TODO: parse metadata
+        StructField(name, tpe, nullable = nullable, Map.empty)
     }
 
   protected lazy val boolVal: Parser[Boolean] =
@@ -321,8 +322,15 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
  * @param name The name of this field.
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field, which is a map from string to simple type that can be
+ *                 serialized to JSON automatically. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g., in selection.
  */
-case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    metadata: Map[String, Any] = Map.empty) {
 
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
@@ -332,7 +340,7 @@ case class StructField
 
 object StructType {
   protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
-    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 }
 
 case class StructType(fields: Seq[StructField]) extends DataType {
@@ -348,8 +356,8 @@ case class StructType(fields: Seq[StructField]) extends DataType {
    * have a name matching the given name, `null` will be returned.
    */
   def apply(name: String): StructField = {
-    nameToField.get(name).getOrElse(
-      throw new IllegalArgumentException(s"Field ${name} does not exist."))
+    nameToField.getOrElse(name,
+      throw new IllegalArgumentException(s"Field $name does not exist."))
   }
 
   /**
```

A reviewer (Contributor) commented on the `nameToField.getOrElse` change: Nit: no need to wrap

```diff
@@ -367,7 +375,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
   }
 
   protected[sql] def toAttributes =
-    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
 
   def treeString: String = {
     val builder = new StringBuilder
```
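At this stage of the PR, `metadata` is a plain `Map[String, Any]` with an empty default, so every existing `StructField(...)` call site keeps compiling (later commits in this PR replace the map with a `Metadata` class). A minimal sketch of tagging and reading back a field (same package-layout assumption as above):

```scala
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}

object FieldMetadataSketch extends App {
  // Keys map to simple values so the metadata can be serialized to JSON.
  val nameField = StructField("name", StringType, nullable = true,
    metadata = Map("doc" -> "first name"))

  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    nameField))

  // Lookup by field name returns the field together with its metadata.
  assert(schema("name").metadata("doc") == "first name")
}
```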
sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala (4 additions, 9 deletions)

```diff
@@ -112,18 +112,15 @@ private[sql] object JsonRDD extends Logging {
         }
     }.flatMap(field => field).toSeq
 
-    StructType(
-      (topLevelFields ++ structFields).sortBy {
-        case StructField(name, _, _) => name
-      })
+    StructType((topLevelFields ++ structFields).sortBy(_.name))
   }
 
     makeStruct(resolved.keySet.toSeq, Nil)
   }
 
   private[sql] def nullTypeToStringType(struct: StructType): StructType = {
     val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable) => {
+      case StructField(fieldName, dataType, nullable, _) => {
         val newType = dataType match {
           case NullType => StringType
           case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
@@ -158,9 +155,7 @@ private[sql] object JsonRDD extends Logging {
         StructField(name, dataType, true)
       }
     }
-    StructType(newFields.toSeq.sortBy {
-      case StructField(name, _, _) => name
-    })
+    StructType(newFields.toSeq.sortBy(_.name))
   }
   case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
     ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
@@ -385,7 +380,7 @@ private[sql] object JsonRDD extends Logging {
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _), i) =>
+      case (StructField(name, dataType, _, _), i) =>
         row.update(i, json.get(name).flatMap(v => Option(v)).map(
           enforceCorrectType(_, dataType)).orNull)
     }
```
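The `sortBy` rewrites above are not purely cosmetic: adding a fourth field to `StructField` breaks every three-hole constructor pattern, while an accessor-based sort is immune to future arity changes. A small standalone illustration (field values are hypothetical):

```scala
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField}

object SortByNameSketch extends App {
  val fields = Seq(
    StructField("b", StringType, nullable = true),
    StructField("a", IntegerType, nullable = false))

  // A positional pattern must now name all four fields to compile...
  val byPattern = fields.sortBy { case StructField(name, _, _, _) => name }
  // ...whereas an accessor keeps working no matter how many fields are added.
  val byAccessor = fields.sortBy(_.name)

  assert(byPattern == byAccessor)
}
```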
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (23 additions, 1 deletion)

```diff
@@ -680,9 +680,31 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql("SELECT CAST(TRUE AS STRING), CAST(FALSE AS STRING) FROM testData LIMIT 1"),
       ("true", "false") :: Nil)
   }
 
+  test("metadata is propagated correctly") {
+    val person = sql("SELECT * FROM person")
+    val schema = person.schema
+    val docKey = "doc"
+    val docValue = "first name"
+    val schemaWithMeta = new StructType(Seq(
+      schema("id"), schema("name").copy(metadata = Map(docKey -> docValue)), schema("age")))
+    val personWithMeta = applySchema(person, schemaWithMeta)
+    def validateMetadata(rdd: SchemaRDD): Unit = {
+      assert(rdd.schema("name").metadata(docKey) === docValue)
+    }
+    personWithMeta.registerTempTable("personWithMeta")
+    validateMetadata(personWithMeta.select('name))
+    validateMetadata(personWithMeta.select("name".attr))
+    validateMetadata(personWithMeta.select('id, 'name))
+    validateMetadata(sql("SELECT * FROM personWithMeta"))
+    validateMetadata(sql("SELECT id, name FROM personWithMeta"))
+    validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId"))
+    validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId"))
+  }
+
   test("SPARK-3371 Renaming a function expression with group by gives error") {
     registerFunction("len", (s: String) => s.length)
     checkAnswer(
-      sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), 1)}
+      sql("SELECT len(value) as temp FROM testData WHERE key = 1 group by len(value)"), 1)
+  }
 }
```
sql/core/src/test/scala/org/apache/spark/sql/TestData.scala (11 additions)

```diff
@@ -166,4 +166,15 @@ object TestData {
   // An RDD with 4 elements and 8 partitions
   val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
   withEmptyParts.registerTempTable("withEmptyParts")
+
+  case class Person(id: Int, name: String, age: Int)
+  case class Salary(personId: Int, salary: Double)
+  val person = TestSQLContext.sparkContext.parallelize(
+    Person(0, "mike", 30) ::
+    Person(1, "jim", 20) :: Nil)
+  person.registerTempTable("person")
+  val salary = TestSQLContext.sparkContext.parallelize(
+    Salary(0, 2000.0) ::
+    Salary(1, 1000.0) :: Nil)
+  salary.registerTempTable("salary")
 }
```