14 changes: 14 additions & 0 deletions sql/core/pom.xml
@@ -189,6 +189,19 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>v${hive.version.short}/src/main/scala</source>
<source>v${hive.version.short}/src/main/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
@@ -197,6 +210,7 @@
</goals>
<configuration>
<sources>
<source>v${hive.version.short}/src/test/scala</source>
<source>src/test/gen-java</source>
</sources>
</configuration>
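
The two build-helper-maven-plugin executions above register the Hive-version-specific roots (v${hive.version.short}/src/main/scala, .../java, and the test counterpart) as extra source directories, so a single module can carry per-version shim code. As a rough, hypothetical sketch only (the sbt side is not part of this diff, and hiveVersionShort is an invented key), the equivalent wiring in an sbt build definition could look like:

    // Hypothetical sbt sketch (not part of this diff): register version-specific source roots.
    import sbt._
    import sbt.Keys._

    object HiveVersionSources {
      // Invented setting; in Spark the short version would come from the chosen Hive profile.
      val hiveVersionShort = settingKey[String]("Short Hive version, e.g. 1.2")

      lazy val settings: Seq[Setting[_]] = Seq(
        hiveVersionShort := "1.2",
        Compile / unmanagedSourceDirectories ++= Seq(
          baseDirectory.value / s"v${hiveVersionShort.value}" / "src" / "main" / "scala",
          baseDirectory.value / s"v${hiveVersionShort.value}" / "src" / "main" / "java"
        ),
        Test / unmanagedSourceDirectories +=
          baseDirectory.value / s"v${hiveVersionShort.value}" / "src" / "test" / "scala"
      )
    }
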
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.io._
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
@@ -109,14 +108,13 @@ class OrcDeserializer(
updater.set(ordinal, bytes)

case DateType => (ordinal, value) =>
updater.setInt(ordinal, DateTimeUtils.fromJavaDate(value.asInstanceOf[DateWritable].get))
updater.setInt(ordinal, DateTimeUtils.fromJavaDate(OrcSerializeUtils.getSqlDate(value)))

case TimestampType => (ordinal, value) =>
updater.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(value.asInstanceOf[OrcTimestamp]))

case DecimalType.Fixed(precision, scale) => (ordinal, value) =>
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
val v = Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
val v = OrcSerializeUtils.getDecimal(value)
v.changePrecision(precision, scale)
updater.set(ordinal, v)

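The rewritten date and decimal cases above delegate to OrcSerializeUtils, so OrcDeserializer itself no longer imports the org.apache.orc.storage.* writables. A minimal, illustrative round-trip through those helpers (the writable values and the precision/scale of 10, 3 are made up for this sketch; in the real path ORC's reader supplies them) is:

    // Sketch only: exercise the new helpers the way the deserializer cases above do.
    import java.sql.Date
    import org.apache.orc.storage.common.`type`.HiveDecimal
    import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    val dateValue = new DateWritable(Date.valueOf("2019-01-01"))
    // Days since epoch, which is what updater.setInt stores for DateType.
    val days: Int = DateTimeUtils.fromJavaDate(OrcSerializeUtils.getSqlDate(dateValue))

    val decimalValue = new HiveDecimalWritable(HiveDecimal.create(new java.math.BigDecimal("12.345")))
    val dec = OrcSerializeUtils.getDecimal(decimalValue)
    dec.changePrecision(10, 3)  // align with the Catalyst DecimalType, as the case above does
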
@@ -20,8 +20,6 @@ package org.apache.spark.sql.execution.datasources.orc
import org.apache.hadoop.io._
import org.apache.orc.TypeDescription
import org.apache.orc.mapred.{OrcList, OrcMap, OrcStruct, OrcTimestamp}
import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -139,14 +137,7 @@ class OrcSerializer(dataSchema: StructType) {
new BytesWritable(getter.getBinary(ordinal))

case DateType =>
if (reuseObj) {
val result = new DateWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter, ordinal) => new DateWritable(getter.getInt(ordinal))
}
OrcSerializeUtils.getDateWritable(reuseObj)

// The following cases are already expensive, reusing object or not doesn't matter.

@@ -156,9 +147,8 @@
result.setNanos(ts.getNanos)
result

case DecimalType.Fixed(precision, scale) => (getter, ordinal) =>
val d = getter.getDecimal(ordinal, precision, scale)
new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
case DecimalType.Fixed(precision, scale) =>
OrcSerializeUtils.getHiveDecimalWritable(precision, scale)

case st: StructType => (getter, ordinal) =>
val result = createOrcValue(st).asInstanceOf[OrcStruct]
@@ -25,7 +25,11 @@ import scala.reflect.runtime.universe.TypeTag
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileBasedDataSourceTest}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION

@@ -104,4 +108,32 @@ abstract class OrcTest extends QueryTest with FileBasedDataSourceTest with Befor
assert(actual < numRows)
}
}

protected def checkNoFilterPredicate
(predicate: Predicate, noneSupported: Boolean = false)
(implicit df: DataFrame): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))

query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
} else {
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
}

case _ =>
throw new AnalysisException("Can not match OrcTable in the query.")
}
}
}
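
With checkNoFilterPredicate hoisted into OrcTest, both the v1 and v2 ORC suites can assert that a predicate does not become an ORC search argument. A hypothetical call site (the column and data are invented for illustration, and the catalyst expression DSL import used elsewhere in these suites is assumed) would be:

    // Hypothetical usage sketch: an array column is not a pushable type,
    // so no filter should be pushed (noneSupported = true).
    withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
      checkNoFilterPredicate('_1.isNull, noneSupported = true)
    }
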
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal

/**
 * Helper functions for ORC serialization and deserialization.
*/
private[spark] object OrcSerializeUtils {

def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get

def getDecimal(value: Any): Decimal = {
val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
}

def getDateWritable(reuseObj: Boolean): (SpecializedGetters, Int) => DateWritable = {
if (reuseObj) {
val result = new DateWritable()
(getter, ordinal) =>
result.set(getter.getInt(ordinal))
result
} else {
(getter: SpecializedGetters, ordinal: Int) =>
new DateWritable(getter.getInt(ordinal))
}
}

def getHiveDecimalWritable(precision: Int, scale: Int):
(SpecializedGetters, Int) => HiveDecimalWritable = {
(getter, ordinal) =>
val d = getter.getDecimal(ordinal, precision, scale)
new HiveDecimalWritable(HiveDecimal.create(d.toJavaBigDecimal))
}
}
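
Together with the new v${hive.version.short} source roots added to sql/core/pom.xml, this object is the single seam where the ORC writable classes are referenced. A per-version copy is not shown in this diff, so the following is only a guess at its shape: for a Hive 2.3 / unshaded hive-storage-api build it would presumably keep the same API and swap only the imports.

    // Hypothetical v2.3 variant, not part of this diff: same helpers, backed by
    // org.apache.hadoop.hive.* instead of the shaded org.apache.orc.storage.* classes.
    import java.sql.Date

    import org.apache.hadoop.hive.common.`type`.HiveDecimal
    import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}

    import org.apache.spark.sql.types.Decimal

    private[spark] object OrcSerializeUtils {

      def getSqlDate(value: Any): Date = value.asInstanceOf[DateWritable].get

      def getDecimal(value: Any): Decimal = {
        val decimal = value.asInstanceOf[HiveDecimalWritable].getHiveDecimal()
        Decimal(decimal.bigDecimalValue, decimal.precision(), decimal.scale())
      }

      // getDateWritable and getHiveDecimalWritable would be duplicated the same way,
      // only with the hive-storage-api writables imported above.
    }
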
@@ -89,34 +89,6 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
checkFilterPredicate(df, predicate, checkLogicalOperator)
}

protected def checkNoFilterPredicate
(predicate: Predicate, noneSupported: Boolean = false)
(implicit df: DataFrame): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))

query.queryExecution.optimizedPlan match {
case PhysicalOperation(_, filters,
DataSourceV2Relation(orcTable: OrcTable, _, options)) =>
assert(filters.nonEmpty, "No filter is analyzed from the given query")
val scanBuilder = orcTable.newScanBuilder(options)
scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray)
val pushedFilters = scanBuilder.pushedFilters()
if (noneSupported) {
assert(pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
} else {
assert(pushedFilters.nonEmpty, "No filter is pushed down")
val maybeFilter = OrcFilters.createFilter(query.schema, pushedFilters)
assert(maybeFilter.isEmpty, s"Couldn't generate filter predicate for $pushedFilters")
}

case _ =>
throw new AnalysisException("Can not match OrcTable in the query.")
}
}

test("filter pushdown - integer") {
withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)