@@ -67,6 +67,16 @@ private[sql] object OrcFilters {
}
}

// Since ORC 1.5.0 (ORC-323), we need to quote column names that contain `.` characters
// so that predicate pushdown can distinguish them from nested columns.
private def quoteAttributeNameIfNeeded(name: String): String = {
if (!name.contains("`") && name.contains(".")) {
Member

Does this condition take a backtick in the column name into account? For instance,

>>> spark.range(1).toDF("abc`.abc").show()
+--------+
|abc`.abc|
+--------+
|       0|
+--------+

Member Author

Thank you for the review. I'll consider that, too.

Member Author

@HyukjinKwon. Actually, Spark 2.3.2 ORC (native/hive) doesn't support backtick characters in column names; it fails on the write operation. And although Spark 2.4.0 broadens the set of supported special characters in column names (like . and "), the backtick character is not handled yet.

So, for that one, I'll proceed in another PR, since it's an improvement rather than a regression fix.

Also, cc @gatorsmile and @dbtsai .

Member Author

For the ORC and Avro improvement, SPARK-25722 has been created.

s"`$name`"
} else {
name
}
}

/**
* Create ORC filter as a SearchArgument instance.
*/
@@ -178,38 +188,47 @@ private[sql] object OrcFilters {
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).

case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end())
Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())

case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end())
Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())

case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().lessThan(attribute, getType(attribute), castedValue).end())
Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())

case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startAnd().lessThanEquals(attribute, getType(attribute), castedValue).end())
Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startNot().lessThanEquals(attribute, getType(attribute), castedValue).end())
Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
Some(builder.startNot().lessThan(attribute, getType(attribute), castedValue).end())
Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())

case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startAnd().isNull(attribute, getType(attribute)).end())
val quotedName = quoteAttributeNameIfNeeded(attribute)
Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())

case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
Some(builder.startNot().isNull(attribute, getType(attribute)).end())
val quotedName = quoteAttributeNameIfNeeded(attribute)
Some(builder.startNot().isNull(quotedName, getType(attribute)).end())

case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
Some(builder.startAnd().in(attribute, getType(attribute),
Some(builder.startAnd().in(quotedName, getType(attribute),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())

case _ => None
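
As a quick, self-contained sketch of how the new quoting helper behaves for the names discussed in the review above: the helper body is copied from the diff, while the expected values in the asserts are my reading of that condition rather than output quoted from the PR.

object QuoteSketch {
  // Same condition as the diff: quote names containing `.` unless they already contain a backtick.
  def quoteAttributeNameIfNeeded(name: String): String =
    if (!name.contains("`") && name.contains(".")) s"`$name`" else name

  def main(args: Array[String]): Unit = {
    assert(quoteAttributeNameIfNeeded("col.dot.1") == "`col.dot.1`") // dotted name gets quoted
    assert(quoteAttributeNameIfNeeded("plain") == "plain")           // no dot: left unchanged
    // The corner case raised in review: a name containing a backtick is left as-is.
    assert(quoteAttributeNameIfNeeded("abc`.abc") == "abc`.abc")
    println("quoting sketch OK")
  }
}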
@@ -17,14 +17,15 @@

package org.apache.spark.sql.execution.datasources.orc

import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -383,4 +384,15 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
)).get.toString
}
}

test("SPARK-25579 ORC PPD should support column names with dot") {
import testImplicits._
Member

Can we add a test at OrcFilterSuite too?

Member Author

Ur, this is OrcFilterSuite.

Can we add a test at OrcFilterSuite too?

For HiveOrcFilterSuite, the Hive ORC implementation doesn't support dots in column names.

Member

Okay. One end-to-end test should be enough.


withTempDir { dir =>
val path = new File(dir, "orc").getCanonicalPath
Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
checkAnswer(stripSparkFilter(df), Row(1, 2))
Member

@dongjoon-hyun, technically shouldn't we test whether the stripes are filtered? I added some tests a while ago (stripSparkFilter was added by me as well, FWIW):

test("Support for pushing down filters for decimal types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
withTempPath { file =>
// It needs to repartition data so that we can have several ORC files
// in order to skip stripes in ORC.
spark.createDataFrame(data).toDF("a").repartition(10)
.write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
val actual = stripSparkFilter(df).count()
assert(actual < 10)
}
}
}

Member Author

Like that test, this test also generates two ORC files with one row each and tests whether PPD works.

}
}
}
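
Following up on the stripe-filtering point above, a hedged sketch of how the count-based check from the quoted decimal test could be adapted to dotted column names. It assumes the same test helpers used there (withSQLConf, withTempPath, stripSparkFilter, testImplicits) are in scope; it is an illustration, not part of this PR.

test("SPARK-25579 ORC PPD should skip files/stripes for dotted column names (sketch)") {
  import testImplicits._
  withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
    withTempPath { file =>
      // Repartition so that several ORC files are written and most of them can be skipped by PPD.
      (0 until 10).map(i => (i, i.toString)).toDF("col.dot.1", "col.dot.2")
        .repartition(10)
        .write.orc(file.getCanonicalPath)
      val df = spark.read.orc(file.getCanonicalPath).where("`col.dot.1` = 2")
      // With pushdown effective, far fewer than 10 rows reach Spark's own filter.
      assert(stripSparkFilter(df).count() < 10)
    }
  }
}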