[SPARK-20213][SQL][follow-up] introduce SQLExecution.ignoreNestedExecutionId #18419
Changes from 1 commit
@@ -29,6 +29,8 @@ object SQLExecution {

   val EXECUTION_ID_KEY = "spark.sql.execution.id"

+  private val IGNORE_NESTED_EXECUTION_ID = "spark.sql.execution.ignoreNestedExecutionId"
+
   private val _nextExecutionId = new AtomicLong(0)

   private def nextExecutionId: Long = _nextExecutionId.getAndIncrement

@@ -85,6 +87,9 @@ object SQLExecution {
         sc.setLocalProperty(EXECUTION_ID_KEY, null)
       }
       r
+    } else if (sc.getLocalProperty(IGNORE_NESTED_EXECUTION_ID) != null) {
+      // If `IGNORE_NESTED_EXECUTION_ID` is set, just ignore this new execution id.
+      body
     } else {
       // Don't support nested `withNewExecutionId`. This is an example of the nested
       // `withNewExecutionId`:

@@ -100,7 +105,9 @@ object SQLExecution {
       // all accumulator metrics will be 0. It will confuse people if we show them in Web UI.
       //
       // A real case is the `DataFrame.count` method.
-      throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
+      throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set, please wrap your " +
+        "action with SQLExecution.ignoreNestedExecutionId if you don't want to track the Spark " +
+        "jobs issued by the nested execution.")
     }
   }

@@ -118,4 +125,19 @@ object SQLExecution {
       sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
     }
   }
+
+  /**
+   * Wrap an action which may have nested execution id. This method can be used to run an execution
+   * inside another execution, e.g., `CacheTableCommand` need to call `Dataset.collect`.
+   */
+  def ignoreNestedExecutionId[T](sparkSession: SparkSession)(body: => T): T = {
Member:
Although we ignore nested execution id, the job stages and metrics created by the body here will still be recorded into the …
+    val sc = sparkSession.sparkContext
+    val allowNestedPreviousValue = sc.getLocalProperty(IGNORE_NESTED_EXECUTION_ID)
+    try {
+      sc.setLocalProperty(IGNORE_NESTED_EXECUTION_ID, "true")
+      body
+    } finally {
+      sc.setLocalProperty(IGNORE_NESTED_EXECUTION_ID, allowNestedPreviousValue)
+    }
+  }
 }
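For context, a minimal caller-side sketch of the new helper, modeled on the `CacheTableCommand` example mentioned in the Scaladoc above. The command name and the eager `count()` are illustrative, not copied from this patch:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.SQLExecution

// Illustrative command: by the time `run` is called, the action that triggered it
// has already set spark.sql.execution.id via `withNewExecutionId`, so triggering
// another action here is a nested execution.
case class EagerCacheTableCommand(tableName: String) {
  def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.catalog.cacheTable(tableName)
    // Without the wrapper, the nested action would hit the
    // "spark.sql.execution.id is already set" IllegalArgumentException.
    SQLExecution.ignoreNestedExecutionId(sparkSession) {
      sparkSession.table(tableName).count() // materialize the cache eagerly
    }
    Seq.empty[Row]
  }
}
```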
Nested execution is a developer problem, not a user problem. That's why the original PR did not throw IllegalArgumentException outside of testing. I think that should still be how this is handled. If this is thrown at runtime, adding the text about ignoreNestedExecutionId is confusing for users, who can't (or shouldn't) set it. A comment is more appropriate if users will see this message. If the change to only throw during testing is added, then I think it is fine to add the text to the exception.
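To make the suggestion above concrete, here is a rough sketch of what gating the failure on testing could look like inside the `else` branch of `withNewExecutionId`. The `spark.testing` system property check is an assumption borrowed from the discussion below, not code from this patch:

```scala
// Sketch only: fail fast when running under Spark's test harness, otherwise
// fall back to running the body untracked instead of failing the user's job.
if (sys.props.contains("spark.testing")) {
  throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
} else {
  // The jobs issued by `body` will not be tied to a SQL execution in the UI,
  // which is the lesser evil compared to throwing at runtime.
  body
}
```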
SQLExecution is kind of a developer API; people who develop data sources may need to call ignoreNestedExecutionId inside their data source implementation, as reading/writing a data source will be run inside a command and they may hit the nested execution problem. What do you think?
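As an illustration of that data source case (all names below are hypothetical; only `SQLExecution.ignoreNestedExecutionId` comes from this patch), a third-party write path might look roughly like this:

```scala
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical sink: `df.write.format(...).save()` already runs inside a command
// that holds an execution id, so the extra action below is a nested execution.
class ExampleSinkProvider extends CreatableRelationProvider {
  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    SQLExecution.ignoreNestedExecutionId(ctx.sparkSession) {
      data.foreachPartition { rows: Iterator[Row] =>
        rows.foreach(_ => ()) // push rows to the external system (elided)
      }
    }
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}
```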
The problem is that this is an easy error to hit and it shouldn't affect end users. It is better to warn that something is wrong than to fail a job that would otherwise succeed because of a bug in Spark. As for the error message, I think it is fine if we intend to leave it in. I'd just rather not fail user jobs here.
I assume that DataSource developers will have tests, but probably not ones that know to set spark.testing. Is there a better way to detect test cases?