@@ -268,15 +268,16 @@ abstract class HashExpression[E] extends Expression {

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, ev.value, ctx)
       }
-    }.mkString("\n")
+    })

+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
       $childrenHash""")
   }
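For context on the change above: the JVM limits each method to 64KB of bytecode, so concatenating the hash-update snippet for every child into one generated method fails once a row is wide enough (the reported case had about 1000 columns). Routing the snippets through ctx.splitExpressions packs them into separate helper methods instead. Below is a simplified, standalone sketch of that idea; the names, the size threshold, and the generated-method shape are illustrative, not Spark's actual implementation.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for CodegenContext.splitExpressions: pack code
// snippets into helper methods so no single generated method grows
// beyond the JVM's 64KB bytecode-per-method limit. Returns the helper
// method declarations and the sequence of calls to splice into the
// main body (Spark registers the helpers via ctx.addNewFunction instead).
def splitIntoMethods(
    row: String,
    snippets: Seq[String],
    maxChars: Int = 1024): (Seq[String], String) = {
  val blocks = ArrayBuffer(new StringBuilder)
  for (s <- snippets) {
    // Start a new block once the current one gets too large. Character
    // count is only a crude proxy for eventual bytecode size.
    if (blocks.last.nonEmpty && blocks.last.length + s.length > maxChars) {
      blocks += new StringBuilder
    }
    blocks.last.append(s).append('\n')
  }
  val decls = blocks.zipWithIndex.map { case (body, i) =>
    s"private void apply_$i(InternalRow $row) {\n$body}"
  }
  val calls = decls.indices.map(i => s"apply_$i($row);").mkString("\n")
  (decls.toSeq, calls)
}

This is also why the accumulator can no longer be declared locally in the main body: the new ctx.addMutableState call turns ev.value into an instance field that every split method can see, and the main body now only assigns the seed.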

@@ -600,15 +601,18 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
     val childHash = ctx.freshName("childHash")
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, childHash, ctx)
-      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
-    }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
+      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;" +
+        s"\n$childHash = 0;"
+    })

+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState("int", childHash, s"$childHash = 0;")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
       $childrenHash""")
   }
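The same splitting explains the two new addMutableState calls here: a local int $childHash declared in the main body would be out of scope inside the split methods, so both the running hash (ev.value) and the per-child scratch variable become instance fields, and the $childHash = 0; reset moves inline after each child instead of living in the old mkString separators. A plain-Scala sketch of the same constraint, with made-up names, purely for illustration:

// Sketch: once hashing is split across methods, the running value and
// the per-child scratch variable must be fields, not locals.
class SplitHiveHashSketch(seed: Int) {
  private var value: Int = _      // counterpart of ev.value (addMutableState)
  private var childHash: Int = 0  // counterpart of childHash (addMutableState)

  // Counterpart of one generated apply_N method: hashes a slice of the
  // children and folds each result into `value`.
  private def hashSlice(cols: Seq[String]): Unit = cols.foreach { c =>
    childHash = c.hashCode            // stand-in for computeHash(...)
    value = (31 * value) + childHash  // the update appended per child
    childHash = 0                     // inline reset replacing the mkString separators
  }

  def apply(cols: Seq[String]): Int = {
    value = seed                      // emitted as "${ev.value} = $seed;"
    val (left, right) = cols.splitAt(cols.length / 2)
    hashSlice(left)                   // generated code calls apply_0, apply_1, ...
    hashSlice(right)
    value
  }
}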

19 changes: 19 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1728,4 +1728,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
    assert(df.filter($"array1" === $"array2").count() == 1)
  }

Contributor:
I think you need to revert this change.

@kiszk (Member Author), Nov 7, 2016:
Sorry for my typo. It is time for me to sleep.

Contributor:
No problem. Happens to us all :)

Member:
nit: there is an additional newline.

test("SPARK-18207: Compute hash for wider table") {
import org.apache.spark.sql.types.{StructType, StringType}
Contributor:
Is there any chance that you can make this UT a bit more concise? It seems way too contrived.

Member Author:
I made it more concise.

Contributor:
Where is this hashing?

@kiszk (Member Author), Nov 5, 2016:
This union involves hashing. I confirmed that the original error occurred without this PR.

Member:
@kiszk The link above is broken.

Member:
If the hashing is not explicit to the reader, can you add a comment for that?

@kiszk (Member Author), Nov 7, 2016:
@viirya thank you for pointing that out. I fixed the link, and will add a comment there later.

Member:
I think this test is more like an end-to-end test; the hashing is not obvious, as we have seen.

Can we add a unit test, in HashExpressionsSuite I think? That way you can test HiveHash too.

Contributor:
I do agree with @viirya: the current test is way too indirect and can actually break silently if the planner chooses a sort-based aggregate over a hash-based aggregate. It would be very nice to have a direct test on the hash function.

@kiszk (Member Author), Nov 7, 2016:
I see. I created direct tests in HashExpressionsSuite.scala.
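A direct test of the kind discussed above might look roughly like the sketch below. This is a hedged illustration, not the PR's actual addition to HashExpressionsSuite: it assumes the suite's usual checkEvaluation helper (from ExpressionEvalHelper), and uses the interpreted result as the expected value so that the codegen path, which previously overflowed the 64KB generated-method limit, is checked against it.

import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal, Murmur3Hash}
import org.apache.spark.sql.types.StringType

// Sketch of a direct unit test: hash a very wide row of string literals,
// so the generated code must be split across methods in order to compile.
test("SPARK-18207: hash expressions over many columns should compile") {
  val wide = (1 to 1000).map(i => Literal.create(i.toString, StringType))

  // checkEvaluation exercises both the interpreted and codegen paths;
  // using the interpreted result as the expected value verifies that the
  // (now split) generated code agrees with it.
  val murmur3 = Murmur3Hash(wide, seed = 42)
  checkEvaluation(murmur3, murmur3.eval())

  val hive = HiveHash(wide)
  checkEvaluation(hive, hive.eval())
}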


    val COLMAX = 1000
    val schema: StructType = (1 to COLMAX)
      .foldLeft(new StructType())((s, i) => s.add(s"g$i", StringType, nullable = true))
    val rdds = spark.sparkContext.parallelize(Seq(Row.fromSeq((1 to COLMAX).map(_.toString))))
    val wideDF = spark.createDataFrame(rdds, schema)

    val widePlus = wideDF.withColumn("d_rank", lit(1))
    widePlus.createOrReplaceTempView("wide_plus")
    val widePlus2 = widePlus.withColumn("d_rank", lit(0))
    widePlus2.createOrReplaceTempView("wide_plus2")
    // The union operation in this SQL involves computing a hash for each row
Member:
nit: Actually the hash computation happens in HashAggregate.

Member Author:
Thanks. Updated the comment.

    val df = spark.sqlContext.sql("select * from wide_plus union select * from wide_plus2")
    df.count
  }
}