@ulysses-you
Contributor

@ulysses-you ulysses-you commented Aug 15, 2019

What changes were proposed in this pull request?

Currently, RepartitionByExpression is available through the Dataset method Dataset.repartition(), but Spark SQL has no equivalent functionality.
In Hive, we can use DISTRIBUTE BY, so it is worth adding a hint to support this.
See the similar JIRA SPARK-24940.

Why are the changes needed?

Make repartition hints consistent with the repartition API.

Does this PR introduce any user-facing change?

This PR intends to support the queries below:

// SQL cases
 - sql("SELECT /*+ REPARTITION(c) */ * FROM t")
 - sql("SELECT /*+ REPARTITION(1, c) */ * FROM t")
 - sql("SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t")
 - sql("SELECT /*+ REPARTITION_BY_RANGE(1, c) */ * FROM t")
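As a rough illustration of the two argument shapes above (columns only, or a partition count followed by columns), the following toy sketch interprets a hint's parameter list. It does not use Spark: `HintParam`, `IntParam`, `ColumnParam`, `RepartitionSpec`, and `interpret` are hypothetical names, and `defaultPartitions` is assumed to stand in for a session default such as spark.sql.shuffle.partitions. It is written as top-level definitions, so it assumes Scala 3 or a Scala script.

```scala
// Toy model of hint arguments; illustrative stand-ins, not Spark classes.
sealed trait HintParam
case class IntParam(value: Int) extends HintParam
case class ColumnParam(name: String) extends HintParam

// Hypothetical result: how many partitions, and which columns to partition by.
case class RepartitionSpec(numPartitions: Int, columns: Seq[String])

// Interpret the parameter list of a REPARTITION-style hint.
// `defaultPartitions` is an assumed stand-in for the session default.
def interpret(params: Seq[HintParam], defaultPartitions: Int): RepartitionSpec = {
  // Every parameter after the optional leading count must be a column.
  def columnNames(ps: Seq[HintParam]): Seq[String] = ps.map {
    case ColumnParam(name) => name
    case other =>
      throw new IllegalArgumentException(s"expected a column, but found $other")
  }
  params match {
    case Seq() =>
      throw new IllegalArgumentException("hint requires at least one parameter")
    // Leading integer: explicit partition count, the rest are columns.
    case IntParam(n) +: rest => RepartitionSpec(n, columnNames(rest))
    // No leading integer: fall back to the default partition count.
    case cols => RepartitionSpec(defaultPartitions, columnNames(cols))
  }
}
```

In this sketch, `interpret(Seq(ColumnParam("c")), 200)` corresponds to `REPARTITION(c)` and `interpret(Seq(IntParam(1), ColumnParam("c")), 200)` to `REPARTITION(1, c)`. The sketch does not distinguish hash from range partitioning.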

How was this patch tested?

Unit tests (UT).

@maropu
Member

maropu commented Aug 16, 2019

@gatorsmile @maryannxue We need this?

@maryannxue
Contributor

I'm not against it, but is it possible to extend the existing "repartition" hint grammar to achieve this?

@maropu
Member

maropu commented Aug 21, 2019

Yea, that approach looks more reasonable to me. Could you brush up the code based on the suggestion? @ulysses-you

@ulysses-you
Contributor Author

Thanks for reviewing this. Making the hint consistent with the repartition API is a good idea.

@ulysses-you
Contributor Author

retest this please

@maropu
Member

maropu commented Aug 23, 2019

ok to test

@SparkQA

SparkQA commented Aug 23, 2019

Test build #109600 has finished for PR 25464 at commit c57805b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

RepartitionByExpression(
exprs.map(_.asInstanceOf[UnresolvedAttribute]), h.child, numPartitions)
}

Member

How about the case, SELECT /*+ REPARTITION(a) */ * FROM t?

if (errExprs.nonEmpty) throw new AnalysisException(
s"""Invalid type exprs : $errExprs
|expects UnresolvedAttribute type
""".stripMargin)
Member

Please add tests for this exception.

Contributor Author

OK, I will add this later.

Member

You haven't added this test yet?

@SparkQA

SparkQA commented Aug 23, 2019

Test build #109603 has finished for PR 25464 at commit 4fbf34a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2019

Test build #109615 has finished for PR 25464 at commit b0ae689.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ulysses-you
Contributor Author

retest this please

@maropu
Member

maropu commented Aug 26, 2019

retest this please

@SparkQA

SparkQA commented Aug 26, 2019

Test build #109720 has finished for PR 25464 at commit b0ae689.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Aug 26, 2019

retet this please

@ulysses-you
Contributor Author

retest this please

@maropu
Member

maropu commented Aug 27, 2019

oh.. typo..

@maropu
Member

maropu commented Aug 27, 2019

retest this please

@SparkQA

SparkQA commented Oct 25, 2019

Test build #112637 has finished for PR 25464 at commit 507735c.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Oct 25, 2019

retest this please

@SparkQA

SparkQA commented Oct 25, 2019

Test build #112639 has finished for PR 25464 at commit 44aa0b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 25, 2019

Test build #112646 has finished for PR 25464 at commit 44aa0b8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Oct 28, 2019

retest this please

Member

@maropu maropu left a comment

LGTM. Could anyone do the final checks? @cloud-fan @HyukjinKwon

@SparkQA

SparkQA commented Oct 28, 2019

Test build #112749 has finished for PR 25464 at commit 44aa0b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

throw new AnalysisException(s"$hintName Hint parameter should include columns, but " +
s"${invalidParams.mkString(", ")} found")
}
RepartitionByExpression(
Member

Can we then consistently throw an exception like Dataset.repartition?

    val sortOrders = partitionExprs.filter(_.expr.isInstanceOf[SortOrder])
    if (sortOrders.nonEmpty) throw new IllegalArgumentException(
      s"""Invalid partitionExprs specified: $sortOrders
         |For range partitioning use repartitionByRange(...) instead.
       """.stripMargin)

Contributor Author

Good point, I added an IllegalArgumentException check.
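The two checks discussed in this thread (reject sort orders with an IllegalArgumentException, then reject anything that is not a plain column) can be sketched without Spark as follows. This is only a toy model: `Expr`, `Column`, `SortOrder`, `Literal`, and `validatePartitionExprs` are hypothetical stand-ins for the Catalyst types, and the second check uses a plain JVM exception where the PR throws AnalysisException. It assumes Scala 3 or a Scala script, since the definitions are top-level.

```scala
// Toy expression types; illustrative stand-ins, not Spark's Catalyst classes.
sealed trait Expr
case class Column(name: String) extends Expr
case class SortOrder(column: Column, ascending: Boolean) extends Expr
case class Literal(value: Any) extends Expr

// Validate the expressions passed to a hash-partitioning REPARTITION hint.
def validatePartitionExprs(hintName: String, exprs: Seq[Expr]): Seq[Column] = {
  // Sort orders only make sense for range partitioning.
  val sortOrders = exprs.collect { case s: SortOrder => s }
  if (sortOrders.nonEmpty) {
    throw new IllegalArgumentException(
      s"Invalid partitionExprs specified: $sortOrders. " +
        "For range partitioning use REPARTITION_BY_RANGE instead.")
  }
  // Everything that remains must be a plain column reference.
  val invalidParams = exprs.filterNot(_.isInstanceOf[Column])
  if (invalidParams.nonEmpty) {
    // Assumption: a generic exception replaces Spark's AnalysisException here.
    throw new UnsupportedOperationException(
      s"$hintName Hint parameter should include columns, but " +
        s"${invalidParams.mkString(", ")} found")
  }
  exprs.collect { case c: Column => c }
}
```

Checking sort orders first means a range-style argument gets the more specific message that points to REPARTITION_BY_RANGE, rather than the generic "should include columns" error.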

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM too otherwise.

@SparkQA

SparkQA commented Oct 29, 2019

Test build #112849 has finished for PR 25464 at commit 4ac7eb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in 6958d7e Oct 29, 2019
@maropu
Member

maropu commented Oct 29, 2019

Thanks for the contribution, @ulysses-you! Merged to master.

@maropu
Member

maropu commented Oct 29, 2019

FYI: I added @ulysses-you to the Spark contributor list.

@ulysses-you
Contributor Author

Thanks for the great help! @maropu @HyukjinKwon @cloud-fan @maryannxue

createRepartition(shuffle = false, hint)
case "REPARTITION_BY_RANGE" =>
createRepartitionByRange(hint)
case _ => plan
Contributor

Shouldn't we return the hint here? This will cause a stack overflow whenever the hint is not the root node.

Member

Ah, yes. I will make a followup. Thanks for catching this.

HyukjinKwon added a commit that referenced this pull request Nov 23, 2019
…unknown hint resolution

### What changes were proposed in this pull request?
This is rather a followup of #25464 (see https://github.com/apache/spark/pull/25464/files#r349543286)

It will cause an infinite recursion via mapping children - we should return the hint rather than its parent plan in unknown hint resolution.
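The recursion can be reproduced on a toy plan tree. The sketch below does not use Spark: `Plan`, `Table`, `Project`, `Hint`, `Repartition`, `transformDown`, `resolveBuggy`, and `resolveFixed` are hypothetical stand-ins, and the depth guard is an assumption added so that the failure shows up as an exception instead of a real StackOverflowError. It assumes Scala 3 or a Scala script, since the definitions are top-level.

```scala
// Toy plan tree; illustrative stand-ins, not Spark's LogicalPlan API.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan
}
case class Table(name: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}
case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}
case class Hint(name: String, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}
case class Repartition(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
}

// Top-down transform: apply the rule to a node, then map the children of the result.
// The depth guard stands in for the JVM stack limit.
def transformDown(node: Plan, depth: Int = 0)(rule: PartialFunction[Plan, Plan]): Plan = {
  if (depth > 100) throw new IllegalStateException("recursion limit exceeded")
  val after = rule.applyOrElse(node, (p: Plan) => p)
  after.withChildren(after.children.map(c => transformDown(c, depth + 1)(rule)))
}

// Buggy variant: an unknown hint is replaced by the whole plan, which still
// contains that hint, so mapping the children visits it again and again.
def resolveBuggy(root: Plan): Plan = transformDown(root) {
  case Hint("REPARTITION", child) => Repartition(child)
  case Hint(_, _) => root
}

// Fixed variant: an unknown hint is returned unchanged.
def resolveFixed(root: Plan): Plan = transformDown(root) {
  case Hint("REPARTITION", child) => Repartition(child)
  case h @ Hint(_, _) => h
}
```

With `Project(Hint("UNKNOWN", Table("t")))`, the buggy variant hits the recursion limit while the fixed one returns the plan unchanged. When the unknown hint is the root, both variants agree, which is consistent with the review comment that the problem appears only when the hint is not the root node.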

### Why are the changes needed?

Prevent stack overflow during hint resolution.

### Does this PR introduce any user-facing change?

Yes, it avoids a stack overflow exception. It was caused by #25464 and exists only in master.

No behaviour changes to end users as it happened only in the master.

### How was this patch tested?

A unit test was added.

Closes #26642 from HyukjinKwon/SPARK-30003.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
s"""Invalid partitionExprs specified: $sortOrders
|For range partitioning use REPARTITION_BY_RANGE instead.
""".stripMargin)
val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
Contributor

I think this check breaks the old API: in Spark 2.4 it is possible to use an expression here. I think we need to back this out.

Contributor

AFAIK 2.4 only supports something like REPARTITION(5). The parameters here mean anything after the partition number parameter, e.g. REPARTITION(5, para1, para2, ...).

Member

Yea, I think so, too.

Contributor

Ah ok, you are right. Nevertheless, I think we should support expressions for REPARTITION here.

def createRepartitionByExpression(
numPartitions: Int, partitionExprs: Seq[Any]): RepartitionByExpression = {
val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder])
if (sortOrders.nonEmpty) throw new IllegalArgumentException(
Contributor

@hvanhovell hvanhovell Feb 5, 2020

Style: please put this inside curly braces and on a new line.

Member

@ulysses-you Could you do a follow-up for the two comments from @hvanhovell?

Member

Actually, it might be better to just handle this later, when we happen to touch this code, given that we don't usually make follow-ups for minor style issues.


def createRepartitionByExpression(
numPartitions: Int, partitionExprs: Seq[Any]): RepartitionByExpression = {
val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
Contributor

Same comment as above.

@ulysses-you ulysses-you deleted the SPARK-28746 branch March 3, 2021 06:31
8 participants