45 changes: 44 additions & 1 deletion integration_tests/src/main/python/orc_test.py
@@ -486,4 +486,47 @@ def test_orc_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_li

assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.orc([first_data_path, second_data_path, third_data_path]),
conf=all_confs)

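# Spark 3.3+ can push min/max/count down into the ORC scan. An empty
# useV1SourceList forces the v2 ORC reader, which is the code path that
# supports aggregate pushdown.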
conf_for_orc_aggregate_pushdown = {
"spark.sql.orc.aggregatePushdown": "true",
"spark.sql.sources.useV1SourceList": ""
}

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
@allow_non_gpu(any=True)
def test_orc_scan_with_aggregation_pushdown_fallback(spark_tmp_path):
"""
The aggregation will be pushed down in this test, so we should fallback to CPU
"""
data_path = spark_tmp_path + '/pushdown.orc'

def do_orc_scan(spark):
df = spark.read.orc(data_path).selectExpr("count(p)")
return df

with_cpu_session(lambda spark : spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").orc(data_path))

assert_cpu_and_gpu_are_equal_collect_with_capture(
do_orc_scan,
exist_classes="BatchScanExec",
non_exist_classes="GpuBatchScanExec",
conf=conf_for_orc_aggregate_pushdown)

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on ORC is a new feature of Spark 330')
def test_orc_scan_without_aggregation_pushdown_not_fallback(spark_tmp_path):
"""
No aggregation will be pushed down in this test, so we should not fallback to CPU
"""
data_path = spark_tmp_path + "/pushdown.orc"

def do_orc_scan(spark):
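# max on a partition column is not pushed down (only count is; see the
# review thread below), so this scan is expected to stay on the GPU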
df = spark.read.orc(data_path).selectExpr("Max(p)")
Collaborator
Why is max not pushed down but count is?

Collaborator Author
I was looking at Spark-PR-34298.
AFAIU, min/max don't push down partition columns. Only count does.

Collaborator

That is fine; then add a comment about it and update the name of this issue so only count is included. Spark messed up by keeping the name of the issue and the PR the same.

Collaborator Author
I updated the test to cover all the aggregates (min/max/count) on both partition and non-partition columns; see the sketch after the test below.

return df

with_cpu_session(lambda spark : spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").orc(data_path))

assert_gpu_and_cpu_are_equal_collect(
do_orc_scan,
conf=conf_for_orc_aggregate_pushdown)
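Side note on the thread above: a quick standalone way to see which aggregates Spark 3.3 actually pushes into the ORC scan is to look for the `PushedAggregation` entry in the physical plan. The sketch below is illustrative only and is not part of this PR; the path and session setup are assumptions.

```python
from pyspark.sql import SparkSession

# Standalone sketch (not part of this PR) illustrating the behavior discussed
# in the review thread. Requires Spark 3.3+.
spark = (SparkSession.builder
    .config("spark.sql.orc.aggregatePushdown", "true")
    .config("spark.sql.sources.useV1SourceList", "")
    .getOrCreate())

data_path = "/tmp/pushdown.orc"  # illustrative path
spark.range(10).selectExpr("id", "id % 3 as p") \
    .write.partitionBy("p").mode("overwrite").orc(data_path)

# count on a partition column is pushed down: the BatchScan node in the plan
# should report something like "PushedAggregation: [COUNT(p)]".
spark.read.orc(data_path).selectExpr("count(p)").explain()

# min/max on a partition column are not pushed down: the same node should
# report "PushedAggregation: []", so the plugin can keep the scan on the GPU.
spark.read.orc(data_path).selectExpr("max(p)").explain()
```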
3 changes: 3 additions & 0 deletions pom.xml
@@ -632,6 +632,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
@@ -694,6 +695,7 @@
<source>${project.basedir}/src/main/311+-nondb/scala</source>
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
@@ -758,6 +760,7 @@
<source>${project.basedir}/src/main/311until330-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/320until330-all/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
@@ -388,42 +388,10 @@ abstract class Spark30XdbShims extends Spark30XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuOrcScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan

class RapidsOrcScanMeta(
oScan: OrcScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[OrcScan](oScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuOrcScanBase.tagSupport(this)
}

override def convertToGpu(): Scan =
GpuOrcScan(oScan.sparkSession,
oScan.hadoopConf,
oScan.fileIndex,
oScan.dataSchema,
oScan.readDataSchema,
oScan.readPartitionSchema,
oScan.options,
oScan.pushedFilters,
oScan.partitionFilters,
oScan.dataFilters,
conf)
}
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuParquetScanBase, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan

class RapidsParquetScanMeta(
pScan: ParquetScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[ParquetScan](pScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuParquetScanBase.tagSupport(this)
}

override def convertToGpu(): Scan = {
GpuParquetScan(pScan.sparkSession,
pScan.hadoopConf,
pScan.fileIndex,
pScan.dataSchema,
pScan.readDataSchema,
pScan.readPartitionSchema,
pScan.pushedFilters,
pScan.options,
pScan.partitionFilters,
pScan.dataFilters,
conf)
}
}
@@ -161,42 +161,10 @@ trait Spark301until320Shims extends SparkShims {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -507,42 +507,10 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
GpuOverrides.scan[ParquetScan](
"Parquet parsing",
(a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this)

override def convertToGpu(): Scan = {
GpuParquetScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.pushedFilters,
a.options,
a.partitionFilters,
a.dataFilters,
conf)
}
}),
(a, conf, p, r) => new RapidsParquetScanMeta(a, conf, p, r)),
GpuOverrides.scan[OrcScan](
"ORC parsing",
(a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
override def tagSelfForGpu(): Unit =
GpuOrcScanBase.tagSupport(this)

override def convertToGpu(): Scan =
GpuOrcScan(a.sparkSession,
a.hadoopConf,
a.fileIndex,
a.dataSchema,
a.readDataSchema,
a.readPartitionSchema,
a.options,
a.pushedFilters,
a.partitionFilters,
a.dataFilters,
conf)
})
(a, conf, p, r) => new RapidsOrcScanMeta(a, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap

override def getPartitionFileNames(
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{DataFromReplacementRule, GpuCSVScan, RapidsConf, RapidsMeta, ScanMeta}

import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan

class RapidsCsvScanMeta(
cScan: CSVScan,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends ScanMeta[CSVScan](cScan, conf, parent, rule) {

override def tagSelfForGpu(): Unit = {
GpuCSVScan.tagSupport(this)
// We are being overly cautious here: CSV does not support runtime filtering yet.
if (cScan.isInstanceOf[SupportsRuntimeFiltering]) {
willNotWorkOnGpu("Csv does not support Runtime filtering (DPP)" +
" on datasource V2 yet.")
}
}

override def convertToGpu(): Scan =
GpuCSVScan(cScan.sparkSession,
cScan.fileIndex,
cScan.dataSchema,
cScan.readDataSchema,
cScan.readPartitionSchema,
cScan.options,
cScan.partitionFilters,
cScan.dataFilters,
conf.maxReadBatchSizeRows,
conf.maxReadBatchSizeBytes)
}