[SPARK-28213][SQL] Replace ColumnarBatchScan with equivalent from Columnar #25008
Conversation
add to whitelist
ok to test
Test build #107027 has finished for PR 25008 at commit
Retest this please.
}
- override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
+ // override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
Shall we remove this cleanly?
val df = sql("SELECT * FROM a WHERE p <= (SELECT MIN(id) FROM b)")
checkAnswer(df, Seq(Row(0, 0), Row(2, 0)))
// need to execute the query before we can examine fs.inputRDDs()
df.explain
Shall we clean this up?
// Push predicate to the cached table.
val df2 = df1.where("y = 3")
logWarning(s"ORIG QUERY PLAN:\n${df2.queryExecution.executedPlan}")
Ur, shall we clean up these three logWarnings in this test suite? They will never be read.
Test build #107147 has finished for PR 25008 at commit
Test build #107367 has finished for PR 25008 at commit
Test build #107407 has finished for PR 25008 at commit
abellina left a comment
Overall it makes sense. I just had some high-level questions.
buffers
  .map(createAndDecompressColumn(_, offHeapColumnVectorEnabled))
  .map(b => {
    numOutputRows += b.numRows()
should numOutputRows be max(numRows)?
No, because b is a ColumnarBatch, so we are iterating over possibly multiple batches, not over individual columns.
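To make that concrete, here is a minimal sketch (a hypothetical helper, not the actual InMemoryTableScanExec code) of why `+=` rather than `max` is the right aggregation:

```scala
import org.apache.spark.sql.vectorized.ColumnarBatch

// Each element of the iterator is a whole ColumnarBatch; numRows() is the
// row count of that one batch regardless of its column count, so summing
// across batches yields the total number of output rows.
def countOutputRows(batches: Iterator[ColumnarBatch]): Long = {
  var numOutputRows = 0L
  batches.foreach { b =>
    numOutputRows += b.numRows()
  }
  numOutputRows
}
```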
ah right, missed that. Thanks
// need to execute the query before we can examine fs.inputRDDs()
assert(df.queryExecution.executedPlan match {
- case WholeStageCodegenExec(fs @ FileSourceScanExec(_, _, _, partitionFilters, _, _, _)) =>
+ case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
OK, so ColumnarToRowExec is here because InputAdapter may support columnar, right? Why is InputAdapter getting added here?
WholeStageCodegenExec marks the end of a code generation stage. InputAdapter marks the beginning of a code generation stage. So what we had before was a WholeStageCodegenExec whose first entry was a FileSourceScanExec, because before this change FileSourceScanExec supported code generation to convert ColumnarBatches into rows. The InputAdapter would logically have been a child of FileSourceScanExec, but FileSourceScanExec has no children, so it is not there.
After this change ColumnarToRowExec is the only thing in the code generation stage, so it is flanked by the WholeStageCodegenExec and the InputAdapter. FileSourceScanExec returns batches and no longer does code gen because it is not needed any longer.
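To illustrate, a hedged sketch of a matcher for the new plan shape (the helper name is made up; the shape follows the updated test above):

```scala
import org.apache.spark.sql.execution.{ColumnarToRowExec, FileSourceScanExec,
  InputAdapter, SparkPlan, WholeStageCodegenExec}

// After this change the scan emits batches outside the codegen stage, so the
// stage reads: WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(scan))).
def isNewScanShape(plan: SparkPlan): Boolean = plan match {
  case WholeStageCodegenExec(
      ColumnarToRowExec(InputAdapter(_: FileSourceScanExec))) => true
  case _ => false
}
```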
assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
}

test("cache for primitive type should be in WholeStageCodegen with InMemoryTableScanExec")
Not clear to me why this test doesn't apply anymore.
It was verifying that InMemoryTableScanExec was inside of a WholeStageCodegen phase, but after this change InMemoryTableScanExec no longer supports codegen, so the test is invalid. ColumnarToRowExec is what will be in the codegen section instead.
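A sketch of what an updated assertion could check instead (illustrative only, not necessarily the replacement test):

```scala
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter, WholeStageCodegenExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

val df = spark.range(10).toDF("id").cache()
df.count() // materialize the cache
// The codegen stage now starts at ColumnarToRowExec; InMemoryTableScanExec
// feeds it columnar batches from outside the stage.
val hasExpectedShape = df.queryExecution.executedPlan.collectFirst {
  case WholeStageCodegenExec(
      ColumnarToRowExec(InputAdapter(_: InMemoryTableScanExec))) => true
}.isDefined
assert(hasExpectedShape)
```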
tgravescs left a comment
Changes look good.
dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier])
- extends DataSourceScanExec with ColumnarBatchScan {
+ extends DataSourceScanExec {
This change makes all DataSourceScanExec nodes not support codegen, right?
Correct, but there were only two things that the code gen was doing: converting ColumnarBatches into UnsafeRows, or converting whatever other rows were being returned by the DataSourceScanExec into UnsafeRows. The ColumnarBatch conversion is now covered by ColumnarToRowExec. The row-to-row conversion is covered by UnsafeProjections that are either inserted as a part of this patch or were already in the code, so previously we ended up doing a double conversion.
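For reference, a tiny sketch of that row-to-row path — an UnsafeProjection turning arbitrary InternalRows into UnsafeRows (the schema here is made up):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Illustrative schema; the real projections are built from the scan's output.
val schema = StructType(Seq(StructField("id", IntegerType)))

def toUnsafeRows(rows: Iterator[InternalRow]): Iterator[UnsafeRow] = {
  val proj = UnsafeProjection.create(schema) // one instance per partition
  rows.map(r => proj(r)) // the projection reuses an internal row buffer
}
```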
})
}

/**
ColumnarToRowExec's comment also mentions ColumnarBatchScan. If you would like to remove all references to ColumnarBatchScan...
Great catch. I thought I got rid of all of them. Will grep through again.
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
It would be good to avoid unnecessary changes like this.
Test build #107467 has finished for PR 25008 at commit
tgravescs left a comment
+1
…umnar

What changes were proposed in this pull request?
This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to apache#24795.

How was this patch tested?
I did some manual tests and ran/updated the automated tests. I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.

Closes apache#25008 from revans2/columnar-remove-batch-scan.
Authored-by: Robert (Bobby) Evans <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
protected def stripSparkFilter(df: DataFrame): DataFrame = {
  val schema = df.schema
- val withoutFilters = df.queryExecution.sparkPlan.transform {
+ val withoutFilters = df.queryExecution.executedPlan.transform {
Why make this change in this PR?
It seems the existing columnar logic within each plan has moved to RowToColumnarExec and ColumnarToRowExec to deduplicate it, but it now depends on the ApplyColumnarRulesAndInsertTransitions rule, which requires execution preparation (QueryExecution.preparations).
However, per the doc, executedPlan should ideally be used only for execution. It would have been best to avoid this. @revans2, even though it's too late, can you please describe what this PR fixes in the PR description (presumably by listing each item)?
I have no idea what this PR fixes from reading the PR description and JIRA.
This PR replaces ColumnarBatchScan with ColumnarToRowExec. This involved changes to all subclasses of ColumnarBatchScan and to AdaptiveSparkPlanExec so it would also execute the needed columnar transition rules. I also made any fixes needed for tests. I preferred to keep the changes as small as possible for the tests, which is why I made a small change here in a test utility class.
The issue was that some tests were directly executing the plan returned by this function, which used to work for some very limited use cases but did not work in all cases. I am happy to try to fix issues with this approach for the tests; just let me know the correct way to do it.
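For context, a hedged reconstruction of the utility after this change, based on the diff above (internalCreateDataFrame is a private[sql] helper, so this compiles only inside the org.apache.spark.sql package):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.FilterExec

// Strip FilterExec nodes from the *executed* plan, so the transitions added
// by ApplyColumnarRulesAndInsertTransitions during QueryExecution.preparations
// are already in place and the stripped plan can be executed directly.
def stripSparkFilter(spark: SparkSession, df: DataFrame): DataFrame = {
  val schema = df.schema
  val withoutFilters = df.queryExecution.executedPlan.transform {
    case FilterExec(_, child) => child
  }
  spark.internalCreateDataFrame(withoutFilters.execute(), schema)
}
```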
I don't know the correct way to fix it for now - it needs some investigation. Can you clarify why it didn't work well?
The problem seems to come from the internal behaviour changes - we now rely on ApplyColumnarRulesAndInsertTransitions. Can we investigate to confirm that it doesn't affect anything, and list the changes made in this PR description?
Yes that is exactly what it is.
// and ColumnarToRow transformations in the middle of it, but they will not have the tag
// we want, so skip them if they are the first thing we see
private def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match {
  case i: InputAdapter if !first => isScanPlanTree(i.child, false)
Why does InputAdapter pop up here?
Prior to this PR, a subclass of ColumnarBatchScan would be in a plan that looked like:
INPUT (subclass of ColumnarBatchScan) -> (code gen supported nodes) -> WholeStageCodegenExec -> ...
After this change, it now looks like:
INPUT (not a subclass of ColumnarBatchScan) -> InputAdapter -> ColumnarToRowExec -> (code gen supported nodes) -> WholeStageCodegenExec -> ...
Because the INPUT class no longer supports code generation, the code generation rule will insert an InputAdapter after it and before the ColumnarToRowExec, which does support code generation.
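A hedged sketch of that recursion: the InputAdapter case mirrors the diff above, while the other cases are assumptions for illustration:

```scala
import org.apache.spark.sql.execution.{ColumnarToRowExec, InputAdapter,
  LeafExecNode, SparkPlan}

// InputAdapter and ColumnarToRowExec may now sit between the codegen stage
// boundary and the scan, so step over them when they are not the first node.
def isScanPlanTree(plan: SparkPlan, first: Boolean): Boolean = plan match {
  case i: InputAdapter if !first => isScanPlanTree(i.child, first = false)
  case c: ColumnarToRowExec if !first => isScanPlanTree(c.child, first = false)
  case _: LeafExecNode => true // assumed terminal case
  case _ => false
}
```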
Hey, I see that the core logic itself is deduplicated fine without changing the existing code, but it seems to change other things too. Let's be clear about what a PR fixes next time - I thought this was just a simple refactoring but now realise that it is actually pretty invasive.
What changes were proposed in this pull request?
This is the second part of https://issues.apache.org/jira/browse/SPARK-27396 and a follow-on to #24795.
How was this patch tested?
I did some manual tests and ran/updated the automated tests.
I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.