Commit 062163b

wangyum authored and GitHub Enterprise committed

[CARMEL-7348][CARMEL-5673] Track the Memory Usage of Large Objects in Driver (apache#126)

1 parent b8dccff · commit 062163b

8 files changed: 184 additions & 6 deletions

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 38 additions & 0 deletions

@@ -46,6 +46,7 @@ private case class CleanAccum(accId: Long) extends CleanupTask
 private case class CleanCheckpoint(rddId: Int) extends CleanupTask
 private case class CleanSparkListener(listener: SparkListener) extends CleanupTask
 private case class CleanSpilledPartitionResult(file: File) extends CleanupTask
+private case class CleanFileScanRDD(rddId: Int) extends CleanupTask

 /**
  * A WeakReference associated with a CleanupTask.
@@ -70,6 +71,8 @@ private[spark] class ContextCleaner(
     sc: SparkContext,
     shuffleDriverComponents: ShuffleDriverComponents) extends Logging {

+  protected val trackFileScanRddClean = sc.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_FILE_SCAN_RDD)
+
   private val periodicGCService: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

@@ -117,6 +120,8 @@ private[spark] class ContextCleaner(
     new CleanSparkListenerCleanupWorker(sc))
   contextCleanupWorkers.put(classOf[CleanSpilledPartitionResult].getName,
     new SpilledPartitionResultCleanupWorker(sc))
+  contextCleanupWorkers.put(classOf[CleanFileScanRDD].getName,
+    new FileScanRDDCleanupWorker(sc))

   contextCleanupWorkers.asScala.foreach(_._2.start())

@@ -138,6 +143,18 @@ private[spark] class ContextCleaner(
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }

+  /** Register a FileScanRDD for cleanup when it is garbage collected. */
+  def registerFileScanRDDForCleanup(rdd: RDD[_]): Unit = {
+    if (trackFileScanRddClean) {
+      try {
+        registerForCleanup(rdd, CleanFileScanRDD(rdd.id))
+      } catch {
+        case t: Throwable =>
+          logError(s"Failed to register file scan rdd ${rdd.id}", t)
+      }
+    }
+  }
+
   def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
     registerForCleanup(a, CleanAccum(a.id))
   }
@@ -315,6 +332,15 @@ abstract private[spark] class ContextCleanupWorker(sc: SparkContext, name: String)
   }

   def referenceBufferSize(): Int = referenceBuffer.size()
+
+  def visit(f: AnyRef => Unit): Unit = {
+    referenceBuffer.forEach(r => {
+      val referent = r.get()
+      if (referent != null) {
+        f(referent)
+      }
+    })
+  }
 }

 private[spark] class BroadcastCleanupWorker(
@@ -510,3 +536,15 @@ private[spark] class SpilledPartitionResultCleanupWorker(sc: SparkContext)
     }
   }
 }
+
+private[spark] class FileScanRDDCleanupWorker(sc: SparkContext)
+  extends ContextCleanupWorker(sc, classOf[CleanFileScanRDD].getName) with Logging {
+
+  override def doCleanup(task: CleanupTask): Unit = {
+    task match {
+      case CleanFileScanRDD(rddId) =>
+        // Noop
+      case _ =>
+    }
+  }
+}
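
The new visit hook exposes each worker's still-live referents without draining the buffer. Below is a minimal sketch (not part of the commit) of how a caller could size the tracked FileScanRDDs with it; estimateTrackedFileScanRdds is a hypothetical helper, and since the worker types are private[spark], the sketch assumes it lives under the org.apache.spark package. FileScanRDD, SizeEstimator, and filePartitions come from the other files in this commit.

import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.util.SizeEstimator

// Hypothetical helper: walks a worker's weak-reference buffer and sums the
// estimated size of the FilePartition metadata each live FileScanRDD retains.
def estimateTrackedFileScanRdds(worker: ContextCleanupWorker): (Long, Long) = {
  var count = 0L
  var bytes = 0L
  worker.visit {
    case rdd: FileScanRDD =>
      bytes += SizeEstimator.estimate(rdd.filePartitions)
      count += 1
    case _ => // a referent of another tracked type; ignore it
  }
  (count, bytes)
}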

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 6 additions & 0 deletions

@@ -677,6 +677,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   def unregisterShuffle(shuffleId: Int): Unit

   def stop(): Unit = {}
+
+  def shuffleStatusesEstimatedSize(): (Int, Long) = (0, 0L)
 }

 /**
@@ -1272,6 +1274,10 @@ private[spark] class MapOutputTrackerMaster(
   }

   def mapOutputRequestQueued: Int = 0
+
+  override def shuffleStatusesEstimatedSize(): (Int, Long) = {
+    (shuffleStatuses.size, SizeEstimator.estimate(shuffleStatuses))
+  }
 }

 /**
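
Only the driver-side MapOutputTrackerMaster overrides the new hook; the worker-side tracker inherits the (0, 0L) default. A hedged usage sketch, assuming it runs on the driver of a live application:

import org.apache.spark.SparkEnv

// On the driver this returns (registered shuffles, estimated bytes held by
// their ShuffleStatus map); on an executor the base default (0, 0L) comes back.
val (shuffleCount, shuffleBytes) =
  SparkEnv.get.mapOutputTracker.shuffleStatusesEstimatedSize()
println(s"$shuffleCount shuffles retain ~$shuffleBytes bytes on the driver")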

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions

@@ -1823,6 +1823,12 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val CLEANER_REFERENCE_TRACKING_CLEAN_FILE_SCAN_RDD =
+    ConfigBuilder("spark.cleaner.referenceTracking.cleanFileScanRDD")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
     ConfigBuilder("spark.executor.logs.rolling.strategy")
       .version("1.1.0")
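
FileScanRDD tracking is off by default, so it is opt-in per application. A sketch of enabling it through the standard session builder (the application name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("driver-memory-tracking-demo")  // illustrative name
  // Enables weak-reference tracking of FileScanRDDs in the ContextCleaner.
  .config("spark.cleaner.referenceTracking.cleanFileScanRDD", "true")
  .getOrCreate()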

core/src/main/scala/org/apache/spark/scheduler/AnalyticsTaskSchedulerImpl.scala

Lines changed: 3 additions & 0 deletions

@@ -680,6 +680,9 @@ private[spark] class AnalyticsTaskSchedulerImpl(
         if (TaskState.isFinished(state)) {
           cleanupTaskState(tid)
           taskSet.runningTasksReadyToUpdate.getAndIncrement()
+          if (serializedData != null && serializedData.limit() > 0) {
+            taskSet.totalResultInMemorySize.addAndGet(serializedData.limit())
+          }
           if (state == TaskState.FINISHED) {
             taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
           } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 3 additions & 0 deletions

@@ -842,6 +842,9 @@ private[spark] class TaskSchedulerImpl(
         if (TaskState.isFinished(state)) {
           cleanupTaskState(tid)
           taskSet.runningTasksReadyToUpdate.getAndIncrement()
+          if (serializedData != null && serializedData.limit() > 0) {
+            taskSet.totalResultInMemorySize.addAndGet(serializedData.limit())
+          }
           taskSet.removeRunningTask(tid)
           if (state == TaskState.FINISHED) {
             taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData,
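
Both scheduler variants make the same change: every finished task's serialized result size is folded into the task set's running total before the result is handed to the TaskResultGetter. A standalone sketch of that accounting, assuming totalResultInMemorySize is an AtomicLong on the task set manager:

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

val totalResultInMemorySize = new AtomicLong(0L)  // stand-in for the field

def recordFinishedTaskResult(serializedData: ByteBuffer): Unit = {
  // limit() is the readable byte count of the result buffer; empty or
  // missing buffers add nothing to the total.
  if (serializedData != null && serializedData.limit() > 0) {
    totalResultInMemorySize.addAndGet(serializedData.limit().toLong)
  }
}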

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 5 additions & 1 deletion

@@ -39,7 +39,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SizeEstimator, ThreadUtils, Utils}

 /**
  * BlockManagerMasterEndpoint is an [[IsolatedThreadSafeRpcEndpoint]] on the master node to
@@ -956,6 +956,10 @@ class BlockManagerMasterEndpoint(
   }

   def askQueued: Int = ThreadUtils.queuedSize(askThreadPool)
+
+  def blockManagerInfoEstimatedSize(): (Int, Long) = {
+    (blockManagerInfo.size, SizeEstimator.estimate(blockManagerInfo))
+  }
 }

 @DeveloperApi
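
SizeEstimator.estimate walks the object graph reflectively, so blockManagerInfoEstimatedSize costs time proportional to the number of registered block managers and blocks; it suits an on-demand diagnostic call, not a hot path. A self-contained sketch of the same API on a synthetic map:

import scala.collection.mutable
import org.apache.spark.util.SizeEstimator

// Synthetic stand-in for blockManagerInfo: the estimate grows with entry
// count because the whole object graph is traversed.
val info = mutable.HashMap(
  "bm-1" -> Array.fill(1024)(0L),
  "bm-2" -> Array.fill(2048)(0L))
println(s"${info.size} entries, ~${SizeEstimator.estimate(info)} bytes")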

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 6 additions & 2 deletions

@@ -660,10 +660,12 @@ case class FileSourceScanExec(
       }
     }

-    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
+    val fileScanRDD = new FileScanRDD(relation.sparkSession, readFile, filePartitions,
       new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
       fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
       new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+    Option(session).foreach(_.sparkContext.cleaner.foreach(_.registerFileScanRDDForCleanup(fileScanRDD)))
+    fileScanRDD
   }

  /**
@@ -713,10 +715,12 @@ case class FileSourceScanExec(
     val partitions =
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

-    new FileScanRDD(relation.sparkSession, readFile, partitions,
+    val fileScanRDD = new FileScanRDD(relation.sparkSession, readFile, partitions,
       new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
       fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
       new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+    Option(session).foreach(_.sparkContext.cleaner.foreach(_.registerFileScanRDDForCleanup(fileScanRDD)))
+    fileScanRDD
   }

   // Filters unused DynamicPruningExpression expressions - one which has been replaced
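
Because registration stores only a WeakReference (see the ContextCleaner changes above), tracking a FileScanRDD never extends its lifetime: once the query plan drops its strong reference, the referent clears and visit() skips the entry. A minimal sketch of that semantics, noting that System.gc() is only a request:

import java.lang.ref.WeakReference

var payload: AnyRef = new Array[Byte](8 * 1024 * 1024)
val ref = new WeakReference(payload)
payload = null  // drop the only strong reference
System.gc()     // a request, not a guarantee, so the outcome may vary
println(if (ref.get() == null) "referent collected" else "still reachable")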

sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala

Lines changed: 117 additions & 3 deletions

@@ -24,16 +24,21 @@ import javax.ws.rs.core.MediaType

 import scala.util.{Failure, Success, Try}

+import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
 import com.google.common.cache.{Cache, CacheBuilder}

-import org.apache.spark.{JobExecutionStatus, SparkEnv}
+import org.apache.spark.{CleanFileScanRDD, JobExecutionStatus, SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_FILE_SCAN_RDD
+import org.apache.spark.scheduler.TaskSummary
 import org.apache.spark.sql.execution.BroadcastRelationManager
 import org.apache.spark.sql.execution.BroadcastRelationManager.BroadcastRelationInfo
-import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
+import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, _}
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException, StageData, StageStatus}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.storage.{BlockManagerMaster, BlockManagerMasterEndpoint}
+import org.apache.spark.util.{SizeEstimator, ThreadUtils}

 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class SqlResource extends BaseAppResource with Logging {
@@ -98,6 +103,115 @@ private[v1] class SqlResource extends BaseAppResource with Logging {
     }
   }

+  @GET
+  @Path("summary")
+  def memorySummary(
+      @DefaultValue("true") @QueryParam("shuffle") includeShuffle: Boolean,
+      @DefaultValue("true") @QueryParam("broadcast") includeBroadcast: Boolean,
+      @DefaultValue("true") @QueryParam("task") includeTask: Boolean,
+      @DefaultValue("true") @QueryParam("fileScanRDD") includeFileScanRDD: Boolean,
+      @DefaultValue("true") @QueryParam("block") includeBlockManagerInfo: Boolean): ObjectNode = {
+    logInfo("Received request for summary")
+
+    val res = new ObjectNode(JsonNodeFactory.instance)
+    try {
+      if (includeShuffle) {
+        val shuffleStatus = SparkEnv.get.mapOutputTracker.shuffleStatusesEstimatedSize()
+
+        val shuffle = new ObjectNode(JsonNodeFactory.instance)
+        shuffle.put("count", shuffleStatus._1)
+        shuffle.put("estimatedSize", shuffleStatus._2)
+        res.set("shuffleStatus", shuffle)
+      }
+
+      if (includeBroadcast) {
+        val count = BroadcastRelationManager.allInMemBroadcastRelations().size
+        val relationsSize = BroadcastRelationManager.allInMemBroadcastRelations().
+          map(_.size).sum
+
+        val broadcast = new ObjectNode(JsonNodeFactory.instance)
+        broadcast.put("count", count)
+        broadcast.put("relationsSize", relationsSize)
+        res.set("broadcastRelation", broadcast)
+      }
+
+      SparkContext.getActive.foreach(sc => {
+        if (includeTask) {
+          val taskSummary = sc.taskScheduler.taskSummary()
+          res.set("task", taskSummaryToJson(taskSummary))
+        }
+
+        if (includeFileScanRDD) {
+          var count = 0L
+          var estimatedSize = 0L
+          try {
+            val contextCleanupWorker = sc.cleaner.get.
+              getContextCleanupWorker(classOf[CleanFileScanRDD].getName)
+
+            if (sc.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_FILE_SCAN_RDD)) {
+              contextCleanupWorker.visit(r => {
+                if (r != null && r.isInstanceOf[FileScanRDD]) {
+                  estimatedSize += SizeEstimator.estimate(
+                    r.asInstanceOf[FileScanRDD].filePartitions)
+                  count += 1
+                }
+              })
+            }
+          } catch {
+            case t: Throwable =>
+              logError("Failed to get fileScanRDDSize", t)
+          } finally {
+            val fileScanRDD = new ObjectNode(JsonNodeFactory.instance)
+            fileScanRDD.put("count", count)
+            fileScanRDD.put("estimatedSize", estimatedSize)
+            res.set("fileScanRDD", fileScanRDD)
+          }
+        }
+      })

+      if (includeBlockManagerInfo) {
+        try {
+          val endpoint = SparkEnv.get.rpcEnv.getEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME)
+          if (endpoint != null) {
+            val (count, estimatedSize) = endpoint.
+              asInstanceOf[BlockManagerMasterEndpoint].blockManagerInfoEstimatedSize()
+
+            val blockManagerInfo = new ObjectNode(JsonNodeFactory.instance)
+            blockManagerInfo.put("count", count)
+            blockManagerInfo.put("estimatedSize", estimatedSize)
+            res.set("blockManagerInfo", blockManagerInfo)
+          }
+        } catch {
+          case t: Throwable =>
+            logError("Failed to get blockManagerInfoEstimatedSize", t)
+        }
+      }
+
+      res.put("valid", true)
+    } catch {
+      case e: Exception =>
+        res.put("valid", false)
+        res.put("message", e.getMessage)
+    }
+    logInfo(s"driver summary: ${res}")
+    res
+  }
+
+  def taskSummaryToJson(s: TaskSummary): ObjectNode = {
+    val res = new ObjectNode(JsonNodeFactory.instance)
+    res.put("totalTasks", s.totalTasks)
+    res.put("runningTasks", s.runningTasks)
+    res.put("successfulTasks", s.successfulTasks)
+    res.put("zombieTasks", s.zombieTasks)
+    res.put("zombieTaskSets", s.zombieTaskSets)
+    res.put("activeTaskSets", s.activeTaskSets)
+    res.put("freeCores", s.freeCores)
+    res.put("runningTasksReadyToUpdate", s.runningTasksReadyToUpdate)
+    res.put("totalResultSize", s.totalResultSize)
+    res.put("totalResultInMemorySize", s.totalResultInMemorySize)
+    res
+  }
+
   @GET
   @Path("context-cleaner")
   def contextCleaner(): Seq[(String, Int)] = {
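
A hedged example of querying the new endpoint. The exact mount point depends on how this fork registers SqlResource; the path below assumes the stock /api/v1/applications/{appId}/sql prefix, and the host, port, and application id are illustrative. Each boolean query parameter defaults to true and disables its section when set to false:

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Assumed URL layout; adjust host, port, and appId for the deployment.
val url = "http://localhost:4040/api/v1/applications/app-0001/sql/summary?task=false"
val response = HttpClient.newHttpClient().send(
  HttpRequest.newBuilder(URI.create(url)).GET().build(),
  HttpResponse.BodyHandlers.ofString())
// Expected shape: {"shuffleStatus":{...},"broadcastRelation":{...},
//                  "fileScanRDD":{...},"blockManagerInfo":{...},"valid":true}
println(response.body())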
