@@ -917,8 +917,24 @@ private[spark] class AppStatusListener(
// Update the block entry in the RDD info, keeping track of the deltas above so that we
// can update the executor information too.
liveRDDs.get(block.rddId).foreach { rdd =>

  if (updatedStorageLevel.isDefined) {
    rdd.setStorageLevel(updatedStorageLevel.get)
    // Replicated block update events will have `storageLevel.replication=1`.
Member

Is this a bug itself?

Contributor Author

That needs more investigation, including the impact of changing it. For now, the fix is on the UI side.
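
As a rough illustration of the UI-side check this thread refers to, here is a minimal, self-contained sketch based on the comments in the diff. `Level`, `ReplicaCheckSketch`, and `replay` are hypothetical names introduced only for this example; `Level` is a stand-in for Spark's `StorageLevel`, not the real class, and the actual listener code may differ.

```scala
// Hypothetical stand-in for org.apache.spark.storage.StorageLevel (assumption, not the real class).
final case class Level(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int)

object ReplicaCheckSketch {
  // True when `incoming` agrees with `stored` on every flag but reports a lower
  // replication, which is how the diff's comments describe a replica's block update.
  def isReplicatedBlockUpdateEvent(stored: Level, incoming: Level): Boolean =
    incoming.replication < stored.replication &&
      stored.useDisk == incoming.useDisk &&
      stored.useMemory == incoming.useMemory &&
      stored.deserialized == incoming.deserialized &&
      stored.useOffHeap == incoming.useOffHeap

  // Replays block update events in order, keeping the stored level whenever
  // an event looks like a replica update and overwriting it otherwise.
  def replay(initial: Level, events: Seq[Level]): Level =
    events.foldLeft(initial) { (stored, incoming) =>
      if (isReplicatedBlockUpdateEvent(stored, incoming)) stored else incoming
    }

  def main(args: Array[String]): Unit = {
    val replicated = Level(useDisk = true, useMemory = true, useOffHeap = false,
      deserialized = true, replication = 2)
    val replicaEvent = replicated.copy(replication = 1)
    println(replay(replicated, Seq(replicated, replicaEvent)))
  }
}
```

In this sketch an event that differs in any flag is treated as a genuine storage level change and still overwrites the stored value; only the "same flags, lower replication" case is ignored.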

    // To avoid overwriting the block replicated event in the store, we need to
    // have a check for whether the event is block replication or not.
    // Default value of `storageInfo.replication = 1` and hence if
    // `storageLevel.replication = 2`, the replicated events won't overwrite in the store.
    val storageInfo = rdd.storageInfo
    val isReplicatedBlockUpdateEvent = storageLevel.replication < storageInfo.replication &&
Member

Should we check storageLevel.isValid before accessing storageLevel.*?

Contributor Author

Hi, the following code already checks whether the storageLevel is valid:

    val updatedStorageLevel = if (storageLevel.isValid) {
      Some(storageLevel.description)
    } else {
      None
    }

If it is not valid, updatedStorageLevel will be None, so execution won't reach this line (L-928).
Thanks.
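
The guard described in this reply can be sketched in isolation as follows. This is only an illustration under stated assumptions: `ValidityGuardSketch`, its `Level` case class, and `descriptionAfter` are hypothetical names, and `Level` models just the two members of `StorageLevel` that the quoted snippet uses.

```scala
object ValidityGuardSketch {
  // Hypothetical stand-in for StorageLevel with only the members this sketch needs.
  final case class Level(isValid: Boolean, description: String)

  // Mirrors the quoted snippet: a description exists only for a valid level.
  def updatedStorageLevel(level: Level): Option[String] =
    if (level.isValid) Some(level.description) else None

  // Anything guarded by `updatedStorageLevel.isDefined` is skipped for an invalid
  // level, so the currently displayed description is left unchanged.
  def descriptionAfter(current: String, level: Level): String =
    updatedStorageLevel(level).getOrElse(current)
}
```

Because the invalid case maps to `None`, any code placed behind the `isDefined` check never sees an invalid level, which is the point the reply makes.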

      (storageInfo.useDisk == storageLevel.useDisk &&
      storageInfo.useMemory == storageLevel.useMemory &&
      storageInfo.deserialized == storageLevel.deserialized &&
      storageInfo.useOffHeap == storageLevel.useOffHeap)

    if (!isReplicatedBlockUpdateEvent) {
      rdd.storageInfo = storageLevel
      rdd.setStorageLevel(updatedStorageLevel.get)
    }
  }

  val partition = rdd.partition(block.name)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -29,7 +29,7 @@ import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
@@ -510,6 +510,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
var storageLevel: String = weakIntern(info.storageLevel.description)
var memoryUsed = 0L
var diskUsed = 0L
var storageInfo: StorageLevel = new StorageLevel()
Member

I don't know this part well, but is it redundant with storageLevel above?

Contributor Author

@shahidki31 shahidki31 Apr 18, 2019

The field above is just a string representation of the storage level. From storageInfo we can get the individual parameters, including replication.

Member

I see, but should we not just replace the field above with this richer object? Or should this not use info.storageLevel as the initial value? Maybe not; it just jumped out at me as a question.

Contributor Author

Yes, we can initialize storageInfo = info.storageLevel. But I'm not sure we can get rid of storageLevel, as there is a public method that sets its value. Updated the code.
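
A minimal sketch of the arrangement agreed on in this thread, with both fields seeded from the same source, might look like the following. All names here (`LiveRddFieldsSketch`, `Level`, `Info`, `LiveRddSketch`) are hypothetical stand-ins rather than Spark classes, and the description string in the usage note is made up for illustration.

```scala
object LiveRddFieldsSketch {
  // Hypothetical stand-ins for StorageLevel and RDDInfo (assumptions, not Spark classes).
  final case class Level(description: String, replication: Int)
  final case class Info(storageLevel: Level)

  class LiveRddSketch(val info: Info) {
    // String form, kept because a public setter updates it.
    var storageLevel: String = info.storageLevel.description
    // Structured form, initialized from the same source so both start in agreement.
    var storageInfo: Level = info.storageLevel

    def setStorageLevel(description: String): Unit = {
      storageLevel = description
    }
  }
}
```

For example, constructing `new LiveRddFieldsSketch.LiveRddSketch(Info(Level("example level, 2x replicated", 2)))` yields an object whose `storageInfo.replication` is 2 from the start, instead of the default that a freshly constructed level would carry.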


private val partitions = new HashMap[String, LiveRDDPartition]()
private val partitionSeq = new RDDPartitionSeq()
@@ -1520,6 +1520,46 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}

test("storage description should display correct replication if storage replication is 2") {
  val listener = new AppStatusListener(store, conf, true)
  // Register a couple of block managers.
  val bm1 = BlockManagerId("1", "host-1", 1234)
  val bm2 = BlockManagerId("2", "host-2", 2345)

  Seq(bm1, bm2).foreach { bm =>
    listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
      new ExecutorInfo(bm.host, 1, Map.empty, Map.empty)))
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, 42L))
  }
  val rddBlock = RddBlock(1, 1, 1L, 2L)

  val level = StorageLevel.MEMORY_AND_DISK_2
  // `replication` value of the replicated block will be 1.
  val levelBlockReplica = StorageLevel.MEMORY_AND_DISK

  // Submit a stage and make sure the RDDs are recorded.
  val rdd1Info = new RDDInfo(rddBlock.rddId, "rdd1", 2, level, Nil)
  val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1")
  listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

  // Block update event, where replication = 2
  listener.onBlockUpdated(SparkListenerBlockUpdated(
    BlockUpdatedInfo(bm1, rddBlock.blockId, level, rddBlock.memSize, rddBlock.diskSize)))
  // Block update event, where replication = 1
  listener.onBlockUpdated(SparkListenerBlockUpdated(
    BlockUpdatedInfo(bm2, rddBlock.blockId, levelBlockReplica, rddBlock.memSize,
      rddBlock.diskSize)))

  check[RDDStorageInfoWrapper](rddBlock.rddId) { wrapper =>
    val partitionInfo = wrapper.info.partitions.get.find(_.blockName === rddBlock.blockId.name)
      .get
    assert(partitionInfo.storageLevel === level.description)
    assert(partitionInfo.memoryUsed === 2 * rddBlock.memSize)
    assert(partitionInfo.diskUsed === 2 * rddBlock.diskSize)
    assert(partitionInfo.executors === Seq(bm1.executorId, bm2.executorId))
  }
}

test("storage information on executor lost/down") {
val listener = new AppStatusListener(store, conf, true)
val maxMemory = 42L