@@ -166,10 +166,10 @@ case class FileSourceScanExec(
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {

-override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
Contributor: nit: can we maybe add a comment about the reason we are making them lazy?
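For illustration, such a comment might read roughly as follows (the wording is a sketch, not taken from the PR):

  // Marked lazy so that constructing a canonicalized copy of this node on an executor
  // (e.g. during doCanonicalize) does not eagerly evaluate fields that need the active
  // SparkSession / session state, which are only available on the driver.
  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
    relation.sparkSession, StructType.fromAttributes(output))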

relation.sparkSession, StructType.fromAttributes(output))

-override val needsUnsafeRowConversion: Boolean = {
+override lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
Contributor: Since you mentioned SparkSession, this line caught my attention: the active session is accessed with SparkSession.getActiveSession.get rather than relation.sparkSession, as is done elsewhere. That seems worth changing while we're at it.

Member Author: Let's leave that out of this PR's scope. That change is about making the plan workable on executors, whereas this PR only targets making the plan canonicalizable there.
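For reference, the change being deferred would presumably be a one-line swap along these lines (a sketch, not part of this PR):

  // use the session captured by the relation instead of the thread-local active session
  relation.sparkSession.sessionState.conf.parquetVectorizedReaderEnabled
  // rather than
  SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled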

} else {
@@ -199,7 +199,7 @@ case class FileSourceScanExec(
ret
}

-override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
Contributor: That happens on the driver, so no need for the lazy here.

Contributor: It would be computed anyway, though, when we create a new FileSourceScanExec in the canonicalization process if it is not lazy, so I'd say this one is needed, as well as all the others.
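A standalone illustration of that point (plain Scala, not Spark code; the names are made up): a plain val is evaluated in the constructor, so merely building the canonicalized copy runs it, whereas a lazy val is deferred until first access.

  class DriverOnlyState {
    def confValue: Boolean = true
  }

  // `state` stands in for driver-side session state that may be null/unavailable on executors.
  case class EagerNode(state: DriverOnlyState) {
    val setting: Boolean = state.confValue       // runs during construction
  }

  case class LazyNode(state: DriverOnlyState) {
    lazy val setting: Boolean = state.confValue  // runs only when `setting` is first read
  }

  EagerNode(null)  // throws a NullPointerException immediately
  LazyNode(null)   // fine until `setting` is actually accessed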

val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
@@ -270,7 +270,7 @@ case class FileSourceScanExec(
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

-override val metadata: Map[String, String] = {
+override lazy val metadata: Map[String, String] = {
Contributor: That's driver-only too, isn't it? Why is this lazy required?

Member Author: It can actually run on the executor side:

	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:275)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Contributor: Ouch. I'd never have thought of any code with RDDs and physical operators running on the executor side (!). Learned it today.

def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val location = relation.location
val locationDesc =
@@ -47,17 +47,15 @@ import org.apache.spark.util.ThreadUtils
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

/**
-* A handle to the SQL Context that was used to create this plan.  Since many operators need
+* A handle to the SQL Context that was used to create this plan. Since many operators need
* access to the sqlContext for RDD operations or configuration this field is automatically
* populated by the query planning infrastructure.
*/
-@transient
-final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
+@transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull

protected def sparkContext = sqlContext.sparkContext

-// sqlContext will be null when SparkPlan nodes are created without the active sessions.
-// So far, this only happens in the test cases.
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.subexpressionEliminationEnabled
} else {
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

/** Overridden make copy also propagates sqlContext to copied plan. */
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-SparkSession.setActiveSession(sqlContext.sparkSession)
+if (sqlContext != null) {
Contributor: Just curious, why wasn't the makeCopy problem discovered in the previous PR/investigation?

Member Author: Because it previously failed earlier, before reaching this point. Once we go with lazy vals, the failure is discovered later (the exception message in the PR description).

Contributor: I see, thanks. I wondered because it seems like a more generic issue that would be easier to hit, but we probably never met it since all the trials included FileSourceScanExec, which caused an earlier failure. Thanks.

+SparkSession.setActiveSession(sqlContext.sparkSession)
+}
super.makeCopy(newArgs)
}

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.test.SharedSQLContext

class FileSourceScanExecSuite extends SharedSQLContext {
test("FileSourceScanExec should be canonicalizable in executor side") {
Contributor: nit: s/in/on

withTempPath { path =>
spark.range(1).toDF().write.parquet(path.getAbsolutePath)
Contributor: Redundant toDF
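That is, the write could presumably go directly through the Dataset:

  spark.range(1).write.parquet(path.getAbsolutePath)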

val df = spark.read.parquet(path.getAbsolutePath)
val fileSourceScanExec =
df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
Contributor: This isInstanceOf is a bit non-Scala IMHO; I'd prefer collectFirst { case op: FileSourceScanExec => op } instead.
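The suggested form would look roughly like:

  val fileSourceScanExec = df.queryExecution.sparkPlan.collectFirst {
    case scan: FileSourceScanExec => scan
  }.get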

try {
spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
Contributor: Not sure whether it is feasible (maybe in a follow-up?), but it would be great if we could test the canonicalization of all the Exec nodes in order to prevent such issues in the future. What do you think?

Member Author: Yes, I think it is. It took me a while to come up with a small and simple test for this one, though; I'd hope to leave that out of this PR's scope.
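One possible shape for such a follow-up check (only a sketch, assuming every node of the plan can be shipped to an executor):

  // canonicalize every node of the physical plan on an executor, not just the scan node
  val allNodes = df.queryExecution.executedPlan.collect { case plan => plan }
  spark.range(1).foreach { _ =>
    allNodes.foreach(_.canonicalized)
  }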

} catch {
case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
Contributor: It's a named test, so I'd get rid of the try-catch block because:

  1. It's going to fail the test anyway.
  2. The title of the test matches the fail message.
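With the try-catch removed, the assertion would collapse to a single line and any exception would fail the named test directly:

  spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)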

Member Author: Hm, this gives an explicit scope for which condition counts as the failure case, though. I believe this is a fairly common pattern. If both are okay, let me keep it this way.

}
}
}
}