Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Extract distinct aggregate expressions.
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val distincgAggExpressions = aggExpressions.filter(_.isDistinct)
val distinctAggGroups = distincgAggExpressions.groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
Expand All @@ -132,7 +133,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Aggregation strategy can handle queries with a single distinct group.
if (distinctAggGroups.size > 1) {
if (distincgAggExpressions.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
Expand All @@ -151,7 +152,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Setup unique distinct aggregate children.
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
Copy link
Member

@gatorsmile gatorsmile May 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not related to this pr though, I dropped .distinct because it does nothing (keySet.flatten is already a set)?

val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
Expand Down
6 changes: 5 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/group-by.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ SELECT 1 from (
FROM (select 1 as x) a
WHERE false
) b
where b.z != b.z
where b.z != b.z;

-- SPARK-24369 multiple distinct aggregations having the same argument set
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);
11 changes: 10 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/group-by.sql.out
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 26
-- Number of queries: 27


-- !query 0
Expand Down Expand Up @@ -241,3 +241,12 @@ where b.z != b.z
struct<1:int>
-- !query 25 output



-- !query 26
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
-- !query 26 schema
struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
-- !query 26 output
1.0 1.0 3