
Commit f5bf861

huaxingao and cloud-fan authored and committed
[SPARK-36760][SQL] Add interface SupportsPushDownV2Filters
Co-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Huaxin Gao <huaxin_gao@apple.com>

### What changes were proposed in this pull request?

This is the 2nd PR for V2 Filter support. This PR does the following:
- Add interface SupportsPushDownV2Filters

Future work:
- Refactor `OrcFilters`, `ParquetFilters`, `JacksonParser`, `UnivocityParser` so both the V1 and V2 file sources can use them.
- For the V2 file source: implement v2 filter -> parquet/orc filter. csv and Json don't have real filters, but the current code also needs to change to have v2 filter -> `JacksonParser`/`UnivocityParser`.
- For the V1 file source, keep what we currently have: v1 filter -> parquet/orc filter.
- We don't need v1filter.toV2 or v2filter.toV1 since we have two separate paths.

The reasons we reached this conclusion:
- The major motivation for implementing V2Filter is to eliminate the unnecessary conversion between Catalyst types and Scala types when using Filters.
- We provide `SupportsPushDownV2Filters` in this PR so a V2 data source (e.g. Iceberg) can implement it and use V2 Filters.
- Implementing v2 filters in the V2 file sources is a lot of work. The possible approaches are:

1. Keep what we have for file source v1 (v1 filter -> parquet/orc filter) and, for file source v2, implement v2 filter -> parquet/orc filter. We don't need v1 -> v2 or v2 -> v1 conversion.
   Problem with this approach: there is a lot of code duplication.

2. Implement v2 filter -> parquet/orc filter, and for file source v1 do v1 filter -> v2 filter -> parquet/orc filter. We will need v1 -> v2 conversion. This is the approach used in apache#33973, which has:
   - v2 orc: v2 filter -> orc filter
   - v1 orc: v1 -> v2 -> orc filter
   - v2 csv: v2 -> v1, new UnivocityParser
   - v1 csv: new UnivocityParser
   - v2 Json: v2 -> v1, new JacksonParser
   - v1 Json: new JacksonParser

   csv and Json don't have real filters, they just use filter references, so it should be OK to use either v1 or v2; v1 is easier because nothing needs to change. That PR doesn't have the parquet V2Filter implementation yet, but the plan is:
   - v2 parquet: v2 filter -> parquet filter
   - v1 parquet: v1 -> v2 -> parquet filter

   Problems with this approach:
   1. It's not easy to implement v1 -> v2 because a V2 filter has `LiteralValue` and needs type info, and we have already lost the type information when converting the Expression filter to a v1 filter.
   2. parquet is OK. Using Timestamp as an example, the parquet filter takes a long for a timestamp:
      - v2 parquet: v2 filter -> parquet filter: timestamp Expression (Long) -> v2 filter (LiteralValue Long) -> parquet filter (Long)
      - v1 parquet: v1 -> v2 -> parquet filter: timestamp Expression (Long) -> v1 filter (Timestamp) -> v2 filter (LiteralValue Long) -> parquet filter (Long)

      But we have a problem for orc, because the orc filter takes a java Timestamp:
      - v2 orc: v2 filter -> orc filter: timestamp Expression (Long) -> v2 filter (LiteralValue Long) -> orc filter (Timestamp)
      - v1 orc: v1 -> v2 -> orc filter: timestamp Expression (Long) -> v1 filter (Timestamp) -> v2 filter (LiteralValue Long) -> orc filter (Timestamp)

      This defeats the purpose of implementing v2 filters.

3. Keep what we have for file source v1 (v1 filter -> parquet/orc filter) and, for file source v2, do v2 filter -> v1 filter -> parquet/orc filter. We will need v2 -> v1 conversion, and we have a similar problem as approach 2.

So the conclusion is: approach 1 (keep v1 filter -> parquet/orc filter for file source v1; implement v2 filter -> parquet/orc filter for file source v2) is better, but it has a lot of code duplication. We will need to refactor `OrcFilters`, `ParquetFilters`, `JacksonParser` and `UnivocityParser` so both the V1 and V2 file sources can use them.

### Why are the changes needed?

Use V2Filters to eliminate the unnecessary conversion between Catalyst types and Scala types.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new UTs.

Closes apache#34001 from huaxingao/v2filter.

Lead-authored-by: Huaxin Gao <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
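To make the new API concrete, here is a minimal sketch of how an external V2 data source might implement the interface. `MyScanBuilder` and its pushdown policy (accepting only equality and not-null filters) are hypothetical; only the `pushFilters`/`pushedFilters` contract comes from this PR.

```scala
import org.apache.spark.sql.connector.expressions.filter.{EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}

// Hypothetical ScanBuilder for an external V2 data source (e.g. Iceberg).
class MyScanBuilder extends ScanBuilder with SupportsPushDownV2Filters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Keep the filters this source can evaluate natively (EqualTo and
    // IsNotNull here, purely as an example) and hand the rest back to Spark.
    val (supported, unsupported) = filters.partition {
      case _: EqualTo | _: IsNotNull => true
      case _ => false
    }
    pushed = supported
    unsupported // Spark plans an extra Filter operator for these
  }

  // Fully-pushed and partially-pushed filters both belong here.
  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ??? // actual Scan construction omitted
}
```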
1 parent 64697b2 commit f5bf861

7 files changed

Lines changed: 593 additions & 11 deletions


sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Filter.java

Lines changed: 3 additions & 1 deletion
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.connector.expressions.filter;
 
+import java.io.Serializable;
+
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.expressions.Expression;
 import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -27,7 +29,7 @@
  * @since 3.3.0
  */
 @Evolving
-public abstract class Filter implements Expression {
+public abstract class Filter implements Expression, Serializable {
 
   protected static final NamedReference[] EMPTY_REFERENCE = new NamedReference[0];
```
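A plausible reading of the `Serializable` change (an assumption, not stated in the commit message) is that pushed-down filters end up captured by `Scan`/reader state that Spark ships to executors. The sketch below only demonstrates the Java serialization round trip the change permits:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{EqualTo, Filter}
import org.apache.spark.sql.types.IntegerType

// Round-trip a v2 filter through Java serialization, as happens implicitly
// when a closure holding the filter is sent to an executor.
val filter: Filter = new EqualTo(FieldReference("id"), LiteralValue(1, IntegerType))

val out = new ByteArrayOutputStream()
new ObjectOutputStream(out).writeObject(filter)
val restored = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  .readObject().asInstanceOf[Filter]
```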

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownV2Filters.java (new file)

Lines changed: 57 additions & 0 deletions
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.filter.Filter;

/**
 * A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
 * push down filters to the data source and reduce the size of the data to be read.
 *
 * @since 3.3.0
 */
@Evolving
public interface SupportsPushDownV2Filters extends ScanBuilder {

  /**
   * Pushes down filters, and returns filters that need to be evaluated after scanning.
   * <p>
   * Rows should be returned from the data source if and only if all of the filters match. That is,
   * filters must be interpreted as ANDed together.
   */
  Filter[] pushFilters(Filter[] filters);

  /**
   * Returns the filters that are pushed to the data source via {@link #pushFilters(Filter[])}.
   * <p>
   * There are 3 kinds of filters:
   * <ol>
   *   <li>pushable filters which don't need to be evaluated again after scanning.</li>
   *   <li>pushable filters which still need to be evaluated after scanning, e.g. parquet row
   *       group filter.</li>
   *   <li>non-pushable filters.</li>
   * </ol>
   * <p>
   * Both case 1 and 2 should be considered as pushed filters and should be returned by this method.
   * <p>
   * It's possible that there are no filters in the query and {@link #pushFilters(Filter[])}
   * is never called; an empty array should be returned in this case.
   */
  Filter[] pushedFilters();
}
```
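The case-2 contract above is worth spelling out: a filter that only prunes coarsely, like a parquet row-group filter, must come back from both methods. A hedged sketch, with `BlockSkippingScanBuilder` as an invented name for such a source:

```scala
import org.apache.spark.sql.connector.expressions.filter.Filter
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownV2Filters}

// Invented example of a source that can use every filter to skip data blocks
// but cannot guarantee row-level accuracy, i.e. all filters are "case 2".
class BlockSkippingScanBuilder extends ScanBuilder with SupportsPushDownV2Filters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters // used for coarse pruning, so they count as pushed...
    filters          // ...yet every row must still be checked after the scan
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = ??? // Scan construction omitted
}
```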

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 163 additions & 2 deletions
```diff
@@ -18,24 +18,30 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, EmptyRow, Expression, Literal, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEndsWith, StringStartsWith => V2StringStartsWith}
 import org.apache.spark.sql.connector.read.LocalScan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumn, PushableColumnBase}
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.{BooleanType, StringType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.UTF8String
 
 class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
@@ -427,3 +433,158 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
     case _ => Nil
   }
 }
+
+private[sql] object DataSourceV2Strategy {
+
+  private def translateLeafNodeFilterV2(
+      predicate: Expression,
+      pushableColumn: PushableColumnBase): Option[V2Filter] = predicate match {
+    case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))
+    case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2EqualTo(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))
+    case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2EqualNullSafe(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))
+    case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2LessThan(FieldReference(name), LiteralValue(v, t)))
+    case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2GreaterThan(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+    case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+
+    case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
+      Some(new V2LessThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+    case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
+      Some(new V2GreaterThanOrEqual(FieldReference(name), LiteralValue(v, t)))
+
+    case in @ expressions.InSet(pushableColumn(name), set) =>
+      val values: Array[V2Literal[_]] =
+        set.toSeq.map(elem => LiteralValue(elem, in.dataType)).toArray
+      Some(new V2In(FieldReference(name), values))
+
+    // Because we only convert In to InSet in the Optimizer when there are more than a certain
+    // number of items, it is possible we still get an In expression here that needs to be
+    // pushed down.
+    case in @ expressions.In(pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
+      val hSet = list.map(_.eval(EmptyRow))
+      Some(new V2In(FieldReference(name),
+        hSet.toArray.map(LiteralValue(_, in.value.dataType))))
+
+    case expressions.IsNull(pushableColumn(name)) =>
+      Some(new V2IsNull(FieldReference(name)))
+    case expressions.IsNotNull(pushableColumn(name)) =>
+      Some(new V2IsNotNull(FieldReference(name)))
+
+    case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
+      Some(new V2StringStartsWith(FieldReference(name), v))
+
+    case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
+      Some(new V2StringEndsWith(FieldReference(name), v))
+
+    case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
+      Some(new V2StringContains(FieldReference(name), v))
+
+    case expressions.Literal(true, BooleanType) =>
+      Some(new V2AlwaysTrue)
+
+    case expressions.Literal(false, BooleanType) =>
+      Some(new V2AlwaysFalse)
+
+    case _ => None
+  }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   *
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilterV2(
+      predicate: Expression,
+      supportNestedPredicatePushdown: Boolean): Option[V2Filter] = {
+    translateFilterV2WithMapping(predicate, None, supportNestedPredicatePushdown)
+  }
+
+  /**
+   * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+   *
+   * @param predicate The input [[Expression]] to be translated as [[Filter]]
+   * @param translatedFilterToExpr An optional map from leaf node filter expressions to their
+   *                               translated [[Filter]]. The map is used for rebuilding
+   *                               [[Expression]] from [[Filter]].
+   * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+   */
+  protected[sql] def translateFilterV2WithMapping(
+      predicate: Expression,
+      translatedFilterToExpr: Option[mutable.HashMap[V2Filter, Expression]],
+      nestedPredicatePushdownEnabled: Boolean)
+    : Option[V2Filter] = {
+    predicate match {
+      case expressions.And(left, right) =>
+        // See SPARK-12218 for detailed discussion
+        // It is not safe to just convert one side if we do not understand the
+        // other side. Here is an example used to explain the reason.
+        // Let's say we have (a = 2 AND trim(b) = 'blah') OR (c > 0)
+        // and we do not understand how to convert trim(b) = 'blah'.
+        // If we only convert a = 2, we will end up with
+        // (a = 2) OR (c > 0), which will generate wrong results.
+        // Pushing one leg of AND down is only safe to do at the top level.
+        // You can see ParquetFilters' createFilter for more details.
+        for {
+          leftFilter <- translateFilterV2WithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterV2WithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+        } yield new V2And(leftFilter, rightFilter)
+
+      case expressions.Or(left, right) =>
+        for {
+          leftFilter <- translateFilterV2WithMapping(
+            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          rightFilter <- translateFilterV2WithMapping(
+            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+        } yield new V2Or(leftFilter, rightFilter)
+
+      case expressions.Not(child) =>
+        translateFilterV2WithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          .map(new V2Not(_))
+
+      case other =>
+        val filter = translateLeafNodeFilterV2(
+          other, PushableColumn(nestedPredicatePushdownEnabled))
+        if (filter.isDefined && translatedFilterToExpr.isDefined) {
+          translatedFilterToExpr.get(filter.get) = predicate
+        }
+        filter
+    }
+  }
+
+  protected[sql] def rebuildExpressionFromFilter(
+      filter: V2Filter,
+      translatedFilterToExpr: mutable.HashMap[V2Filter, Expression]): Expression = {
+    filter match {
+      case and: V2And =>
+        expressions.And(rebuildExpressionFromFilter(and.left, translatedFilterToExpr),
+          rebuildExpressionFromFilter(and.right, translatedFilterToExpr))
+      case or: V2Or =>
+        expressions.Or(rebuildExpressionFromFilter(or.left, translatedFilterToExpr),
+          rebuildExpressionFromFilter(or.right, translatedFilterToExpr))
+      case not: V2Not =>
+        expressions.Not(rebuildExpressionFromFilter(not.child, translatedFilterToExpr))
+      case other =>
+        translatedFilterToExpr.getOrElse(other,
+          throw new IllegalStateException("Failed to rebuild Expression for filter: " + filter))
+    }
+  }
+}
```
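As a sketch of what the new translation produces. `translateFilterV2` is `protected[sql]`, so assume this runs from a (hypothetical) helper inside the `org.apache.spark.sql` package tree:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.types.IntegerType

// Translate the Catalyst predicate `a > 1` into a v2 filter.
val predicate = GreaterThan(AttributeReference("a", IntegerType)(), Literal(1))

val v2Filter = DataSourceV2Strategy.translateFilterV2(
  predicate, supportNestedPredicatePushdown = true)

// Expected shape: Some(GreaterThan(FieldReference("a"), LiteralValue(1, IntegerType))).
// LiteralValue carries the Catalyst value and DataType as-is, which is exactly
// the Catalyst-to-Scala conversion avoidance this PR is after.
```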

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 35 additions & 7 deletions
```diff
@@ -24,10 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.expressions.FieldReference
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.datasources.PushableColumnWithoutNestedColumn
+import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PushableColumnWithoutNestedColumn}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
@@ -40,7 +39,7 @@ object PushDownUtils extends PredicateHelper {
    */
   def pushFilters(
       scanBuilder: ScanBuilder,
-      filters: Seq[Expression]): (Seq[sources.Filter], Seq[Expression]) = {
+      filters: Seq[Expression]): (Either[Seq[sources.Filter], Seq[V2Filter]], Seq[Expression]) = {
     scanBuilder match {
       case r: SupportsPushDownFilters =>
         // A map from translated data source leaf node filters to original catalyst filter
@@ -69,9 +68,38 @@ object PushDownUtils extends PredicateHelper {
         val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
           DataSourceStrategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
         }
-        (r.pushedFilters(), (untranslatableExprs ++ postScanFilters).toSeq)
+        (Left(r.pushedFilters()), (untranslatableExprs ++ postScanFilters).toSeq)
 
-      case _ => (Nil, filters)
+      case r: SupportsPushDownV2Filters =>
+        // A map from translated data source leaf node filters to original catalyst filter
+        // expressions. For an `And`/`Or` predicate, it is possible that the predicate is
+        // partially pushed down. This map can be used to construct a catalyst filter
+        // expression from the input filter, or a superset (partial push down filter) of
+        // the input filter.
+        val translatedFilterToExpr = mutable.HashMap.empty[V2Filter, Expression]
+        val translatedFilters = mutable.ArrayBuffer.empty[V2Filter]
+        // Catalyst filter expressions that can't be translated to data source filters.
+        val untranslatableExprs = mutable.ArrayBuffer.empty[Expression]
+
+        for (filterExpr <- filters) {
+          val translated =
+            DataSourceV2Strategy.translateFilterV2WithMapping(
+              filterExpr, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
+          if (translated.isEmpty) {
+            untranslatableExprs += filterExpr
+          } else {
+            translatedFilters += translated.get
+          }
+        }
+
+        // Data source filters that need to be evaluated again after scanning, which means
+        // the data source cannot guarantee the rows returned can pass these filters.
+        // As a result we must return them so Spark can plan an extra filter operator.
+        val postScanFilters = r.pushFilters(translatedFilters.toArray).map { filter =>
+          DataSourceV2Strategy.rebuildExpressionFromFilter(filter, translatedFilterToExpr)
+        }
+        (Right(r.pushedFilters), (untranslatableExprs ++ postScanFilters).toSeq)
+
+      case _ => (Left(Nil), filters)
     }
   }
```
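Callers of `pushFilters` now receive an `Either` and must handle both branches. A small sketch of the pattern, using pattern matching rather than the `isLeft`/`left.get` style seen in `V2ScanRelationPushDown` below; `describePushed` is an invented helper:

```scala
import org.apache.spark.sql.connector.expressions.filter.{Filter => V2Filter}
import org.apache.spark.sql.sources

// Left: v1 sources.Filter from SupportsPushDownFilters.
// Right: v2 Filter from SupportsPushDownV2Filters.
def describePushed(pushed: Either[Seq[sources.Filter], Seq[V2Filter]]): String =
  pushed match {
    case Left(v1Filters)  => v1Filters.mkString(", ")
    case Right(v2Filters) => v2Filters.mkString(", ")
  }
```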

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 7 additions & 1 deletion
```diff
@@ -58,12 +58,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
     val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
       sHolder.builder, normalizedFiltersWithoutSubquery)
+    val pushedFiltersStr = if (pushedFilters.isLeft) {
+      pushedFilters.left.get.mkString(", ")
+    } else {
+      pushedFilters.right.get.mkString(", ")
+    }
+
     val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
 
     logInfo(
       s"""
          |Pushing operators to ${sHolder.relation.name}
-         |Pushed Filters: ${pushedFilters.mkString(", ")}
+         |Pushed Filters: $pushedFiltersStr
          |Post-Scan Filters: ${postScanFilters.mkString(",")}
        """.stripMargin)
```