Skip to content

Commit ae88eeb

Browse files
committed
DataSourceStrategy
1 parent 04e53d2 commit ae88eeb

File tree

2 files changed

+223
-109
lines changed

2 files changed

+223
-109
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -442,53 +442,65 @@ object DataSourceStrategy {
442442
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
443443
*/
444444
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
445+
// Recursively try to find an attribute name from the top level that can be pushed down.
446+
def attrName(e: Expression): Option[String] = e match {
447+
// In Spark and many data sources such as parquet, dots are used as a column path delimiter;
448+
// thus, we don't translate such expressions.
449+
case a: Attribute if !a.name.contains(".") =>
450+
Some(a.name)
451+
case s: GetStructField if !s.childSchema(s.ordinal).name.contains(".") =>
452+
attrName(s.child).map(_ + s".${s.childSchema(s.ordinal).name}")
453+
case _ =>
454+
None
455+
}
456+
445457
predicate match {
446-
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
447-
Some(sources.EqualTo(a.name, convertToScala(v, t)))
448-
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
449-
Some(sources.EqualTo(a.name, convertToScala(v, t)))
450-
451-
case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
452-
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
453-
case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
454-
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
455-
456-
case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
457-
Some(sources.GreaterThan(a.name, convertToScala(v, t)))
458-
case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
459-
Some(sources.LessThan(a.name, convertToScala(v, t)))
460-
461-
case expressions.LessThan(a: Attribute, Literal(v, t)) =>
462-
Some(sources.LessThan(a.name, convertToScala(v, t)))
463-
case expressions.LessThan(Literal(v, t), a: Attribute) =>
464-
Some(sources.GreaterThan(a.name, convertToScala(v, t)))
465-
466-
case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
467-
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
468-
case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
469-
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
470-
471-
case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
472-
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
473-
case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
474-
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
475-
476-
case expressions.InSet(a: Attribute, set) =>
477-
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
478-
Some(sources.In(a.name, set.toArray.map(toScala)))
458+
case expressions.EqualTo(e: Expression, Literal(v, t)) =>
459+
attrName(e).map(name => sources.EqualTo(name, convertToScala(v, t)))
460+
case expressions.EqualTo(Literal(v, t), e: Expression) =>
461+
attrName(e).map(name => sources.EqualTo(name, convertToScala(v, t)))
462+
463+
case expressions.EqualNullSafe(e: Expression, Literal(v, t)) =>
464+
attrName(e).map(name => sources.EqualNullSafe(name, convertToScala(v, t)))
465+
case expressions.EqualNullSafe(Literal(v, t), e: Expression) =>
466+
attrName(e).map(name => sources.EqualNullSafe(name, convertToScala(v, t)))
467+
468+
case expressions.GreaterThan(e: Expression, Literal(v, t)) =>
469+
attrName(e).map(name => sources.GreaterThan(name, convertToScala(v, t)))
470+
case expressions.GreaterThan(Literal(v, t), e: Expression) =>
471+
attrName(e).map(name => sources.LessThan(name, convertToScala(v, t)))
472+
473+
case expressions.LessThan(e: Expression, Literal(v, t)) =>
474+
attrName(e).map(name => sources.LessThan(name, convertToScala(v, t)))
475+
case expressions.LessThan(Literal(v, t), e: Expression) =>
476+
attrName(e).map(name => sources.GreaterThan(name, convertToScala(v, t)))
477+
478+
case expressions.GreaterThanOrEqual(e: Expression, Literal(v, t)) =>
479+
attrName(e).map(name => sources.GreaterThanOrEqual(name, convertToScala(v, t)))
480+
case expressions.GreaterThanOrEqual(Literal(v, t), e: Expression) =>
481+
attrName(e).map(name => sources.LessThanOrEqual(name, convertToScala(v, t)))
482+
483+
case expressions.LessThanOrEqual(e: Expression, Literal(v, t)) =>
484+
attrName(e).map(name => sources.LessThanOrEqual(name, convertToScala(v, t)))
485+
case expressions.LessThanOrEqual(Literal(v, t), e: Expression) =>
486+
attrName(e).map(name => sources.GreaterThanOrEqual(name, convertToScala(v, t)))
487+
488+
case expressions.InSet(e: Expression, set) =>
489+
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
490+
attrName(e).map(name => sources.In(name, set.toArray.map(toScala)))
479491

480492
// Because we only convert In to InSet in Optimizer when there are more than certain
481493
// items. So it is possible we still get an In expression here that needs to be pushed
482494
// down.
483-
case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
484-
val hSet = list.map(_.eval(EmptyRow))
485-
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
486-
Some(sources.In(a.name, hSet.toArray.map(toScala)))
495+
case expressions.In(e: Expression, list) if list.forall(_.isInstanceOf[Literal]) =>
496+
val hSet = list.map(e => e.eval(EmptyRow))
497+
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
498+
attrName(e).map(name => sources.In(name, hSet.toArray.map(toScala)))
487499

488-
case expressions.IsNull(a: Attribute) =>
489-
Some(sources.IsNull(a.name))
490-
case expressions.IsNotNull(a: Attribute) =>
491-
Some(sources.IsNotNull(a.name))
500+
case expressions.IsNull(e: Expression) =>
501+
attrName(e).map(name => sources.IsNull(name))
502+
case expressions.IsNotNull(e: Expression) =>
503+
attrName(e).map(name => sources.IsNotNull(name))
492504

493505
case expressions.And(left, right) =>
494506
// See SPARK-12218 for detailed discussion
@@ -514,14 +526,14 @@ object DataSourceStrategy {
514526
case expressions.Not(child) =>
515527
translateFilter(child).map(sources.Not)
516528

517-
case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
518-
Some(sources.StringStartsWith(a.name, v.toString))
529+
case expressions.StartsWith(e: Expression, Literal(v: UTF8String, StringType)) =>
530+
attrName(e).map(name => sources.StringStartsWith(name, v.toString))
519531

520-
case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
521-
Some(sources.StringEndsWith(a.name, v.toString))
532+
case expressions.EndsWith(e: Expression, Literal(v: UTF8String, StringType)) =>
533+
attrName(e).map(name => sources.StringEndsWith(name, v.toString))
522534

523-
case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
524-
Some(sources.StringContains(a.name, v.toString))
535+
case expressions.Contains(e: Expression, Literal(v: UTF8String, StringType)) =>
536+
attrName(e).map(name => sources.StringContains(name, v.toString))
525537

526538
case expressions.Literal(true, BooleanType) =>
527539
Some(sources.AlwaysTrue)

0 commit comments

Comments
 (0)