Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion integration_tests/src/main/python/orc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,110 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li

assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.orc([first_data_path, second_data_path, third_data_path]),
conf=all_confs)
conf=all_confs)

# Spark(330,_) allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
# Note that Min/Max don't push down partition column. Only Count does.
# The following tests that GPU falls back to CPU when aggregates are pushed down on ORC.
#
# When the spark configuration is enabled we check the following:
# ----------------------------------------------+
# | Aggregate | Partition Column | FallBack CPU |
# +-----------+------------------+--------------+
# | COUNT | Y | Y |
# | MIN | Y | N |
# | MAX | Y | N |
# | COUNT | N | Y |
# | MIN | N | Y |
# | MAX | N | Y |

_aggregate_orc_list_col_partition = ['COUNT']
_aggregate_orc_list_no_col_partition = ['MAX', 'MIN']
_aggregate_orc_list = _aggregate_orc_list_col_partition + _aggregate_orc_list_no_col_partition
_orc_aggregate_pushdown_enabled_conf = {'spark.sql.orc.aggregatePushdown': 'true',
"spark.sql.sources.useV1SourceList": ""}

def _do_orc_scan_with_agg(spark, path, agg):
return spark.read.orc(path).selectExpr('{}(p)'.format(agg))

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@pytest.mark.parametrize('aggregate', _aggregate_orc_list)
@allow_non_gpu(any = True)
def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, aggregate):
"""
Spark(330,_) allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
When the spark configuration is enabled we check the following:
---------------------------+
| Aggregate | FallBack CPU |
+-----------+--------------+
| COUNT | Y |
| MIN | Y |
| MAX | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_00.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.orc(data_path))

# fallback to CPU
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=_orc_aggregate_pushdown_enabled_conf)

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@pytest.mark.parametrize('aggregate', _aggregate_orc_list_col_partition)
@allow_non_gpu(any = True)
def test_orc_scan_with_aggregate_pushdown_on_col_partition(spark_tmp_path, aggregate):
"""
Spark(330,_) allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
Note that Min/Max don't push down partition column. Only Count does.
This test checks that GPU falls back to CPU when aggregates are pushed down on ORC.
When the spark configuration is enabled we check the following:
----------------------------------------------+
| Aggregate | Partition Column | FallBack CPU |
+-----------+------------------+--------------+
| COUNT | Y | Y |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_01.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
# Partition column P
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.partitionBy("p")
.orc(data_path))

# fallback to CPU only if aggregate is COUNT
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=_orc_aggregate_pushdown_enabled_conf)

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@pytest.mark.parametrize('aggregate', _aggregate_orc_list_no_col_partition)
def test_orc_scan_with_aggregate_no_pushdown_on_col_partition(spark_tmp_path, aggregate):
"""
Spark(330,_) allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
Note that Min/Max don't push down partition column.
When the spark configuration is enabled we check the following:
----------------------------------------------+
| Aggregate | Partition Column | FallBack CPU |
+-----------+------------------+--------------+
| MIN | Y | N |
| MAX | Y | N |
"""
data_path = spark_tmp_path + '/ORC_DATA/pushdown_02.orc'
# GPU ORC write with statistics is not correctly working.
# Create ORC file in CPU session as a workaround
# Partition column P
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.partitionBy("p")
.orc(data_path))

# should not fallback to CPU
assert_gpu_and_cpu_are_equal_collect(
lambda spark: _do_orc_scan_with_agg(spark, data_path, aggregate),
conf=_orc_aggregate_pushdown_enabled_conf)
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
Expand Down Expand Up @@ -694,6 +695,7 @@
<source>${project.basedir}/src/main/311+-nondb/scala</source>
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
Expand Down Expand Up @@ -758,6 +760,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,42 +388,10 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuOrcScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan

class RapidsOrcScanMeta(
oScan: OrcScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[OrcScan](oScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuOrcScanBase.tagSupport(this)
}

override def convertToGpu(): Scan =
GpuOrcScan(oScan.sparkSession,
oScan.hadoopConf,
oScan.fileIndex,
oScan.dataSchema,
oScan.readDataSchema,
oScan.readPartitionSchema,
oScan.options,
oScan.pushedFilters,
oScan.partitionFilters,
oScan.dataFilters,
conf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuParquetScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

class RapidsParquetScanMeta(
pScan: ParquetScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[ParquetScan](pScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuParquetScanBase.tagSupport(this)
}

override def convertToGpu(): Scan = {
GpuParquetScan(pScan.sparkSession,
pScan.hadoopConf,
pScan.fileIndex,
pScan.dataSchema,
pScan.readDataSchema,
pScan.readPartitionSchema,
pScan.pushedFilters,
pScan.options,
pScan.partitionFilters,
pScan.dataFilters,
conf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,42 +161,10 @@ trait Spark301until320Shims extends SparkShims {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,42 +507,10 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
Expand Down
Loading