[SPARK-25680][SQL] SQL execution listener shouldn't happen on execution thread #22674
```diff
@@ -58,7 +58,8 @@ object SQLExecution
    */
   def withNewExecutionId[T](
       sparkSession: SparkSession,
-      queryExecution: QueryExecution)(body: => T): T = {
+      queryExecution: QueryExecution,
+      name: Option[String] = None)(body: => T): T = {
     val sc = sparkSession.sparkContext
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
```
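For context on the new parameter: only executions that carry a `name` are reported to `QueryExecutionListener`s (see the comment in the next hunk). A hypothetical call site might look like the sketch below; `runAction` is an illustrative name, not part of this PR.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

// Hypothetical caller sketch: named actions opt into listener reporting,
// while internal executions keep the default `name = None` and stay
// invisible to QueryExecutionListener.
object RunActionSketch {
  def runAction[T](session: SparkSession, qe: QueryExecution, actionName: String)
      (action: => T): T = {
    SQLExecution.withNewExecutionId(session, qe, Some(actionName)) {
      action // runs with an execution id set; Start/End events posted around it
    }
  }
}
```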
```diff
@@ -71,14 +72,35 @@ object SQLExecution
     val callSite = sc.getCallSite()

     withSQLConfPropagated(sparkSession) {
-      sc.listenerBus.post(SparkListenerSQLExecutionStart(
-        executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
-        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+      var ex: Option[Exception] = None
+      val startTime = System.currentTimeMillis()
       try {
+        sc.listenerBus.post(SparkListenerSQLExecutionStart(
+          executionId = executionId,
+          description = callSite.shortForm,
+          details = callSite.longForm,
+          physicalPlanDescription = queryExecution.toString,
+          // `queryExecution.executedPlan` triggers query planning. If it fails, the exception
+          // will be caught and reported in the `SparkListenerSQLExecutionEnd`
+          sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
+          time = startTime))
         body
+      } catch {
+        case e: Exception =>
+          ex = Some(e)
+          throw e
       } finally {
-        sc.listenerBus.post(SparkListenerSQLExecutionEnd(
-          executionId, System.currentTimeMillis()))
+        val endTime = System.currentTimeMillis()
+        val event = SparkListenerSQLExecutionEnd(executionId, endTime)
+        // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the `name`
+        // parameter. The `ExecutionListenerManager` only watches SQL executions with name. We
+        // can specify the execution name in more places in the future, so that
+        // `QueryExecutionListener` can track more cases.
+        event.executionName = name
+        event.duration = endTime - startTime
```
Contributor

duration used to be reported in nanos. Now it's millis. I would still report it as nanos if possible.

Contributor (Author)

ah, good catch!
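To illustrate the reviewer's point, here is a minimal, self-contained sketch of measuring a duration with the monotonic nanosecond clock while keeping wall-clock millis for event timestamps; the names are illustrative, not from the PR.

```scala
// Minimal sketch: wall-clock millis for timestamps, monotonic nanos for
// durations. System.nanoTime is unaffected by clock adjustments, so it is
// the safer source for elapsed-time arithmetic.
object DurationSketch {
  def timed[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    (result, System.nanoTime() - startNs) // duration in nanos
  }

  def main(args: Array[String]): Unit = {
    val eventTimeMs = System.currentTimeMillis() // what the event would report
    val (_, durationNs) = timed { Thread.sleep(5) }
    println(s"event at $eventTimeMs ms, body took $durationNs ns")
  }
}
```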
```diff
+        event.qe = queryExecution
+        event.executionFailure = ex
+        sc.listenerBus.post(event)
       }
     }
   } finally {
```
```diff
@@ -17,15 +17,15 @@

 package org.apache.spark.sql.execution.ui

+import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.databind.JavaType
 import com.fasterxml.jackson.databind.`type`.TypeFactory
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import com.fasterxml.jackson.databind.util.Converter

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanInfo}

 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
```
```diff
@@ -39,7 +39,14 @@ case class SparkListenerSQLExecutionStart(

 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+
+  @JsonIgnore private[sql] var executionName: Option[String] = None
+
+  // These 3 fields are only accessed when `executionName` is defined.
+  @JsonIgnore private[sql] var duration: Long = 0L
```
Contributor

did you verify that the JsonIgnore annotation actually works? For some reason, I actually needed to annotate the class as

```scala
@JsonIgnoreProperties(Array("a", "b", "c"))
class SomeClass {
  @JsonProperty("a") val a: ...
  @JsonProperty("b") val b: ...
}
```

the reason being Json4s understands that API better. I believe we use Json4s for all of these events.

Contributor (Author)

There is a test to verify it: https://github.com/apache/spark/pull/22674/files#diff-6fa1d00d1cb20554dda238f2a3bc3ecbR55 I also used …
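For reference, a compilable sketch of the two annotation styles being discussed; the class and field names are made up, and whether a given serializer honors the field-level form is exactly what the linked test checks.

```scala
import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties}

// Field-level style, as used in this PR: each transient field is ignored
// in place.
class EventWithFieldAnnotations(val id: Long) {
  @JsonIgnore var internalState: String = ""
}

// Class-level style, as suggested above: ignored properties are listed by
// name on the class, which some serializers resolve more reliably.
@JsonIgnoreProperties(Array("internalState"))
class EventWithClassAnnotation(val id: Long) {
  var internalState: String = ""
}
```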
```diff
+  @JsonIgnore private[sql] var qe: QueryExecution = null
+  @JsonIgnore private[sql] var executionFailure: Option[Exception] = None
+}

 /**
  * A message used to update SQL metric value for driver-side updates (which doesn't get reflected
```
```diff
@@ -17,15 +17,16 @@

 package org.apache.spark.sql.util

-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.CopyOnWriteArrayList

-import scala.collection.mutable.ListBuffer
-import scala.util.control.NonFatal
+import scala.collection.JavaConverters._

 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.Utils
```
```diff
@@ -75,95 +76,69 @@ trait QueryExecutionListener
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
+class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
```
Contributor

nit: we shall add param comments.
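A sketch of what those param comments might look like; the wording is illustrative, not from the PR.

```scala
/**
 * Manager for [[QueryExecutionListener]]s of a session.
 *
 * @param session the session that produces the query executions this manager watches.
 * @param loadExtensions whether to load listener classes named by the
 *                       spark.sql.queryExecutionListeners conf (skipped when
 *                       cloning an existing manager).
 */
```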
```diff
+  extends SparkListener with Logging {

-  private[sql] def this(conf: SparkConf) = {
-    this()
-    conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
-      Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register)
+  private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener]
+
+  if (loadExtensions) {
+    val conf = session.sparkContext.conf
+    conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register)
+    }
   }

+  session.sparkContext.listenerBus.addToSharedQueue(this)
+
   /**
    * Registers the specified [[QueryExecutionListener]].
    */
   @DeveloperApi
-  def register(listener: QueryExecutionListener): Unit = writeLock {
-    listeners += listener
+  def register(listener: QueryExecutionListener): Unit = {
+    listeners.add(listener)
   }

   /**
    * Unregisters the specified [[QueryExecutionListener]].
    */
   @DeveloperApi
-  def unregister(listener: QueryExecutionListener): Unit = writeLock {
-    listeners -= listener
+  def unregister(listener: QueryExecutionListener): Unit = {
+    listeners.remove(listener)
   }

   /**
    * Removes all the registered [[QueryExecutionListener]].
    */
   @DeveloperApi
-  def clear(): Unit = writeLock {
+  def clear(): Unit = {
     listeners.clear()
   }

   /**
    * Get an identical copy of this listener manager.
    */
-  @DeveloperApi
-  override def clone(): ExecutionListenerManager = writeLock {
-    val newListenerManager = new ExecutionListenerManager
-    listeners.foreach(newListenerManager.register)
+  private[sql] def clone(session: SparkSession): ExecutionListenerManager = {
+    val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false)
+    listeners.iterator().asScala.foreach(newListenerManager.register)
     newListenerManager
   }

-  private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-    readLock {
-      withErrorHandling { listener =>
-        listener.onSuccess(funcName, qe, duration)
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+      val funcName = e.executionName.get
+      e.executionFailure match {
+        case Some(ex) =>
+          listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex))
+        case _ =>
+          listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, e.duration))
       }
-    }
-  }

-  private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-    readLock {
-      withErrorHandling { listener =>
-        listener.onFailure(funcName, qe, exception)
-      }
-    }
+    case _ => // Ignore
   }

-  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-
-  /** A lock to prevent updating the list of listeners while we are traversing through them. */
-  private[this] val lock = new ReentrantReadWriteLock()
-
-  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
-    for (listener <- listeners) {
-      try {
-        f(listener)
-      } catch {
-        case NonFatal(e) => logWarning("Error executing query execution listener", e)
-      }
-    }
-  }
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val rl = lock.readLock()
-    rl.lock()
-    try f finally {
-      rl.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val wl = lock.writeLock()
-    wl.lock()
-    try f finally {
-      wl.unlock()
-    }
-  }
+  private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean = {
+    // Only catch SQL execution with a name, and triggered by the same spark session that this
```
Contributor

So this is what bugs me. You are adding separation between the SparkSession and its listeners, only to undo that here. It seems like a bit of a hassle to go through because you basically need async execution.

Contributor (Author)

yea. Assume we have many Spark sessions running queries at the same time. Each session sends query execution events to the central event bus, and sets up a listener to watch its own query execution events asynchronously. To make that work, the most straightforward way is to carry the session identifier in the events, so that the listener only watches events with the expected session identifier. Maybe a better way is to introduce sessions in Spark core, so the listener framework can dispatch events w.r.t. session automatically. But that's a lot of work.

Contributor

we had the same problem in the StreamingQueryListener. You can check how we solved it in …

Contributor (Author)

@brkyvz thanks for the information! It seems the …
```diff
+    // listener manager belongs.
+    e.executionName.isDefined && e.qe.sparkSession.eq(this.session)
+  }
 }
```
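For the record, the user-facing API is unchanged by this PR; only the delivery path moves from the execution thread to the listener bus. A minimal end-to-end sketch follows; the `local[2]` master and the println bodies are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("listener-sketch").getOrCreate()

    // Registration is still per session; shouldCatchEvent above is what
    // keeps this listener from seeing other sessions' executions.
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"$funcName succeeded in $durationNs ns")
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        println(s"$funcName failed: ${exception.getMessage}")
    })

    // After this PR the callback fires asynchronously on the listener-bus
    // thread, so it may arrive after count() returns.
    spark.range(10).count()
    spark.stop()
  }
}
```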
Contributor

can this throw an exception? Imagine if `df.count()` threw an exception, and then you run it again. Won't this be a behavior change in that case?

Contributor (Author)

I don't think `resetMetrics` can throw an exception...

Contributor

can't `executedPlan` throw an exception? I thought it can if the original spark plan failed?

Contributor (Author)

ah, I see your point here.
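A standalone sketch of the failure mode being discussed: planning is lazy, so building the Start event can itself throw, and the new try/finally still delivers an End event carrying the failure. The class names here are made-up stand-ins, not Spark's types.

```scala
// Made-up stand-in for QueryExecution: `executedPlan` is a lazy val, so
// planning runs (and can throw) the first time the Start event is built.
class FailingPlanSketch {
  lazy val executedPlan: String = throw new IllegalStateException("planning failed")
}

object FailingPlanDemo {
  def main(args: Array[String]): Unit = {
    val qe = new FailingPlanSketch
    var executionFailure: Option[Exception] = None
    try {
      println(qe.executedPlan) // throws on first access
    } catch {
      case e: Exception => executionFailure = Some(e)
    } finally {
      // Mirrors the PR: the "end" event is still posted, failure attached.
      println(s"end event posted, executionFailure = $executionFailure")
    }
  }
}
```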