[SPARK-27534][SQL] Do not load content column in binary data source if it is not selected
#24473
Conversation
```
  case CONTENT => readContent
  case name => throw new RuntimeException(s"Unexpected field name: ${name}")
}
InternalRow(values: _*)
```
Do we need to change `inferSchema()`, or should it still return the `content` field with null values? cc: @cloud-fan
What about adding a "keep invalid" option: when a file read fails, fill the `content` column with null? Currently, when a file fails to load, the whole data source load breaks.
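A minimal sketch of how that hypothetical "keep invalid" behavior could look (not implemented in this PR; `readContent` is the content-reading helper from the diffs below, and the option itself is only the commenter's suggestion):
```
import java.io.IOException

// Hypothetical "keep invalid" handling: on a read failure, keep the row and
// emit null for content instead of failing the whole scan.
val content: Array[Byte] =
  try {
    readContent
  } catch {
    case _: IOException => null // unreadable file: content becomes null
  }
```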
```
  case PATH => UTF8String.fromString(path)
  case LENGTH => status.getLen
  case MODIFICATION_TIME => DateTimeUtils.fromMillis(status.getModificationTime)
  case CONTENT => readContent
```
I don't see a strong reason to prune other columns that are inexpensive. The code is much simpler if we only prune `content`.
But I think the current code is simpler. The previous version contained code that is hard to read:
```
val fullOutput = dataSchema.map { f =>
  AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
}
val requiredOutput = fullOutput.filter { a =>
  requiredSchema.fieldNames.contains(a.name)
}
val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
...
Iterator(requiredColumns(internalRow))
```
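For contrast, a sketch of the pruning approach this PR takes, assembled from the diff hunks quoted above (`PATH`, `LENGTH`, `MODIFICATION_TIME`, and `CONTENT` are the field-name constants, and `readContent` lazily reads the file bytes): only the requested fields are materialized, so no separate projection step is needed.
```
// Build values only for the fields the query actually selected; the file is
// opened only when the CONTENT field is requested.
val values = requiredSchema.fieldNames.map {
  case PATH => UTF8String.fromString(path)
  case LENGTH => status.getLen
  case MODIFICATION_TIME => DateTimeUtils.fromMillis(status.getModificationTime)
  case CONTENT => readContent
  case name => throw new RuntimeException(s"Unexpected field name: ${name}")
}
Iterator.single(InternalRow(values: _*))
```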
```
test("genPrunedRow") {
```
Can we just test buildReader on one file?
Do we need to test (and if so, how) that the file is actually not read when `content` is pruned?
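One possible approach, as a sketch (assuming a ScalaTest suite with a `spark` session and a local test `file: java.io.File`; this mirrors the `setReadable(false)` trick used later in the suite): revoke read permission, so a metadata-only query can only succeed if the file is never opened.
```
// Make any attempt to open the file fail.
file.setReadable(false)

val df = spark.read.format("binaryFile").load(file.getPath)

// A metadata-only query succeeds, implying the file content was never read.
assert(df.select("length").head().getLong(0) === file.length())

// Selecting content now fails, confirming the read is triggered by content.
intercept[org.apache.spark.SparkException] {
  df.select("content").head()
}
```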
Test build #104942 has finished for PR 24473 at commit
Test build #104947 has finished for PR 24473 at commit
@WeichenXu123 I pushed some changes. @cloud-fan Could you help review? Thanks!
Test build #104956 has finished for PR 24473 at commit
retest this please.
```
- Iterator(requiredColumns(internalRow))
+ Iterator.single(InternalRow(values: _*))
```
Why don't we project to an unsafe row like before?
Does projecting to an unsafe row improve performance?
It seems `dataSchema` is not used. Is it possible that the required schema contains fields that don't exist in `dataSchema`?
I recall that we return unsafe rows where possible.
Agreed with @viirya: unsafe rows can be more space-efficient, and if there is no gain from this change, we'd better keep it unchanged.
Updated to use `UnsafeRowWriter`.
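For reference, a minimal sketch of row construction with `UnsafeRowWriter` (the field-name constants, `requiredSchema`, `status`, `path`, and `readContent` are assumed from the surrounding code; this is a sketch, not necessarily the exact merged code):
```
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// One writer per row; field ordinals follow the pruned (required) schema.
val writer = new UnsafeRowWriter(requiredSchema.length)
writer.resetRowWriter()
requiredSchema.fieldNames.zipWithIndex.foreach {
  case (PATH, i) => writer.write(i, UTF8String.fromString(path))
  case (LENGTH, i) => writer.write(i, status.getLen)
  case (MODIFICATION_TIME, i) =>
    writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
  case (CONTENT, i) => writer.write(i, readContent) // file read happens only here
  case (name, _) => throw new RuntimeException(s"Unexpected field name: ${name}")
}
Iterator.single(writer.getRow)
```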
Test build #104959 has finished for PR 24473 at commit
LGTM.
```
- }
- val requiredOutput = fullOutput.filter { a =>
-   requiredSchema.fieldNames.contains(a.name)
+ if (pathGlobPattern.forall(new GlobFilter(_).accept(path))) {
```
Can we make it one if-else while we're here? For instance:
```
val isPatternMatched = pathGlobPattern.forall(new GlobFilter(_).accept(fsPath))
// These vals are intentionally lazy to avoid unnecessary file access via short-circuiting.
lazy val fs = fsPath.getFileSystem(broadcastedHadoopConf.value.value)
lazy val fileStatus = fs.getFileStatus(fsPath)
lazy val shouldNotFilterOut = filterFuncs.forall(_.apply(fileStatus))
if (isPatternMatched && shouldNotFilterOut) {
  ...
  Iterator(requiredColumns(internalRow))
} else {
  Iterator.empty[InternalRow]
}
```
In the current implementation, `getFileStatus` and `filterFuncs` are not touched if the path doesn't match.
Yes, this suggestion doesn't touch either of them, because they are lazy.
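A tiny self-contained illustration of that short-circuiting point (hypothetical names, not from the PR):
```
// The lazy val's body never runs if the left side of && is already false.
lazy val expensiveCheck: Boolean = { println("file accessed"); true }
val patternMatched = false
if (patternMatched && expensiveCheck) println("both passed")
// Nothing is printed: expensiveCheck was never evaluated.
```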
```
  assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
}
file.setReadable(false)
withClue("cannot read content") {
```
Looks like both positive and negative cases are within one test. Can we split them?
Is it necessary? The tests are already grouped by function/feature.
Not necessary, but why don't we make the test cases simple and separate? :)
We can still retain the grouping too, for instance, `column pruning - positive` and `column pruning - negative`. Not a big deal, but I don't think it's difficult or too demanding to fix.
Looks fine to me too otherwise.
HyukjinKwon left a comment:
Looks good, since my remaining comments are rather about style and cleanup. I will leave it to you, @WeichenXu123 and @mengxr.
@HyukjinKwon Thanks for the review! I'm merging this into master. I do think the suggested changes are unnecessary. For example, two …
[SPARK-27534][SQL] Do not load content column in binary data source if it is not selected
## What changes were proposed in this pull request?
A follow-up task from SPARK-25348. To save I/O cost, Spark shouldn't attempt to read the file if users didn't request the `content` column. For example:
```
spark.read.format("binaryFile").load(path).filter($"length" < 1000000).count()
```
## How was this patch tested?
Unit test added.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes apache#24473 from WeichenXu123/SPARK-27534.
Lead-authored-by: Xiangrui Meng <[email protected]>
Co-authored-by: WeichenXu <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>