17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2570,10 +2570,23 @@ class SparkContext(config: SparkConf) extends Logging {

private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()

-  private val nextRddId = new AtomicInteger(0)
+  private var nextRddId = new AtomicInteger(0)
@schlosna commented on Nov 28, 2022:

What happens when the RDD ID overflows within a SparkContext? Are there tests that cover these cases?

Curious if it would be better to switch to an AtomicLong and just modulo max int?

Suggested change
-  private var nextRddId = new AtomicInteger(0)
+  private[spark] def newRddId(): Int = (nextRddId.getAndIncrement() % Integer.MAX_VALUE).toInt
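
For reference, a minimal standalone sketch of how that AtomicLong-plus-modulo idea would behave (the class and field names below are illustrative, not code from this PR):

    import java.util.concurrent.atomic.AtomicLong

    class RddIdGenerator {
      private val nextRddId = new AtomicLong(0L)

      // The Long counter will not overflow in practice, and the modulo keeps the
      // returned Int in 0 .. Integer.MAX_VALUE - 1, so it never turns negative;
      // IDs are reused once the cycle wraps.
      def newRddId(): Int = (nextRddId.getAndIncrement() % Integer.MAX_VALUE).toInt
    }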

Contributor Author replied:

> What happens when the RDD ID overflows within a SparkContext?
When the BlockManager generates a BlockId, the BlockId only supports a positive rddId, so BlockId generation fails.

> switch to an AtomicLong?
Switching to an AtomicLong would have a very broad impact, which may require extensive discussion.
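
A rough illustration of that failure mode, assuming Spark's rdd_<rddId>_<splitIndex> block naming and the digit-only pattern used by BlockId.apply (a sketch, not code from this PR):

    import org.apache.spark.storage.{BlockId, RDDBlockId}

    object RddIdOverflowSketch {
      def main(args: Array[String]): Unit = {
        val ok = RDDBlockId(42, 0)     // name "rdd_42_0"
        println(BlockId(ok.name))      // parses back to an RDDBlockId

        val bad = RDDBlockId(-1, 0)    // name "rdd_-1_0", e.g. after the counter wraps
        println(BlockId(bad.name))     // fails: the pattern only matches non-negative digits
      }
    }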

Contributor:

You don't need to make this a var; see below.


/** Register a new RDD, returning its RDD ID */
-  private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+  private[spark] def newRddId(): Int = {
+    var id = nextRddId.getAndIncrement()
+    if (id >= 0) {
+      return id
+    }
+    this.synchronized {
+      id = nextRddId.getAndIncrement()
Contributor:

Can we avoid the duplicate call to 'nextRddId.getAndIncrement()'?

+      if (id < 0) {
Contributor:

Instead of this if (id < 0) condition, the else branch of the previous if (id >= 0) could be used.

+        nextRddId = new AtomicInteger(0)
+        id = nextRddId.getAndIncrement()
@vinodkc (Contributor) commented on Nov 27, 2022:

Can you try this?

nextRddId = new AtomicInteger(1)
id = nextRddId 

Contributor Author replied:

Thank you for your review. I updated the code.

+      }
+    }
+    id
+  }
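
One possible way to address the reviewers' points above (keeping nextRddId a val and avoiding the second getAndIncrement()) would be a single atomic wrap-around via AtomicInteger.getAndUpdate; this is only a sketch, not the code that was merged:

    import java.util.concurrent.atomic.AtomicInteger

    class IdGenerator {
      // Stays a val: the wrap-around happens inside one atomic read-modify-write.
      private val nextRddId = new AtomicInteger(0)

      /** Returns 0, 1, ..., Integer.MAX_VALUE, then wraps back to 0 instead of going negative. */
      def newRddId(): Int =
        nextRddId.getAndUpdate(id => if (id == Integer.MAX_VALUE) 0 else id + 1)
    }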

/**
* Registers listeners specified in spark.extraListeners, then starts the listener bus.