-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2 #2887
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { | |||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * SQLConf and HiveConf contracts: when the hive session is first initialized, params in | ||||||
| * HiveConf will get picked up by the SQLConf. Additionally, any properties set by | ||||||
| * set() or a SET command inside sql() will be set in the SQLConf *as well as* | ||||||
| * in the HiveConf. | ||||||
| * SQLConf and HiveConf contracts: | ||||||
| * | ||||||
| * 1. reuse existing started SessionState if any | ||||||
| * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the | ||||||
| * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be | ||||||
| * set in the SQLConf *as well as* in the HiveConf. | ||||||
| */ | ||||||
| @transient lazy val hiveconf = new HiveConf(classOf[SessionState]) | ||||||
| @transient protected[hive] lazy val sessionState = { | ||||||
| val ss = new SessionState(hiveconf) | ||||||
| setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. | ||||||
| SessionState.start(ss) | ||||||
| ss.err = new PrintStream(outputBuffer, true, "UTF-8") | ||||||
| ss.out = new PrintStream(outputBuffer, true, "UTF-8") | ||||||
|
|
||||||
| ss | ||||||
| } | ||||||
| @transient protected[hive] lazy val (hiveconf, sessionState) = | ||||||
| Option(SessionState.get()) | ||||||
| .orElse { | ||||||
| val newState = new SessionState(new HiveConf(classOf[SessionState])) | ||||||
| // Only starts newly created `SessionState` instance. Any existing `SessionState` instance | ||||||
| // returned by `SessionState.get()` must be the most recently started one. | ||||||
| SessionState.start(newState) | ||||||
| Some(newState) | ||||||
| } | ||||||
| .map { state => | ||||||
| setConf(state.getConf.getAllProperties) | ||||||
| if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8") | ||||||
| if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8") | ||||||
| (state.getConf, state) | ||||||
| } | ||||||
| .get | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should SessionState.start(state) be invoked here? Otherwise, it relies on other code to initialize the state, and we lose the track whether the state is initialized, and other code may call SessionState.start() multiple times, resulting in unexpected behavior. Correct me if I am wrong.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't call However, add another
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, we may call SessionState.start with the session state multiple times. In my point of view, we should check whether SessionState.get is null or not, and then decide whether we need to call SessionState.start.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should consider initializing the session state in the constructor, maybe by forcing the realization of the lazy val. Matei just bumped into a problem where running "SHOW TABLES" as the first command null pointers. Lets make sure that this PR fixes those sorts of problems too.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, all commands/DDLs that are directly delegated to Hive suffer this issue, and this PR fixes it. |
||||||
|
|
||||||
| override def setConf(key: String, value: String): Unit = { | ||||||
| super.setConf(key, value) | ||||||
|
|
@@ -288,8 +296,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { | |||||
| val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() | ||||||
| val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) | ||||||
|
|
||||||
| // Makes sure the session represented by the `sessionState` field is activated. This implies | ||||||
| // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks | ||||||
| // session isolation under multi-user scenarios (i.e. HiveThriftServer2). | ||||||
| // TODO Fix session isolation | ||||||
| SessionState.start(sessionState) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above comments. Here we may start the same sessionState in the same thread multiple times. if (SessionState.get == null) SessionState.start(sessionState)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, Another issue that really puzzles me is this line from the Jenkins test failure log: The line number suggests that apparently Hive 0.13.1 was used (see here), but this Jenkins build was triggered with Anyway, I'll follow your suggestion to avoid starting
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, to avoid multiple start, I guess this is better?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think now the default run-test is hive-0.13. So that is right.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see where the problem is: the assembly jar is built with [1] Line 143 in 7768a80
[2] Line 170 in 7768a80
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I check with some hive experts here, and was told that the session state should not be started again and again. It may work, but not guaranteed. In hive, there is no such use case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, thanks for this important information!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, just reproduced the above Jenkins build failure locally by:
Will fix |
||||||
|
|
||||||
| proc match { | ||||||
| case driver: Driver => | ||||||
| driver.init() | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I check the function getCommandProcessor and found it has already initialize the driver, and we don't need to init it again.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, thanks. |
||||||
| val results = HiveShim.createDriverResultsArray | ||||||
| val response: CommandProcessorResponse = driver.run(cmd) | ||||||
| // Throw an exception if there is an error in query processing. | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This SET command is used as a regression test of SPARK-4037.