53 changes: 52 additions & 1 deletion integration_tests/src/main/python/orc_test.py
@@ -486,4 +486,55 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li

assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.orc([first_data_path, second_data_path, third_data_path]),
conf=all_confs)

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@pytest.mark.parametrize('pushdown_enabled', ['true', 'false'])
@pytest.mark.parametrize('col_partition', ['true', 'false'])
@pytest.mark.parametrize('aggregate', ['Count', 'Max', 'Min'])
def test_orc_scan_with_aggregate_pushdown(spark_tmp_path, pushdown_enabled,
col_partition, aggregate):
"""
Spark 3.3.0+ allows aggregate pushdown on ORC by enabling spark.sql.orc.aggregatePushdown.
Note that Min/Max don't push down partition column. Only Count does.
This test checks that GPU falls back to CPU when aggregates are pushed down on ORC.
When the Spark configuration is enabled, we check the following:
+-----------+------------------+--------------+
| Aggregate | Partition Column | Fallback CPU |
+-----------+------------------+--------------+
| COUNT | Y | Y |
| MIN | Y | N |
| MAX | Y | N |
| COUNT | N | Y |
| MIN | N | Y |
| MAX | N | Y |
"""
def do_orc_scan(spark, path, agg):
df = spark.read.orc(path).selectExpr('{}(p)'.format(agg))
return df

data_path = spark_tmp_path + '/pushdown.orc'
orc_aggregate_push_down_conf = {"spark.sql.orc.aggregatePushdown": pushdown_enabled,
"spark.sql.sources.useV1SourceList": ""}
should_fallback = pushdown_enabled == 'true' and (aggregate == 'Count' or col_partition == 'false')

if col_partition == 'true':
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.partitionBy("p")
.mode("overwrite")
.orc(data_path))
else:
with_cpu_session(lambda spark: spark.range(10).selectExpr("id", "id % 3 as p").write
.mode("overwrite")
.orc(data_path))

if should_fallback:
assert_cpu_and_gpu_are_equal_collect_with_capture(
lambda spark: do_orc_scan(spark, data_path, aggregate),
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=orc_aggregate_push_down_conf)
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark: do_orc_scan(spark, data_path, aggregate),
conf=orc_aggregate_push_down_conf)
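
For reference, a standalone PySpark sketch (outside the test harness; the session setup and path below are hypothetical) of the behaviour this test exercises: with spark.sql.orc.aggregatePushdown enabled and the V2 data source in use, simple aggregates are answered from ORC file statistics, and the plugin keeps such scans on the CPU.

from pyspark.sql import SparkSession

# Hypothetical standalone session; the test above instead drives configs through
# the with_cpu_session / assert_* helpers of the integration test framework.
spark = (SparkSession.builder
         .appName("orc-aggregate-pushdown-sketch")
         .config("spark.sql.orc.aggregatePushdown", "true")
         .config("spark.sql.sources.useV1SourceList", "")   # force the V2 OrcScan
         .getOrCreate())

data_path = "/tmp/pushdown_sketch.orc"   # hypothetical location
spark.range(10).selectExpr("id", "id % 3 as p").write.mode("overwrite").orc(data_path)

# MIN over a non-partition column is eligible for pushdown, so the plan should show
# the pushed aggregation and a CPU BatchScanExec rather than GpuBatchScanExec.
spark.read.orc(data_path).selectExpr("min(id)").explain()
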
3 changes: 3 additions & 0 deletions pom.xml
@@ -632,6 +632,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
@@ -694,6 +695,7 @@
<source>${project.basedir}/src/main/311+-nondb/scala</source>
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
@@ -758,6 +760,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
@@ -388,42 +388,10 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuOrcScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan

class RapidsOrcScanMeta(
oScan: OrcScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[OrcScan](oScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuOrcScanBase.tagSupport(this)
}

override def convertToGpu(): Scan =
GpuOrcScan(oScan.sparkSession,
oScan.hadoopConf,
oScan.fileIndex,
oScan.dataSchema,
oScan.readDataSchema,
oScan.readPartitionSchema,
oScan.options,
oScan.pushedFilters,
oScan.partitionFilters,
oScan.dataFilters,
conf)
}
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuParquetScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

class RapidsParquetScanMeta(
pScan: ParquetScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[ParquetScan](pScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuParquetScanBase.tagSupport(this)
}

override def convertToGpu(): Scan = {
GpuParquetScan(pScan.sparkSession,
pScan.hadoopConf,
pScan.fileIndex,
pScan.dataSchema,
pScan.readDataSchema,
pScan.readPartitionSchema,
pScan.pushedFilters,
pScan.options,
pScan.partitionFilters,
pScan.dataFilters,
conf)
}
}
@@ -161,42 +161,10 @@ trait Spark301until320Shims extends SparkShims {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -507,42 +507,10 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuCSVScan, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan

class RapidsCsvScanMeta(
cScan: CSVScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[CSVScan](cScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuCSVScan.tagSupport(this)
// Being overly cautious here: CSV does not support runtime filtering yet.
if (cScan.isInstanceOf[SupportsRuntimeFiltering]) {
willNotWorkOnGpu("Csv does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}
}

override def convertToGpu(): Scan =
GpuCSVScan(cScan.sparkSession,
cScan.fileIndex,
cScan.dataSchema,
cScan.readDataSchema,
cScan.readPartitionSchema,
cScan.options,
cScan.partitionFilters,
cScan.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes)
}