[SPARK-27473][SQL] Support filter push down for status fields in binary file data source #24387
Conversation
Test build #104639 has finished for PR 24387 at commit
.../src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
```scala
private[binaryfile] def createFilterFunctions(filter: Filter): Seq[FileStatus => Boolean] = {
  filter match {
    case andFilter: And =>
```
`case And(left, right) =>`
Shall we support `Or`?
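For illustration, one way `Or` could be folded into the Seq-of-conjuncts design under discussion. This is a sketch with simplified stand-in types (`FileStatusStub` and the local `Filter` case classes are hypothetical, not Spark's `org.apache.spark.sql.sources` classes or Hadoop's `FileStatus`):

```scala
sealed trait Filter
case class And(left: Filter, right: Filter) extends Filter
case class Or(left: Filter, right: Filter) extends Filter
case class LessThan(attribute: String, value: Long) extends Filter
case class GreaterThan(attribute: String, value: Long) extends Filter

// Stand-in for Hadoop's FileStatus; only the length is modeled here.
case class FileStatusStub(len: Long)

// The returned Seq is implicitly AND-ed by the caller, so And can flatten
// into multiple predicates, while Or must collapse into a single one.
def createFilterFunctions(filter: Filter): Seq[FileStatusStub => Boolean] = filter match {
  case And(left, right) =>
    createFilterFunctions(left) ++ createFilterFunctions(right)
  case Or(left, right) =>
    val l = createFilterFunctions(left)
    val r = createFilterFunctions(right)
    // An empty Seq means "no constraint" (always true), so an Or with an
    // unconvertible side can never prune anything; return empty in that case.
    if (l.nonEmpty && r.nonEmpty) Seq(s => l.forall(_(s)) || r.forall(_(s)))
    else Seq.empty
  case LessThan("length", v)    => Seq(_.len < v)
  case GreaterThan("length", v) => Seq(_.len > v)
  case _ => Seq.empty
}
```

Note the asymmetry with `And`: dropping one side of an `And` still yields a safe (weaker) pruning predicate, whereas `Or` needs both sides before it can prune.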
```scala
filter match {
  case andFilter: And =>
    createFilterFunctions(andFilter.left) ++ createFilterFunctions(andFilter.right)
  case LessThan("length", value: Long) =>
```
Instead of grouping by filter type, could you group the cases by field name?
+1
```scala
case andFilter: And =>
  createFilterFunctions(andFilter.left) ++ createFilterFunctions(andFilter.right)
case LessThan("length", value: Long) =>
  Seq((status: FileStatus) => status.getLen < value)
```
minor: `Seq` => `Some`
```scala
  Seq((status: FileStatus) => status.getLen < value)
case LessThan("modificationTime", value: Timestamp) =>
  Seq((status: FileStatus) => status.getModificationTime < value.getTime)
case LessThanOrEqual("length", value: Long) =>
```
Just to confirm, does SQL always convert the value to Long?
Yes, if the data type is long.
```scala
  Seq((status: FileStatus) => status.getLen >= value)
case GreaterThanOrEqual("modificationTime", value: Timestamp) =>
  Seq((status: FileStatus) => status.getModificationTime >= value.getTime)
case _ => Seq.empty
```
minor: `=> None`
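Combining the two suggestions above (return `Option` instead of `Seq`, and group the cases by field name), a sketch with simplified stand-in types — not Spark's actual `Filter` hierarchy or Hadoop's `FileStatus`, and with `modificationTime` modeled as epoch millis rather than `java.sql.Timestamp`:

```scala
sealed trait Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter

// Stand-in for Hadoop's FileStatus (modificationTime as epoch millis).
case class FileStatusStub(len: Long, modTime: Long)

// Option reads better than Seq for a single predicate:
// None simply means "this filter is not pushed down".
def createFilterFunction(filter: Filter): Option[FileStatusStub => Boolean] =
  filter match {
    // cases grouped by field name: first "length"...
    case LessThan("length", v: Long)           => Some(_.len < v)
    case GreaterThanOrEqual("length", v: Long) => Some(_.len >= v)
    // ...then "modificationTime"
    case LessThan("modificationTime", v: Long)           => Some(_.modTime < v)
    case GreaterThanOrEqual("modificationTime", v: Long) => Some(_.modTime >= v)
    // anything unrecognized (e.g. "path", or a wrong value type) is skipped
    case _ => None
  }
```

With this shape, collecting the pushed-down predicates for a conjunction is just `filters.flatMap(createFilterFunction)`.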
```scala
  col("year") // this is a partition column
)
if (sqlFilter != null) {
  resultDF = resultDF.filter(sqlFilter)
```
The test doesn't really test filter push-down, because it still passes without it. I think we should unit-test the buildReader function alone.
```scala
def testCreateFilterFunctions(
```
See comment above. We can just unit-test the buildReader function, which would look similar to your test here.
...test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@WeichenXu123 Can you also handle

Test build #104688 has finished for PR 24387 at commit

Test build #104743 has finished for PR 24387 at commit

Jenkins, retest this please.

Test build #104757 has finished for PR 24387 at commit

Test build #104772 has finished for PR 24387 at commit
Changed to unit-test `createFilterFunction`, which should be sufficient.
Thanks for @mengxr's update! Looks good to me.

@WeichenXu123 I pushed some changes to your branch that added more tests.

LGTM pending Jenkins. @cloud-fan Do you want to make a pass?

Test build #104777 has finished for PR 24387 at commit

Test build #104778 has finished for PR 24387 at commit

Test build #104779 has finished for PR 24387 at commit
```scala
case GreaterThanOrEqual(MODIFICATION_TIME, value: Timestamp) =>
  _.getModificationTime >= value.getTime
case EqualTo(MODIFICATION_TIME, value: Timestamp) =>
  _.getModificationTime == value.getTime
```
Looks like `EqualNullSafe` and `In` are missing.
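For illustration, the two missing predicates could be handled along these lines. Simplified stand-in types again (not Spark's `sources.Filter` classes or Hadoop's `FileStatus`); since `length` is non-nullable in the flattened schema, null-safe equality degenerates to plain equality:

```scala
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class EqualNullSafe(attribute: String, value: Any) extends Filter
case class In(attribute: String, values: Array[Any]) extends Filter

case class FileStatusStub(len: Long) // stand-in for Hadoop's FileStatus

def createFilterFunction(filter: Filter): Option[FileStatusStub => Boolean] =
  filter match {
    case EqualTo("length", v: Long) => Some(_.len == v)
    // length is never null, so <=> behaves exactly like = here
    case EqualNullSafe("length", v: Long) => Some(_.len == v)
    // In is just a disjunction over the value list
    case In("length", vs) => Some(s => vs.exists(_ == s.len))
    case _ => None
  }
```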
[SPARK-27473][SQL] Support filter push down for status fields in binary file data source
## What changes were proposed in this pull request?
Support 4 kinds of filters:
- LessThan
- LessThanOrEqual
- GreaterThan
- GreaterThanOrEqual
Support filters applied on 2 columns:
- modificationTime
- length
Note:
In order to support datasource filter push-down, I flattened the schema to:
```
val schema = StructType(
StructField("path", StringType, false) ::
StructField("modificationTime", TimestampType, false) ::
StructField("length", LongType, false) ::
StructField("content", BinaryType, true) :: Nil)
```
## How was this patch tested?
To be added.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes apache#24387 from WeichenXu123/binary_ds_filter.
Lead-authored-by: WeichenXu <[email protected]>
Co-authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>
What changes were proposed in this pull request?
Support 5 kinds of filters with and/or.
Support filters applied on 2 columns: modificationTime and length.
Note:
In order to support datasource filter push-down, the schema was flattened (see above).
How was this patch tested?
To be added.