[SPARK-23928][SQL] Add shuffle collection function. #21802
Conversation
cc @pkuwm
python/pyspark/sql/functions.py (outdated):

```python
"""
Collection function: Generates a random permutation of the given array.

.. note:: The function is non-deterministic because its results depends on order of rows which
```
Isn't it non-deterministic rather for the fact that the permutation is determined randomly?
Maybe this one would be better? "The function is non-deterministic because it produces
an unbiased permutation: every permutation is equally likely."
The permutation is determined randomly, but it is fixed for the same query plan if the order of rows is fixed, because the analyzer will assign a random seed to it.
Given the same input sequence, will this function always return the same permutation?
The seed is fixed during the analysis phase, so if we, say, collect() twice or more from the same DataFrame, we will get the same result:

```scala
val df = .. .select(shuffle('arr))
df.collect() == df.collect()
```

but if we create another DataFrame from the same input, we will get different results:

```scala
val df1 = .. .select(shuffle('arr))
val df2 = .. .select(shuffle('arr))
df1.collect() != df2.collect()
```

```scala
    null
  ).toDF("i")

  def checkResult1(): Unit = {
```
Maybe a different name for the method?
Test build #93231 has finished for PR 21802 at commit
Force-pushed 5817265 to 9081e2f.
Test build #93261 has finished for PR 21802 at commit
Jenkins, retest this please.
Test build #93268 has finished for PR 21802 at commit

Test build #93279 has finished for PR 21802 at commit
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case p if p.resolved => p
  case p => p transformExpressionsUp {
    case Shuffle(child, None) => Shuffle(child, Some(random.nextLong()))
```
This looks reasonable. Do we have any context about why we started doing this, instead of picking a seed at runtime?
Then can we use a single rule to assign seeds to these randomized functions?
Yeah, in Uuid we want to make sure the same query plan can return the same result. It is more deterministic between retries.
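The retry semantics under discussion can be sketched outside Spark in plain Python. This is illustrative only: `Shuffle` and `resolve_random_seed` below are hypothetical stand-ins for the Catalyst expression and the analyzer rule, which in reality rewrite query-plan trees.

```python
import random

class Shuffle:
    """Hypothetical stand-in for the Catalyst Shuffle expression:
    its seed stays None until 'analysis' assigns one."""
    def __init__(self, seed=None):
        self.seed = seed

    def eval(self, arr):
        # Evaluation is fully determined by the assigned seed.
        out = list(arr)
        random.Random(self.seed).shuffle(out)
        return out

def resolve_random_seed(expr, rng=random):
    # Analyzer-style rule: assign a concrete random seed exactly once,
    # so re-evaluating the same resolved plan is repeatable.
    if expr.seed is None:
        expr.seed = rng.randrange(1 << 63)
    return expr

plan = resolve_random_seed(Shuffle())
# Same resolved plan -> same result on retry.
assert plan.eval([1, 2, 3, 4]) == plan.eval([1, 2, 3, 4])
```

Two independently resolved plans get independent seeds, which mirrors why two DataFrames built from the same input generally shuffle differently.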
```scala
 * Returns a random permutation of the given array.
 *
 * This implementation uses the modern version of Fisher-Yates algorithm.
 * Reference: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#Modern_method
```
is it safe to use Fisher-Yates here? AFAIK we should not change the input value, in case it's used by other expressions and common subexpression elimination is enabled.
I copy the input array before starting the shuffle so as not to change the input value. Isn't it safe?
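The copy-then-shuffle approach can be sketched in Python (a minimal illustration of the safety argument, not the PR's Scala codegen): the defensive copy means the caller's array is never mutated, even if another expression shares it.

```python
import random

def shuffle_copy(arr, seed):
    """Shuffle a *copy* of arr with classic Fisher-Yates;
    the input list is left untouched."""
    out = list(arr)                       # defensive copy
    rnd = random.Random(seed)
    for i in range(len(out) - 1, 0, -1):  # swap-based Fisher-Yates on the copy
        j = rnd.randint(0, i)
        out[i], out[j] = out[j], out[i]
    return out

data = [1, 2, 3, 4]
shuffled = shuffle_copy(data, seed=7)
assert data == [1, 2, 3, 4]          # original unchanged: safe under sharing
assert sorted(shuffled) == data      # output is a permutation of the input
```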
if we create a new array, I guess there should be some simpler algorithms without swapping...
Oh, I see. Let me try.
In the latest commit, the "inside-out" version looks simpler without swapping (using just an assignment).
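The "inside-out" variant can be sketched in Python (illustrative only; the PR's actual implementation is Scala codegen): it fills a fresh output array with one assignment per element instead of swapping within the input.

```python
import random

def shuffle_inside_out(arr, seed):
    """Inside-out Fisher-Yates: builds the shuffled copy directly,
    never touching the input array."""
    rnd = random.Random(seed)
    out = [None] * len(arr)
    for i, x in enumerate(arr):
        j = rnd.randint(0, i)   # inclusive: 0 <= j <= i
        if j != i:
            out[i] = out[j]     # move the previous occupant of slot j up to i
        out[j] = x              # place the new element
    return out
```

Note there is no pre-fill of `out` with the input values and no swap; each step is a single placement, which is what makes it attractive when a new array is being created anyway.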
Do we really need full codegen for all of these collection functions? They seem pretty slow, and specialization with full codegen won't help perf that much (and might even hurt by blowing up the code size), right?
Test build #93373 has finished for PR 21802 at commit
```scala
assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
  evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))

val seed2 = Some(r.nextLong())
```
Do we need to ensure this property (different seeds must generate different results)?
We likely expect this property; however, I think this test is too strict.
I also followed the test for Uuid in MiscExpressionsSuite.scala#L46-L68 here.
@viirya WDYT about this?
I think this is what we expect. The result is decided by the random seed, so if using different random seeds, the results should be different.
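For intuition, the seed-determines-result behavior can be sketched with Python's seeded RNG (illustrative only; Spark uses its own RNG). The same seed always reproduces the same permutation; distinct seeds merely make a collision unlikely (probability 1/n! for a length-n array with distinct elements), which is exactly why a "different seeds must differ" assertion is strict rather than guaranteed.

```python
import random

def seeded_permutation(arr, seed):
    """Shuffle a copy of arr, fully determined by the given seed."""
    out = list(arr)
    random.Random(seed).shuffle(out)
    return out

base = list(range(8))
# Same seed -> identical permutation: the property the retry semantics rely on.
assert seeded_permutation(base, 42) == seeded_permutation(base, 42)
# Different seeds *usually* differ (collision chance 1/8! here),
# but that is probabilistic, not guaranteed.
```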
Test build #93493 has finished for PR 21802 at commit
@rxin Generally I agree with you, but currently the whole-stage-codegen doesn't support
```scala
val isPrimitiveType = CodeGenerator.isPrimitiveType(elementType)

val numElements = ctx.freshName("numElements")
val arrayData = ctx.freshName("arrayData")
```
nit: we don't need the arrayData variable, we can assign ev.value directly.
Actually, we need a new variable to use ctx.createUnsafeArray(), which declares a new variable in it for now, whereas ev.value is already declared.
```scala
case class RandomIndicesGenerator(randomSeed: Long) {
  private val random = new MersenneTwister(randomSeed)

  def getNextIndices(length: Int): Array[Int] = {
```
It should take an Array[Int], to save the array creation.
We need to create an array to store the shuffled indices anyway. If we want to pass in an array to be shuffled, we need to create it and fill it with 0 until n before calling this. But with this implementation, we don't need to fill in the numbers prior to the shuffle, thanks to the "inside-out" version of the Fisher-Yates algorithm. WDYT?
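The no-pre-fill point can be sketched with a hypothetical Python analogue of getNextIndices (the real one is Scala and uses MersenneTwister): running inside-out Fisher-Yates over the identity sequence 0..length-1 means the freshly allocated zero array never needs a separate fill pass.

```python
import random

def next_indices(length, rnd):
    """Return a random permutation of 0..length-1 without pre-filling:
    inside-out Fisher-Yates applied to the identity sequence."""
    idx = [0] * length          # slot 0 already holds the value 0
    for i in range(1, length):  # conceptually "inserting" the value i
        j = rnd.randint(0, i)
        idx[i] = idx[j]         # move the old occupant of slot j up to i
        idx[j] = i              # place the new value i
    return idx
```

The single allocation plus one assignment pair per element replaces the usual "fill 0..n-1, then swap" two-pass scheme.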
LGTM

1 similar comment

LGTM
python/pyspark/sql/functions.py (outdated):

```python
"""
Collection function: Generates a random permutation of the given array.

.. note:: The function is non-deterministic because its results depends on order of rows which
```
Typo: "results depends", found while reading this one.
```scala
       [3, 1, 5, 20]
      > SELECT _FUNC_(array(1, 20, null, 3));
       [20, null, 3, 1]
  """, since = "2.4.0")
```
We could add a note here too.
```scala
 * Returns a random permutation of the given array.
 *
 * @group collection_funcs
 * @since 2.4.0
```
Shall we match the documentation here as well?
```python
.. note:: The function is non-deterministic because its results depends on order of rows which
    may be non-deterministic after a shuffle.

:param col: name of column or expression
```
Python doctest looks missing.
Looks good to me too
Test build #93661 has finished for PR 21802 at commit
Thanks! merging to master.
What changes were proposed in this pull request?
This PR adds a new collection function: shuffle. It generates a random permutation of the given array. This implementation uses the "inside-out" version of Fisher-Yates algorithm.
How was this patch tested?
New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.