
Conversation

@viirya (Member) commented Nov 17, 2016

What changes were proposed in this pull request?

Methods such as Dataset.show and Dataset.take use Limit (i.e., CollectLimitExec), which leverages SparkPlan.executeTake to efficiently collect the required number of elements back to the driver.

However, under whole-stage codegen, operators such as HashAggregate usually release their resources only after all elements have been consumed. Because show/take stop consuming early, those resources are never released, which causes a memory leak with Dataset.show, for example.

An exception is then thrown in this case. It is thrown in the Executor after it calls taskMemoryManager.cleanUpAllAllocatedMemory and finds memory that was not released at task completion.

The exception looks like:

[info] - SPARK-18487: Consume all elements for show/take to avoid memory leak *** FAILED *** (1 second, 73 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 179.0 failed 1 times, most recent failure: Lost task 0.0 in stage 179.0 (TID 501, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 33816576 bytes, TID = 501
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:295)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at scala.Option.foreach(Option.scala:257)
...

This PR adds a task completion listener to HashAggregate to avoid the memory leak; a rough sketch of the idea is shown below.
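
Roughly, the approach registers a listener on the current TaskContext so the operator's resources are freed at task completion even when the iterator is not fully consumed. The sketch below is illustrative only and is not the exact patch; hashMap.free() stands in for whatever release call the operator actually uses.

// Illustrative sketch, assuming the operator keeps its state in a hash map with a free() method.
// Registering a completion listener frees it even if show/take stops consuming rows early.
import org.apache.spark.TaskContext

val taskContext = TaskContext.get()
if (taskContext != null) {
  taskContext.addTaskCompletionListener { _ =>
    hashMap.free()  // placeholder for the operator's actual resource-release call
  }
}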

How was this patch tested?

Added a test in DatasetSuite.


@SparkQA commented Nov 17, 2016

Test build #68763 has finished for PR 15916 at commit 93c93da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell (Contributor) commented:

@viirya don't we use task completion listeners for this?

@viirya (Member, Author) commented Nov 18, 2016

@hvanhovell Thanks for the hint. I am trying it. However, this listener has to be added to operators case by case, e.g., HashAggregate here. If there are other operators that need to release resources after all rows have been iterated, we may need to add similar listeners to them too.

Originally I was a little worried about a possible performance regression due to the extra consumption of rows. With a task completion listener, this is not an issue.

@viirya viirya changed the title [SPARK-18487][SQL] Consume all elements for Dataset.show/take to avoid memory leak [SPARK-18487][SQL] Add completion listener to HashAggregatetask to avoid memory leak Nov 18, 2016
@rxin (Contributor) commented Nov 18, 2016

Did you actually see an issue with memory leak?

@viirya (Member, Author) commented Nov 18, 2016

@rxin Yeah, the added test case reports a memory leak failure without this patch applied.

@rxin (Contributor) commented Nov 18, 2016

What do you mean by "report"? A warning message was logged? If a warning message was logged, it is generated by the callback itself which just releases the memory.

@viirya (Member, Author) commented Nov 18, 2016

@rxin An exception will be thrown, not just a warning message. It is thrown in the Executor after it calls taskMemoryManager.cleanUpAllAllocatedMemory and finds memory that was not released at task completion.

The exception looks like:

[info] - SPARK-18487: Consume all elements for show/take to avoid memory leak *** FAILED *** (1 second, 73 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 179.0 failed 1 times, most recent failure: Lost task 0.0 in stage 179.0 (TID 501, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 33816576 bytes, TID = 501
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:295)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)
[info] 
[info] Driver stacktrace:
[info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1436)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1424)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1423)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
[info]   at scala.Option.foreach(Option.scala:257)
...

@viirya viirya changed the title [SPARK-18487][SQL] Add completion listener to HashAggregatetask to avoid memory leak [SPARK-18487][SQL] Add completion listener to HashAggregate to avoid memory leak Nov 18, 2016
@viirya (Member, Author) commented Nov 18, 2016

Of course it is not actually a real memory leak, because the memory is released at the end by taskMemoryManager.cleanUpAllAllocatedMemory in the Executor. But with spark.unsafe.exceptionOnMemoryLeak set to true by default in tests, we see the exception.

Or do we just need to turn spark.unsafe.exceptionOnMemoryLeak to false by default in tests? Although I don't think that is a good idea.
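
(For reference, disabling the check in a test configuration would look roughly like the hedged sketch below; in practice the flag is set by the test harness rather than by individual suites.)

// Hedged sketch: flipping the leak-check flag off in a SparkConf used for tests.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.unsafe.exceptionOnMemoryLeak", "false")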

@SparkQA commented Nov 18, 2016

Test build #68818 has finished for PR 15916 at commit 2f304f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Nov 18, 2016

I'd maybe set it to false. You are just adding a completion listener, which is the same as taskMemoryManager.cleanUpAllAllocatedMemory anyway ...

@viirya (Member, Author) commented Nov 18, 2016

Yeah, I see. As I said in a previous comment, the memory is released at the end anyway.

I would guess the default of true is there to catch potential memory leaks during development. So is turning it to false a good idea?

This patch comes from #15874, where @sethah hit the exception during a test.

Although taskMemoryManager.cleanUpAllAllocatedMemory can release the memory for us, I think it is just a safety net. Operators should release memory themselves.

If you still think this is not necessary, I can close it.

@SparkQA commented Nov 18, 2016

Test build #68824 has finished for PR 15916 at commit fa1d1fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Nov 19, 2016

@hvanhovell @rxin What do you think? Please let me know if we need this or not. Thanks.

@viirya (Member, Author) commented Nov 25, 2016

Hello @rxin @hvanhovell, can you let me know whether this is needed or not, so I can close it? Thank you.

@rxin (Contributor) commented Nov 25, 2016

No, it's not needed if you release memory in exactly the same way as the catch-all does. It is basically just the catch-all itself.

@viirya (Member, Author) commented Nov 25, 2016

@rxin Thanks, I appreciate your feedback. I will close this now.

@viirya viirya closed this Nov 25, 2016
@viirya (Member, Author) commented Nov 26, 2016

@rxin BTW, I see you merged #15989 to downgrade the error message level in TaskMemoryManager.

I'd like to modify the error message in Executor too, because the current one is a little confusing to developers who don't know this part well; they would think a real memory leak happened.

What do you think? If it is OK with you, I'd submit a PR for it.

Thanks.

@rxin (Contributor) commented Nov 26, 2016

Can you show an example of a leak that would happen in Executor but not in the callback? Thanks.

@viirya (Member, Author) commented Nov 26, 2016

The test case I added in this PR:

// Imports assume this runs inside DatasetSuite, where `spark` and `sparkContext` are in scope.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, explode, struct}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val rng = new scala.util.Random(42)
val data = sparkContext.parallelize(Seq.tabulate(100) { i =>
  Row(Array.fill(10)(rng.nextInt(10)))
})
val schema = StructType(Seq(
  StructField("arr", DataTypes.createArrayType(DataTypes.IntegerType))
))
val df = spark.createDataFrame(data, schema)
val exploded = df.select(struct(col("*")).as("star"), explode(col("arr")).as("a"))
val joined = exploded.join(exploded, "a").drop("a").distinct()
joined.show()

would throw an exception like this:

[info] - SPARK-18487: Consume all elements for show/take to avoid memory leak *** FAILED *** (1 second, 73 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 179.0 failed 1 times, most recent failure: Lost task 0.0 in stage 179.0 (TID 501, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 33816576 bytes, TID = 501
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:295)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]  at java.lang.Thread.run(Thread.java:745)

I submitted this PR because @sethah encountered this exception during his test. Other developers might hit it in the future, and if they don't know this part, they would conclude from the error message that a real memory leak happened. In order to avoid this and provide more useful information, I'd like to modify this error message too.

@viirya (Member, Author) commented Nov 26, 2016

I forgot to say: of course, this example throws the exception only when running in tests.

Other developers may encounter this when they write tests in the future. If we provide more information in this error message, we can save them the time of investigating it.

What do you think?

@viirya (Member, Author) commented Nov 28, 2016

ping @rxin I'd appreciate it if you can let me know what you think of my suggestion. Thanks.

@viirya viirya deleted the fix-show-memory-leak branch December 27, 2023 18:34