Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ private[spark] class PartitionedAppendOnlyMap[K, V]

def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
destructiveSortedIterator(comparator)
destructiveSortedIterator(getComparator(keyComparator))
}

def insert(partition: Int, key: K, value: V): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
new Sorter(new KVArraySortDataFormat[(Int, K),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think breaking the line here is odd. If necesasry, pull out the result of getComparator to a statement above to shorten this line, like it was before.

AnyRef]).sort(data, 0, curSize, getComparator(keyComparator))
iterator
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,31 @@ private[spark] object WritablePartitionedPairCollection {
}
}

/* Takes an optional parameter (keyComparator), use if provided
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc/scaladoc starts with /** and usually you leave that alone on one line and start documentation on the next.

* and returns a comparator for the partitions
*/
def getComparator[K](keyComparator: Option[Comparator[K]]) : Comparator[(Int, K)] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: no space before colon

val comparator : Comparator[(Int, K)] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comparator is now entirely redundant. The whole body is just the if statement

if (keyComparator.isEmpty) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isDefined is probably a tiny bit more conventional (and then flip the logic here of course)

partitionComparator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be inlined now

} else {
new Comparator[(Int, K)] {
// We know we have a non-empty comparator here
val ourKeyComp = keyComparator.get
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be outside the body of the anonymous class. You don't need a reference to the Option here even in the anonymous class.

override def compare(a: (Int, K), b: (Int, K)): Int = {
val partitionDiff = a._1 - b._1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not thrilled about the subtraction here but maybe leave it for now

if (partitionDiff != 0) {
partitionDiff
} else {
ourKeyComp.compare(a._2, b._2)
}
}
}
}
comparator
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can delete partitionKeyComparator below now, right?

/**
* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
*/
Expand Down