45 changes: 35 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql

import java.io.Closeable
import java.util.concurrent.TimeUnit._
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -711,12 +711,15 @@ class SparkSession private(
// scalastyle:on

/**
* Stop the underlying `SparkContext`.
* Stop the underlying `SparkContext` if there are are no active sessions remaining.
Member: are are -> are.

*
* @since 2.0.0
*/
def stop(): Unit = {
Member: Hey, I think this was a design decision: stopping a session stops the SparkContext too. If you don't want the context stopped, why not just not call stop() on the session? The behaviour seems to be documented properly as well.

Contributor: The idea is that if one creates a multi-tenant Spark process and gives each user a Spark session, you want to be able to close down the resources for one session (connections to JDBC, perhaps) without stopping the entire SparkContext, thus keeping the SparkContext alive for the other users.

sparkContext.stop()
SparkSession.clearActiveSession()
Contributor: Should we also clear the defaultSession()? Otherwise, the getOrCreate() method still returns the previously created SparkSession.

Contributor Author: I thought about this as well, and here were my thoughts:

  1. The default session should simply be a "default" set of config that is applied when creating a new SparkSession, unless overrides to that config are specified. The only notion of SparkSessions that we care about is the set of active Spark sessions. For that reason, I think clearing the default session may not be necessary here.
  2. We could completely get rid of the notion of default sessions (which I wouldn't be opposed to), and instead require that, at session creation time, each user specify the full list of settings they want for their session. How would people feel about this? Alternatively, we could say that the default session is completely hidden from the user and that we grab configs from the first created active SparkSession (using getOrCreate()) to populate the config. That seems like confusing behavior to me, though.

Contributor: The problem is that we now update both activeSession and defaultSession when we create a new SparkSession. If we don't clear the defaultSession here, then defaultSession will still be returned every time we call getOrCreate(). Whether to keep the notion of a default session is worth another mail thread or PR; the quickest way to get this PR in would be to clear the defaultSession here, IIUC.
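A minimal sketch of the staleness being discussed, assuming getOrCreate() falls back to a defaultSession whose SparkContext is still running:

val s1 = SparkSession.builder().master("local").getOrCreate() // becomes the default
val s2 = s1.newSession()  // keeps the context alive after s1 stops
s1.stop()                 // under this PR the context survives, but defaultSession still points at s1
val s3 = SparkSession.builder().getOrCreate()
// Without clearing defaultSession in stop(), s3 is the already-stopped s1.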

if (SparkSession.numActiveSessions.get() == 0) {
Contributor: Shouldn't this remove itself from the active sessions before checking that there are 0 remaining?

Contributor Author: Ah, good catch.
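A sketch of one way the check misfires, assuming the diff's semantics, where clearActiveSession() only touches the calling thread's active session:

val s1 = SparkSession.builder().master("local").getOrCreate() // counted as active
val s2 = s1.newSession() // never registered in the counter
// The calling thread's active session is still s1, not s2.
s2.stop() // clearActiveSession() deregisters s1's slot, the count hits 0,
          // and the SparkContext stops even though s2 is still in use.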

sparkContext.stop()
}
}

/**
@@ -776,6 +779,8 @@ class SparkSession private(
@Stable
object SparkSession extends Logging {

private[spark] val numActiveSessions: AtomicInteger = new AtomicInteger(0)
Contributor: This is error prone and also not easy to debug; we may need to keep track of each active SparkSession in the SparkContext. We don't need to pass in the SparkSession instance: maybe only compute the System.identityHashCode() of the current SparkSession object and pass that to the SparkContext. On SparkSession.stop() we only need to drop the corresponding identity from the SparkContext, and when it's empty, we can safely stop the SparkContext too.

Contributor Author: Yeah, I think the identity hash code could be an interesting solution here. I'll experiment with that.
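A minimal sketch of the identity-hash-code idea; SessionRegistry and its methods are hypothetical names, not actual Spark API:

import java.util.concurrent.ConcurrentHashMap

object SessionRegistry {
  // Identities of all live sessions; identityHashCode avoids holding
  // a typed reference to each SparkSession itself.
  private val ids = ConcurrentHashMap.newKeySet[Integer]()

  def register(session: AnyRef): Unit =
    ids.add(System.identityHashCode(session))

  // Returns true when the last session is gone, i.e. the SparkContext
  // can be stopped safely.
  def unregister(session: AnyRef): Boolean = {
    ids.remove(System.identityHashCode(session))
    ids.isEmpty
  }
}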


/**
* Builder for [[SparkSession]].
*/
@@ -958,6 +963,8 @@ object SparkSession extends Logging {
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
// Should remove listener after this event fires
sparkContext.removeSparkListener(this)
Contributor: I think this should be called in the SparkSession.stop() method?

Contributor Author: Yeah, that would be ideal; the problem is that we lose the handle to this listener outside of this method. I could create a global var in SparkSession to hold a reference to the listener, but that also seems kind of strange. Thoughts?

Contributor: It should be fine to have a global var of SparkListener inside SparkSession.

Contributor Author: Okay, will add this in!
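A minimal sketch of that follow-up (an assumption about its shape, not the actual patch): hold the listener handle globally so a later stop() can deregister it.

import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListener

object ListenerHolder {
  // Remembered handle so a later stop() can remove the listener.
  private val appEndListener = new AtomicReference[SparkListener](null)

  def register(sc: SparkContext, listener: SparkListener): Unit = {
    appEndListener.set(listener)
    sc.addSparkListener(listener)
  }

  def deregister(sc: SparkContext): Unit =
    Option(appEndListener.getAndSet(null)).foreach(sc.removeSparkListener)
}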

}
})
}
@@ -981,17 +988,30 @@
* @since 2.0.0
*/
def setActiveSession(session: SparkSession): Unit = {
activeThreadSession.set(session)
if (getActiveSession.isEmpty
|| (session != getActiveSession.get && getActiveSession.isDefined)) {
Member: .isDefined is checked in the wrong order, but actually you've already checked it with .isEmpty.

numActiveSessions.getAndIncrement
Contributor: The problem I see here: if I have 2 sessions and I set one as active, then set the other, and keep alternating, the count here will be wrong.

I don't have a good idea for tracking the last alive session. Even if we could, users may want to create more sessions later and not stop the SparkContext.

The SparkSession should be lightweight; what's the memory leak you observed before?
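A minimal sketch of the miscount, assuming the increment rule in this diff (setActiveSession bumps the counter whenever the incoming session differs from the current one):

val s1 = SparkSession.builder().master("local").getOrCreate() // count = 1
val s2 = s1.newSession()
SparkSession.setActiveSession(s2) // s2 != s1 -> count = 2
SparkSession.setActiveSession(s1) // s1 != s2 -> count = 3
SparkSession.setActiveSession(s2) // s2 != s1 -> count = 4, yet only two sessions exist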

Member: Another problem is that the current behaviour is clearly documented. It doesn't look particularly wrong either:

    "Stop the underlying SparkContext"

We're trying to make a behaviour change just based on the new design choice. Shall we at least keep compatibility via a switch?
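A minimal sketch of such a switch (the conf key is hypothetical, not an actual Spark setting): keep the documented "always stop" behaviour by default and let users opt in to the new one.

def stop(): Unit = {
  // Hypothetical legacy flag; defaults to the documented behaviour.
  val alwaysStopContext = sparkContext.getConf
    .getBoolean("spark.sql.legacy.stopSparkContextOnSessionStop", defaultValue = true)
  SparkSession.clearActiveSession()
  if (alwaysStopContext || SparkSession.numActiveSessions.get() == 0) {
    sparkContext.stop()
  }
}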

Contributor: Yeah, the right way to "free" a session is to leave it and wait for it to be GCed. If there is something that can't be GCed, that's a memory leak and we should fix it.

Contributor Author: @cloud-fan @HyukjinKwon - thanks for the thoughts.

@cloud-fan - The memory leak is detailed here: #24807 (comment).

@HyukjinKwon - I actually think that even though the current behavior is clearly documented, it doesn't make sense and is error prone. I detailed an example here (#24807 (comment)) of how this could bite us.

Is there a world where a user should be able to call .stop() on a session and forcibly invalidate every other potential session (by killing the context)?

Member: If the leak is a problem, we should fix it rather than changing the behaviour. It is documented, and users are relying on this behaviour. We can understand .stop() as .stopContext(), no?

I don't think we should just change this without guarding. All other projects related to Spark, such as Zeppelin, would need to revisit how they stop, and it would make it difficult for them to support multiple Spark versions, for instance.

activeThreadSession.set(session)
} else if (session == null) {
this.clearActiveSession()
}
}

/**
* Clears the active SparkSession for current thread. Subsequent calls to getOrCreate will
* return the first created context instead of a thread-local override.
* Clears the active SparkSession for current thread assuming it is defined.
* Subsequent calls to getOrCreate will return the first created context
* instead of a thread-local override.
*
* @since 2.0.0
*/
def clearActiveSession(): Unit = {
activeThreadSession.remove()
if (getActiveSession.isDefined) {
activeThreadSession.remove()
numActiveSessions.decrementAndGet()
} else {
logWarning("Calling clearActiveSession() on a SparkSession " +
"without an active session is a noop.")
}
}

/**
@@ -1004,12 +1024,17 @@
}

/**
* Clears the default SparkSession that is returned by the builder.
*
* Clears the default SparkSession that is returned by the builder
* if it is not null.
* @since 2.0.0
*/
def clearDefaultSession(): Unit = {
defaultSession.set(null)
if (getDefaultSession.isDefined) {
defaultSession.set(null)
} else {
logWarning("Calling clearDefaultSession() on a SparkSession " +
"without an default session is a noop.")
}
dongjoon-hyun (Member), Jun 9, 2019: For me, the existing behavior of this function is a quiet no-op and looks better.

}

/**
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config.UI.UI_ENABLED


/**
* Test cases for the lifecycle of a [[SparkSession]].
*/
class SparkSessionLifecycleSuite extends SparkFunSuite {
test("test SparkContext stopped when last SparkSession is stopped ") {
Member: nit. test("test -> test(.

val session1 = SparkSession.builder()
.master("local")
.config(UI_ENABLED.key, value = false)
.config("some-config", "a")
.getOrCreate()

assert(!session1.sparkContext.isStopped)

val session2 = SparkSession.builder()
.master("local")
.config(UI_ENABLED.key, value = false)
.config("some-config", "b")
.getOrCreate()

session1.stop()
Member: Shall we add an assert below this line?
assert(!session1.sparkContext.isStopped)

session2.stop()
assert(session1.sparkContext.isStopped)
}

test("test SparkContext is not stopped when other sessions exist") {
val session1 = SparkSession.builder()
.master("local")
.config(UI_ENABLED.key, value = false)
.config("some-config", "a")
.getOrCreate()

assert(!session1.sparkContext.isStopped)

val session2 = SparkSession.builder()
.master("local")
.config(UI_ENABLED.key, value = false)
.config("some-config", "b")
.getOrCreate()

session1.stop()
assert(!session1.sparkContext.isStopped)
Member: This test looks almost duplicated from the first one. Also, this test case seems dangerous because it doesn't clean up the SparkContext. Let's remove it.

Contributor Author: Yeah, I was torn about this one, but agreed, will remove.

}
}