@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, Expression, IsNotNull, IsNull, NamedExpression}
Member: line too long.

@HyukjinKwon (Member, Sep 10, 2018): which can be wildcard when there are more than 6 entities per https://github.com/databricks/scala-style-guide#imports
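
For illustration, since the import now lists seven entities, the wildcard form the style guide permits would be:

    import org.apache.spark.sql.catalyst.expressions._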

import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -110,7 +110,12 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
val projectionRootFields = projects.flatMap(getRootFields)
val filterRootFields = filters.flatMap(getRootFields)

-(projectionRootFields ++ filterRootFields).distinct
+val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
+.distinct.partition(_.contentAccessed)
Member: Some comments here please.


+optRootFields.filter { opt =>
+!rootFields.exists(_.field.name == opt.field.name)
+} ++ rootFields
}
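
To make the dedup above concrete, here is a standalone sketch with simplified stand-in types (Field and Root are illustrative, not the real StructField and RootField):

    // Stand-ins for StructField/RootField, just for this sketch.
    case class Field(name: String)
    case class Root(field: Field, contentAccessed: Boolean = true)

    val all = Seq(Root(Field("name")), Root(Field("employer"), contentAccessed = false))
    // Split fields whose content is read from fields that are only null-checked.
    val (rootFields, optRootFields) = all.distinct.partition(_.contentAccessed)
    // Keep a null-check-only field only if no content-accessed field shares its name,
    // so a bare null check never overrides the schema of a field that is read anyway.
    val merged = optRootFields.filter { opt =>
      !rootFields.exists(_.field.name == opt.field.name)
    } ++ rootFields
    // merged == Seq(Root(Field("employer"), false), Root(Field("name"), true))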

/**
@@ -156,7 +161,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// in the resulting schema may differ from their ordering in the logical relation's
// original schema
val mergedSchema = requestedRootFields
-.map { case RootField(field, _) => StructType(Array(field)) }
+.map { case RootField(field, _, _) => StructType(Array(field)) }
Member: Not a big deal but .map { root: RootField => StructType(Array(root.field)) } per https://github.com/databricks/scala-style-guide#pattern-matching

.reduceLeft(_ merge _)
val dataSchemaFieldNames = fileDataSchema.fieldNames.toSet
val mergedDataSchema =
@@ -196,6 +201,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
*/
private def getRootFields(expr: Expression): Seq[RootField] = {
expr match {
+// Those expressions don't really use the nested fields of a root field.
+case i@(IsNotNull(_: Attribute) | IsNull(_: Attribute)) =>
Member: nit: -> i @ (IsNotNull(_: ...

+getRootFields(i.children(0)).map(_.copy(contentAccessed = false))
case att: Attribute =>
RootField(StructField(att.name, att.dataType, att.nullable), derivedFromAtt = true) :: Nil
case SelectedField(field) => RootField(field, derivedFromAtt = false) :: Nil
Member: How about

      case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
        expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
      case _ =>
        expr.children.flatMap(getRootFields)

@@ -250,8 +258,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
}

/**
* A "root" schema field (aka top-level, no-parent) and whether it was derived from
* an attribute or had a proper child.
* A "root" schema field (aka top-level, no-parent), whether it was derived from
* an attribute or had a proper child, and whether it was accessed with its content.
*/
-private case class RootField(field: StructField, derivedFromAtt: Boolean)
+private case class RootField(field: StructField, derivedFromAtt: Boolean,
+contentAccessed: Boolean = true)
Member: Formatting and please elaborate the comment
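
One way the elaborated doc comment might read (a sketch, not the merged wording):

    /**
     * A "root" (aka top-level, no-parent) schema field.
     *
     * @param field the root-level field
     * @param derivedFromAtt whether it was derived from an attribute or had a proper child
     * @param contentAccessed whether the field's content is accessed, as opposed to only
     *                        being checked for null (IsNull/IsNotNull), in which case its
     *                        nested fields need not be read
     */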

}
@@ -35,21 +35,26 @@ class ParquetSchemaPruningSuite
with SchemaPruningTest
with SharedSQLContext {
case class FullName(first: String, middle: String, last: String)
+case class Company(name: String, address: String)
+case class Employer(id: Int, company: Company)
case class Contact(
id: Int,
name: FullName,
address: String,
pets: Int,
friends: Array[FullName] = Array.empty,
-relatives: Map[String, FullName] = Map.empty)
+relatives: Map[String, FullName] = Map.empty,
+employer: Employer = null)

val janeDoe = FullName("Jane", "X.", "Doe")
val johnDoe = FullName("John", "Y.", "Doe")
val susanSmith = FullName("Susan", "Z.", "Smith")

+val company = Employer(0, Company("abc", "123 Business Street"))

private val contacts =
Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
-relatives = Map("brother" -> johnDoe)) ::
+relatives = Map("brother" -> johnDoe), employer = company) ::
Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil

case class Name(first: String, last: String)
@@ -66,13 +71,14 @@ class ParquetSchemaPruningSuite
pets: Int,
friends: Array[FullName] = Array(),
relatives: Map[String, FullName] = Map(),
+employer: Employer = null,
p: Int)

case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)

private val contactsWithDataPartitionColumn =
-contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
-ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
+ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
private val briefContactsWithDataPartitionColumn =
briefContacts.map { case BriefContact(id, name, address) =>
BriefContactWithDataPartitionColumn(id, name, address, 2) }
@@ -155,6 +161,47 @@ class ParquetSchemaPruningSuite
Row(null) :: Row(null) :: Nil)
}

+testSchemaPruning("select a single complex field and in where clause") {
+val query1 = sql("select name.first from contacts where name.first = 'Jane'")
+checkScan(query1, "struct<name:struct<first:string>>")
+checkAnswer(query1, Row("Jane") :: Nil)
+
+val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
+checkScan(query2, "struct<name:struct<first:string,last:string>>")
+checkAnswer(query2, Row("Jane", "Doe") :: Nil)
+
+val query3 = sql("select name.first from contacts " +
+"where employer.company.name = 'abc' and p = 1")
Member: Let's say a user adds where employer.company is not null, can we still read schema with employer:struct<company:struct<name:string>> as we only mark contentAccessed = false when IsNotNull is on an attribute?

Member (Author): Added one query test for this case. Thanks.

Member (Author): When there is a nested field access in the query like employer.company.name, we don't need any fields inside employer.company other than name.

But if there is no such access and the where clause has only employer.company is not null, the scan will read the full schema of employer.company.
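
To contrast the two behaviors just described, a sketch in the style of the tests above (the second expected scan follows the author's explanation rather than a test in this diff):

    // Nested field access: only employer.company.name needs to be read.
    val pruned = sql("select name.first from contacts where employer.company.name = 'abc'")
    // expected scan: struct<name:struct<first:string>,employer:struct<company:struct<name:string>>>

    // Null check on the struct itself: employer.company keeps its full schema.
    val nullChecked = sql("select name.first from contacts where employer.company is not null")
    // expected scan includes employer:struct<company:struct<name:string,address:string>>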

+checkScan(query3, "struct<name:struct<first:string>," +
+"employer:struct<company:struct<name:string>>>")
+checkAnswer(query3, Row("Jane") :: Nil)
+}

+testSchemaPruning("select a single complex field and is null expression in project") {
+val query = sql("select name.first, address is not null from contacts")
+checkScan(query, "struct<name:struct<first:string>,address:string>")
+checkAnswer(query.orderBy("id"),
+Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
+}

+testSchemaPruning("select a single complex field array and in clause") {
+val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
+checkScan(query,
+"struct<friends:array<struct<first:string,middle:string>>>")
+checkAnswer(query.orderBy("id"),
+Row(Array("Z.")) :: Nil)
+}

+testSchemaPruning("select a single complex field from a map entry and in clause") {
+val query =
+sql("select relatives[\"brother\"].middle from contacts " +
+"where relatives[\"brother\"].first = 'John'")
+checkScan(query,
+"struct<relatives:map<string,struct<first:string,middle:string>>>")
+checkAnswer(query.orderBy("id"),
+Row("Y.") :: Nil)
+}

private def testSchemaPruning(testName: String)(testThunk: => Unit) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
test(s"Spark vectorized reader - without partition data column - $testName") {