Conversation

@vinooganesh
Contributor

@vinooganesh commented Apr 5, 2020

What changes were proposed in this pull request?

This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here.

This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus.

Before running this PR, the following code:

```
SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
```

would result in a SparkContext with the following listeners on the listener bus:

```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb, <- First instance
org.apache.spark.sql.SparkSession$$anon$1@fadb9a0]  <- Second instance
```

After this PR, the execution of the same code above results in a SparkContext with the following listeners on the listener bus:

```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb] <- One instance
```

How was this patch tested?

  • Unit test included as a part of the PR

@vinooganesh
Contributor Author

Hey @srowen - could I get an "ok to test" on here? Also, thoughts on this approach?

@srowen
Member

srowen commented Apr 8, 2020

Jenkins test this please

@SparkQA

SparkQA commented Apr 8, 2020

Test build #120948 has finished for PR 28128 at commit 99c5f64.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vinooganesh
Contributor Author

Hey @srowen - apologies for the repeated ping, but I think you had some thoughts about this one. Is this approach preferred compared to my previous one?

tagging some interested parties from the other ticket as well @cloud-fan @jiangxb1987 @rdblue @HyukjinKwon @britishbadger

@cloud-fan
Contributor

I'm not against adding a clear method to free a session, but I'd like to understand more about the memory leak.

According to #24807 (comment), if the memory leak is caused by the listener, can we simply register that listener only once? That listener just nulls out the defaultSession at the end of the application, and I don't see why we need to add one listener per Spark session.

@vinooganesh
Contributor Author

Hi @cloud-fan - Good question! A few thoughts here:

  1. I've seen people use the per-session listener for various monitoring functions on a per-session basis (i.e., custom monitoring tools), so I think there is an expectation of one listener per session - which intuitively also makes sense, since these are intended as short-lived, monitorable sessions. (A sketch of this usage follows the list.)
  2. The default session is a JVM-wide singleton. Spinning up multiple Spark sessions (multi-tenancy inside a JVM) is also an SOP that firms have developed (see the comment here: [SPARK-27958][SQL] Stopping a SparkSession should not always stop Spark Context #24807 (comment)). So nulling out the JVM-wide default session once one particular active session is terminated removes the default session for every other currently-existing session.
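
For reference, a minimal sketch of the per-session usage from point 1, registering a monitoring hook through the session's listener manager (the listener body is illustrative, not taken from any particular tool):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder().master("local").getOrCreate()

// A per-session monitoring hook: it only observes queries executed through
// this session, which is why users expect one listener per session.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName succeeded in ${durationNs / 1e6} ms")
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
})
```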

Does that make sense?

@cloud-fan
Contributor

Per-session listeners are a common use case. Does the new method remove these user-registered per-session listeners (registered via session.listenerManager)?

AFAIK the reported issue is a memory leak. If it's caused by listeners, I think there are 2 ways to fix it:

  1. add an explicit API to remove listeners
  2. make the listeners not reference the session instance.

Can you explain more about how we leak the spark session instance? Then we can discuss how to fix it.

@vinooganesh
Contributor Author

Hey @cloud-fan - Sure, right now the listener issue is coupled with the operating model for SparkSessions (which is where I think the confusion is coming from).

Currently, every time a SparkSession is created, a listener is attached to the SparkContext - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L947.

Once the listener is created, because of the way it is created, the reference is lost after it is attached to the SparkContext. This means there is currently no way to remove the listener from the SparkContext (even after the lifetime of the session is "done"). Even if I call clearActiveSession() or clearDefaultSession(), the listener continues to live on the SparkContext, even after the sessions are GCed away. This creates an issue for JVMs with multi-tenancy: many SparkSessions may be spun up, but without a clean way to remove the listener, we leak it. This is the listener memory leak.
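
For concreteness, the registration looks roughly like this (a simplified sketch of the linked SparkSession.scala code, not the exact source):

```
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Sketch: the listener is an anonymous inner class, so the session holds no
// reference to it after this call. Once it is on the context's listener bus,
// there is no handle left to remove it with.
def registerEndListener(sparkContext: SparkContext): Unit = {
  sparkContext.addSparkListener(new SparkListener {
    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
      // the real code nulls out the default session here
    }
  })
}
```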

I think my PR description may have been unclear: there isn't a leak of the SparkSession instances themselves (at least that I'm aware of).

The reason this PR does more than just removing the listeners is that there isn't a lifecycle method that marks the end of a SparkSession without killing the underlying SparkContext (stop() kills all other sessions that rely on the SparkContext staying alive). The strangeness here is the interaction between `SparkSession`s, which should be lightweight and easy to clean up, and the longer-lived `SparkContext`, which should exist for the duration of all alive `SparkSession`s.

So, in order to ever clean up the listener leak, we need a way to mark the session as over, and that currently doesn't exist.

Does that make sense?

@cloud-fan
Contributor

If the only problem is https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L947

Can we just register it only once? The listener watches the SparkListenerApplicationEnd, which can happen only once for a Spark application. There is really no point in having many listeners do the same thing when the application ends.
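
A minimal sketch of that register-once idea (object and method names here are illustrative; in the actual patch the flag ends up in `object SparkSession`, as the hunks later in this thread show):

```
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

object SessionCleanup {
  // Set once the application-end listener has been attached, so subsequent
  // sessions sharing the same context skip the registration entirely.
  private val listenerRegistered = new AtomicBoolean(false)

  def registerContextListener(sparkContext: SparkContext): Unit = {
    if (!listenerRegistered.get()) {
      sparkContext.addSparkListener(new SparkListener {
        override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
          // application-end cleanup runs once, however many sessions existed
        }
      })
      listenerRegistered.set(true)
    }
  }
}
```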

@vinooganesh
Contributor Author

@cloud-fan - Okay, I've pushed a fix to immediately resolve this leak. The context keeps track of whether the SparkSession `SparkListenerApplicationEnd` listener has been registered. I've also renamed the PR to make sure it's clear that this PR doesn't correspond to the SparkSession lifecycle methods.

Does this approach make sense to you?

```
val startTime = System.currentTimeMillis()

private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
private[spark] val sessionListenerRegistered: AtomicBoolean = new AtomicBoolean(false)
```
Contributor


can we move this to object SparkSession?

Contributor Author


Not easily - that was my original plan. However, a SparkSession can be constructed with an instance of SparkContext, meaning it's really on the SparkContext to know whether a session has already attached a listener. The SparkSession instance can then decide whether or not to attach the listener, but the context needs to report whether the listener is attached first.

Contributor

@cloud-fan May 13, 2020


Sorry if I missed anything. object SparkSession is a singleton and it can make sure we do something only once. Are you saying that we can register a session cleanup listener to SparkContext outside of SparkSession?

Contributor Author


Yeah, so the challenge is that even though object SparkSession is a singleton, I can create various SparkSession instances that are each "backed" by a different SparkContext instance.

```
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("test-app-SPARK-31354-1")
val context = new SparkContext(conf)

val session1 = SparkSession.builder()
  .master("local")
  .sparkContext(context) // <- here
  .getOrCreate()
```

So the state actually needs to live on the context, since the context is the only one that knows whether it has already had a session listener attached. I agree that this feels really weird, and that's kind of why I was initially pursuing adding lifecycle methods to encapsulate the state in one place. Definitely open to other ways to resolve this, though.

Contributor


I think we can assume there is only one SparkContext instance. See #23311

One problem is if we create a SparkContext, stop it, and create it again. Then it's not safe to rely on the flag in the singleton object SparkSession.

Contributor

@cloud-fan May 13, 2020


If we really want to guard against that corner case, we can track the SparkContext instance id in object SparkSession. If the SparkContext is re-created, we can reset the boolean flag.

It's a bit hacky to leak the session concept to the core module.
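
A rough sketch of that guard, with hypothetical names and using reference identity in place of an instance id:

```
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.SparkContext

object ListenerRegistration {
  private val listenerRegistered = new AtomicBoolean(false)
  @volatile private var registeredContext: SparkContext = _

  // Registers at most once per SparkContext: if a different (re-created)
  // context shows up, the stale flag is reset and we register again.
  def ensureRegistered(sc: SparkContext)(register: SparkContext => Unit): Unit = synchronized {
    if (registeredContext ne sc) {
      listenerRegistered.set(false) // the old flag belonged to the old context
      registeredContext = sc
    }
    if (!listenerRegistered.getAndSet(true)) {
      register(sc)
    }
  }
}
```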

Contributor Author


@cloud-fan - updated, how does this look?

@cloud-fan
Contributor

ok to test

@SparkQA

SparkQA commented May 20, 2020

Test build #122886 has finished for PR 28128 at commit e5563a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vinooganesh
Contributor Author

Thanks, @cloud-fan and @stczwd. Did some cleanup and added a test.

If anyone was curious, before this PR, running the following

```
SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
```

would result in a SparkContext with the following listeners on the listener bus:

```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb, <- First instance
org.apache.spark.sql.SparkSession$$anon$1@fadb9a0]  <- Second instance
```

After this PR, the execution of the same code above results in a SparkContext with the following listeners on the listener bus:

```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb] <- One instance
```

@SparkQA

SparkQA commented May 20, 2020

Test build #122905 has finished for PR 28128 at commit e5ef33a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


```
/** Register the AppEnd listener onto the Context */
private def registerContextListener(sparkContext: SparkContext): Unit = {
  if (!SparkSession.listenerRegistered.get()) {
```
Contributor


nit: we are in the same class, so we can remove the `SparkSession.` prefix.

Contributor Author


fixed!

```
        defaultSession.set(null)
      }
    })
    SparkSession.listenerRegistered.set(true)
```
Contributor


ditto

Contributor Author


fixed!

@SparkQA

SparkQA commented May 21, 2020

Test build #122929 has finished for PR 28128 at commit cfa1462.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master/3.0!

cloud-fan pushed a commit that referenced this pull request May 21, 2020
…End listener

## What changes were proposed in this pull request?

This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here.

This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus.

Before running this PR, the following code:
```
SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

SparkSession.builder().master("local").getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
```
would result in a SparkContext with the following listeners on the listener bus:
```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb, <- First instance
org.apache.spark.sql.SparkSession$$anon$1@fadb9a0]  <- Second instance
```
After this PR, the execution of the same code above results in SparkContext with the following listeners on the listener bus:
```
[org.apache.spark.status.AppStatusListener@5f610071,
org.apache.spark.HeartbeatReceiver@d400c17,
org.apache.spark.sql.SparkSession$$anon$1@25849aeb] <- One instance
```
## How was this patch tested?

* Unit test included as a part of the PR

Closes #28128 from vinooganesh/vinooganesh/SPARK-27958.

Lead-authored-by: Vinoo Ganesh <[email protected]>
Co-authored-by: Vinoo Ganesh <[email protected]>
Co-authored-by: Vinoo Ganesh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit dae7988)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan closed this in dae7988 May 21, 2020
@advancedxy
Contributor

Hi, @vinooganesh and @cloud-fan
I'd like to point out that this PR doesn't fix the memory leak completely. Once SessionState is touched, it will add two more listeners onto the SparkContext, namely SQLAppStatusListener and ExecutionListenerBus.

It can be reproduced easily as:

  test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("test-app-SPARK-31354-1")
    val context = new SparkContext(conf)
    SparkSession
      .builder()
      .sparkContext(context)
      .master("local")
      .getOrCreate()
      .sessionState // this touches the sessionState
    val postFirstCreation = context.listenerBus.listeners.size()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()

    SparkSession
      .builder()
      .sparkContext(context)
      .master("local")
      .getOrCreate()
      .sessionState // this touches the sessionState
    val postSecondCreation = context.listenerBus.listeners.size()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    assert(postFirstCreation == postSecondCreation)
  }


@vinooganesh
Contributor Author

@advancedxy - can you file a new jira ticket? I can look into fixing this.

@advancedxy
Contributor

> @advancedxy - can you file a new jira ticket? I can look into fixing this.

@vinooganesh filed https://issues.apache.org/jira/browse/SPARK-32165

Alexis-D pushed a commit to Alexis-D/spark that referenced this pull request Dec 9, 2020
…End listener
