@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
224224 }
225225
226226 /**
227- * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
228- * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
229- * set() or a SET command inside sql() will be set in the SQLConf *as well as*
230- * in the HiveConf.
227+ * SQLConf and HiveConf contracts:
228+ *
229+ * 1. reuse existing started SessionState if any
230+ * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
231+ * SQLConf. Additionally, any properties set by set() or a SET command inside sql() will be
232+ * set in the SQLConf *as well as* in the HiveConf.
231233 */
232- @ transient lazy val hiveconf = new HiveConf (classOf [SessionState ])
233- @ transient protected [hive] lazy val sessionState = {
234- val ss = new SessionState (hiveconf)
235- setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
236- SessionState .start(ss)
237- ss.err = new PrintStream (outputBuffer, true , " UTF-8" )
238- ss.out = new PrintStream (outputBuffer, true , " UTF-8" )
239-
240- ss
241- }
234+ @ transient protected [hive] lazy val (hiveconf, sessionState) =
235+ Option (SessionState .get())
236+ .orElse {
237+ val newState = new SessionState (new HiveConf (classOf [SessionState ]))
238+ // Only starts newly created `SessionState` instance. Any existing `SessionState` instance
239+ // returned by `SessionState.get()` must be the most recently started one.
240+ SessionState .start(newState)
241+ Some (newState)
242+ }
243+ .map { state =>
244+ setConf(state.getConf.getAllProperties)
245+ if (state.out == null ) state.out = new PrintStream (outputBuffer, true , " UTF-8" )
246+ if (state.err == null ) state.err = new PrintStream (outputBuffer, true , " UTF-8" )
247+ (state.getConf, state)
248+ }
249+ .get
242250
243251 override def setConf (key : String , value : String ): Unit = {
244252 super .setConf(key, value)
@@ -288,8 +296,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
288296 val cmd_1 : String = cmd_trimmed.substring(tokens(0 ).length()).trim()
289297 val proc : CommandProcessor = HiveShim .getCommandProcessor(Array (tokens(0 )), hiveconf)
290298
299+ // Makes sure the session represented by the `sessionState` field is activated. This implies
300+ // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
301+ // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
302+ // TODO Fix session isolation
303+ SessionState .start(sessionState)
304+
291305 proc match {
292306 case driver : Driver =>
307+ driver.init()
293308 val results = HiveShim .createDriverResultsArray
294309 val response : CommandProcessorResponse = driver.run(cmd)
295310 // Throw an exception if there is an error in query processing.
0 commit comments