Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ private[spark] class ExecutorAllocationManager(
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}

/**
* Change the value of numExecutorsTarget.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too vague...

Reset this manager to the initial starting state.
This must be called if the cluster manager is restarted.

*/
def reSetNumExecutorsTarget(): Unit = {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 reset

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just call this reset() since it needs to do more than just setting the target

logDebug(s"Now reset the value of numExecutorsTarget.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation is off

numExecutorsTarget = conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly what's going on, this will result in addExecutors to issue a request for the desired number of executors, if that does not match spark.dynamicAllocation.initialExecutors, right? That same method will also bring numExecutorsTarget up to whatever was the previous value once it runs.

Instead, wouldn't it be clearer to just explicitly send a message to the AM saying "this is the current state of the world, initialize yourself to deal with it"? Maybe as a reply to the RegisterClusterManager message.

Also, YarnAllocator holds a lot of state about existing containers, such as in allocatedHostToContainersMap and releasedContainers. Is that data re-created somehow when the new AM comes up? If not, what are the side-effects, if any, of not having that information around?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we should reset on the driver side as this patch has done, since we end up calling updateAndSync anyway

}

/**
* Stop the allocation manager.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private[spark] abstract class YarnSchedulerBackend(
filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
scheduler.sc.executorAllocationManager.foreach(_.reSetNumExecutorsTarget())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should go into some other place. you've put it here into addWebUIFilter, just temporary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this whole change needs a better explanation. At a high leve, why does it occur and why is this the right fix? This doesn't look right

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My way is: if a new AM is started, the value of numExecutorsTarget is reset. So the total number of executor will be calculated again, and driver will send RequestExecutors to AM to start execuots.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not good to add this line of change here. I think your purpose is to detect whether the AM is reconnected, right? There's are many other ways for you to detect it, adding here is a little weird.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
Expand Down