Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
}
}

override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
Expand All @@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
}
}

override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.mutable
import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.cluster.ExecutorInfo

/**
* Test add and remove behavior of ExecutorAllocationManager.
Expand Down Expand Up @@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {

// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-1", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsPending(manager) === 4)
assert(addExecutors(manager) === 1)
Expand Down Expand Up @@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)

// New executors have registered
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-1", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(removeTimes(manager).contains("executor-1"))
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-2", "host2", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-2", new ExecutorInfo("host2", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 2)
assert(removeTimes(manager).contains("executor-2"))

// Existing executors have disconnected
sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
0L, BlockManagerId("executor-1", "host1", 1)))
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
assert(executorIds(manager).size === 1)
assert(!executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(!removeTimes(manager).contains("executor-1"))

// Unknown executor has disconnected
sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
0L, BlockManagerId("executor-3", "host3", 1)))
sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
assert(executorIds(manager).size === 1)
assert(removeTimes(manager).size === 1)
}
Expand All @@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)

sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-1", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
Expand All @@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-1", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))

assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)

sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
0L, BlockManagerId("executor-2", "host1", 1), 100L))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-2", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 1)
Expand Down