Skip to content

Conversation

@dydeve
Copy link

@dydeve dydeve commented May 22, 2019

What changes were proposed in this pull request?

LiveListenerBus may create multiple AsyncEventQueues with the same name when run in multi-thread scene and those created AsyncEventQueues will be added to queues finally.

In this patch, I change the queues: CopyOnWriteArrayList to map: ConcurrentHashMap which promise that only one of the AsyncEventQueues created with same name will be added to map.

How was this patch tested?

tested in SparkListenerSuite BlacklistTrackerSuite BlockManagerSuite EventLoggingListenerSuite
ExecutorAllocationManagerSuite OutputCommitCoordinatorSuite TaskSetBlacklistSuite

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

Copy link
Contributor

@HeartSaVioR HeartSaVioR May 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here write operation on queues is guarded with synchronized, so what you describe should not happen as I commented on JIRA issue.

You've asked in JIRA comment why using explicit thread-safe list if the list is guarded. As I'm not the author of code I can't say, but you can trace back to #19211 and read some comments around queues.

The list is expected to have very small writing operations, whereas iteration frequently happens. If you see javadoc about CopyOnWriteArrayList:

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CopyOnWriteArrayList.html

This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads.

As you can see postToQueues, traversal operations are not guarded with synchronized. So the list should be still thread-safe, but it is not to avoid race condition between multiple writes (it's guarded with synchronized) but race between read (traversal) and write, and looks like traversal-optimized one is chosen.

Copy link
Author

@dydeve dydeve May 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thak you. I ignored the word synchronized,I am careless.
And I know the reason of using CopyOnWriteArrayList instead of ArrayList is to avoid ConcurrentModificationException.

How about improving the usage of synchronized ?

  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = {
    if (stopped.get()) {//read barrier
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        this.synchronized {

          queues.asScala.find(_.name == queue) match {
            case Some(queue) =>
              queue.addListener(listener)

            case None =>
              val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
              newQueue.addListener(listener)
              if (started.get()) {
                newQueue.start(sparkContext)
              }
              queues.add(newQueue)
          }
        }

    }
  }

@dydeve dydeve closed this May 22, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants