9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -830,13 +830,10 @@ class SparkContext(
setLocalProperty("externalCallSite", null)
}

/** Capture the current user callsite and return a formatted version for printing. If the user
* has overridden the call site, this will return the user's version. */
Contributor
The format of this doc comment seems wrong
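The conventional Scaladoc layout opens /** on its own line and aligns the continuation asterisks; presumably the intended form is something like:

/**
 * Capture the current user callsite and return a formatted version for printing.
 * If the user has overridden the call site, this will return the user's version.
 */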

private[spark] def getCallSite(): String = {
val callSite = getLocalProperty("externalCallSite")
if (callSite == null) {
Utils.formatSparkCallSite
} else {
callSite
}
Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
}

/**
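Aside for readers of the diff: Option(x).getOrElse(y) is the idiomatic Scala replacement for the removed null check, since Option(null) evaluates to None. A minimal sketch with illustrative values:

val overridden: String = null                            // no user-overridden call site
Option(overridden).getOrElse("fallback")                 // yields "fallback"
Option("runJob at MyApp.scala:42").getOrElse("fallback") // yields "runJob at MyApp.scala:42"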
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))

def generator: String = rdd.generator

override def toString = rdd.toString

/** Assign a name to this RDD */
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java

import java.util.{Comparator, List => JList}

import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

def name(): String = rdd.name

/** Reset generator */
def setGenerator(_generator: String) = {
rdd.setGenerator(_generator)
}
}
19 changes: 5 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
this
}

/** User-defined generator of this RDD*/
@transient var generator = Utils.getCallSiteInfo.firstUserClass

/** Reset generator*/
def setGenerator(_generator: String) = {
generator = _generator
}

/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,10 @@ abstract class RDD[T: ClassTag](

private var storageLevel: StorageLevel = StorageLevel.NONE

/** Record user function generating this RDD. */
@transient private[spark] val origin = sc.getCallSite()
/** Info about the function call site where this was created (e.g. `textFile`, `parallelize`). */
@transient private[spark] val callSiteInfo = Utils.getCallSiteInfo

private[spark] def getCallSite = Utils.formatCallSiteInfo(callSiteInfo)

private[spark] def elementClassTag: ClassTag[T] = classTag[T]

@@ -1095,10 +1089,7 @@ abstract class RDD[T: ClassTag](
}

override def toString: String = "%s%s[%d] at %s".format(
Option(name).map(_ + " ").getOrElse(""),
getClass.getSimpleName,
id,
origin)
Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCallSite)

def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
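Combined with the "%s at %s:%s" call-site formatting in Utils (see the last file below), the new toString above yields the familiar RDD debug strings; illustrative examples:

MappedRDD[3] at map at MyApp.scala:42          // unnamed RDD
myRdd MappedRDD[3] at map at MyApp.scala:42    // after setName("myRdd")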
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCallSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
* @param indent Indent number before info
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
val rddInfo =
if (rdd.getStorageLevel != StorageLevel.NONE) {
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
rdd.origin + " " + rdd.generator
} else {
"RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
rdd.origin + " " + rdd.generator
}
s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
s"${rdd.getCallSite} ${rdd.callSiteInfo.firstUserClass}"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
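For reference, the rddInfo string renders as a single line such as the following (illustrative values; firstUserClass is the fully qualified class name captured by Utils.getCallSiteInfo):

RDD_ID=3 MyRDD CACHED map at MyApp.scala:42 com.example.MyApp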
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
id
}

val name = callSite.getOrElse(rdd.origin)
val name = callSite.getOrElse(rdd.getCallSite)

override def toString = "Stage " + id

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -717,8 +717,8 @@ private[spark] object Utils extends Logging {
new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}

def formatSparkCallSite = {
val callSiteInfo = getCallSiteInfo
/** Returns a printable version of the call site info suitable for logs. */
def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
"%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
callSiteInfo.firstUserLine)
}
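Since the new parameter defaults to Utils.getCallSiteInfo, both call styles work; a minimal usage sketch (output illustrative):

Utils.formatCallSiteInfo()                  // capture and format the current call site
Utils.formatCallSiteInfo(rdd.callSiteInfo)  // format a CallSiteInfo captured earlier
// either returns a string like "textFile at MyApp.scala:42"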