Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
213ada8
First draft of partially aggregated and code generated count distinct…
marmbrus Aug 17, 2014
bd08239
WIP
marmbrus Aug 17, 2014
050bb97
Skip no-arg constructors for kryo,
marmbrus Aug 17, 2014
41fbd1d
Never try and create an empty hash set.
marmbrus Aug 17, 2014
d494598
Fix tests now that the planner is better
marmbrus Aug 18, 2014
9153652
better toString
marmbrus Aug 18, 2014
38c7449
comments and style
marmbrus Aug 18, 2014
f31b8ad
more fixes
marmbrus Aug 18, 2014
c1f7114
Improve tests / fix serialization.
marmbrus Aug 18, 2014
b3d0f64
Add golden files.
marmbrus Aug 18, 2014
57ae3b1
Fix order dependent test
marmbrus Aug 18, 2014
27984d0
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 18, 2014
abee26d
WIP
marmbrus Aug 18, 2014
87d101d
Fix isNullAt bug
marmbrus Aug 19, 2014
58d15f1
disable codegen logging
marmbrus Aug 19, 2014
8ff6402
Add specific row.
marmbrus Aug 19, 2014
2b46c4b
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 19, 2014
c9e67de
Made SpecificRow and types serializable by Kryo
GregOwen Aug 19, 2014
3868f6c
Merge pull request #9 from GregOwen/countDistinctPartial
marmbrus Aug 19, 2014
db44a30
JIT hax.
marmbrus Aug 19, 2014
93d0f64
metastore concurrency fix.
marmbrus Aug 19, 2014
fdca896
cleanup
marmbrus Aug 19, 2014
fae38f4
Fix style
marmbrus Aug 19, 2014
b2e8ef3
Merge remote-tracking branch 'origin/master' into countDistinctPartial
marmbrus Aug 20, 2014
c122cca
Address comments, add tests
marmbrus Aug 20, 2014
32d216f
reynolds comments
marmbrus Aug 20, 2014
8074a80
fix tests
marmbrus Aug 20, 2014
5c7848d
turn off caching in the constructor
marmbrus Aug 23, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

protected val exprArray = expressions.toArray
// null check is required for when Kryo invokes the no-arg constructor.
protected val exprArray = if (expressions != null) expressions.toArray else null

def apply(input: Row): Row = {
val outputArray = new Array[Any](exprArray.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.util.collection.OpenHashSet

abstract class AggregateExpression extends Expression {
self: Product =>
Expand Down Expand Up @@ -161,13 +162,88 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod
override def newInstance() = new CountFunction(child, this)
}

case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
/**
 * Exact COUNT(DISTINCT ...) over one or more expressions. The result is a non-nullable Long.
 *
 * Partial aggregation is supported through `asPartial`: a `CollectHashSet` gathers a set per
 * partition (aliased as "partialSets"), and `CombineSetsAndCount` consumes that attribute to
 * produce the final count.
 */
case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate {
  // No-arg constructor leaves `expressions` null.
  // NOTE(review): presumably needed for serialization, as on the aggregate function classes.
  def this() = this(null)

  override def dataType = LongType
  override def nullable = false
  override def children = expressions
  override def references = expressions.flatMap(expr => expr.references).toSet
  override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})"
  override def newInstance() = new CountDistinctFunction(expressions, this)

  override def asPartial = {
    // Partial step: collect the distinct values into a set under a named alias.
    val collectedSets = Alias(CollectHashSet(expressions), "partialSets")()
    // Final step: merge the aliased sets and count their elements.
    val mergeAndCount = CombineSetsAndCount(collectedSets.toAttribute)
    SplitEvaluation(mergeAndCount, List(collectedSets))
  }
}

/**
 * Aggregate expression that collects the evaluated `expressions` of each input row into a hash
 * set. `CountDistinct.asPartial` uses it as the partial-aggregation step.
 */
case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression {
  // No-arg constructor leaves `expressions` null.
  // NOTE(review): presumably needed for serialization, as on the aggregate function classes.
  def this() = this(null)

  // Declared as an array of the first expression's type, although the backing function's
  // `eval` hands back its OpenHashSet.
  override def dataType = ArrayType(expressions.head.dataType)
  override def nullable = false
  override def children = expressions
  override def references = expressions.flatMap(expr => expr.references).toSet
  override def toString = s"AddToHashSet(${expressions.mkString(",")})"
  override def newInstance() = new CollectHashSetFunction(expressions, this)
}

/**
 * Aggregate function behind `CollectHashSet`. Each input row is projected through `expr`; the
 * projected row is added to `seen` unless any of its values is null. `eval` returns the set
 * itself.
 *
 * @param expr expressions evaluated against each input row (transient)
 * @param base the aggregate expression this function was created from (transient)
 */
case class CollectHashSetFunction(
    @transient expr: Seq[Expression],
    @transient base: AggregateExpression)
  extends AggregateFunction {

  def this() = this(null, null) // Required for serialization.

  // Holds the projected rows; `update` never adds a row that contains a null.
  val seen = new OpenHashSet[Any]()

  // Built eagerly, including when `expr` is null via the no-arg constructor.
  @transient
  val distinctValue = new InterpretedProjection(expr)

  override def update(input: Row): Unit = {
    val projectedRow = distinctValue(input)
    val containsNull = projectedRow.anyNull
    if (!containsNull) {
      seen.add(projectedRow)
    }
  }

  override def eval(input: Row): Any = seen
}

/**
 * Aggregate expression over a set-valued input expression whose result is a non-nullable Long.
 * `CountDistinct.asPartial` uses it as the final step over the partially collected sets.
 */
case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression {
  // No-arg constructor leaves `inputSet` null.
  // NOTE(review): presumably needed for serialization, as on the aggregate function classes.
  def this() = this(null)

  override def dataType = LongType
  override def nullable = false
  override def children = List(inputSet)
  override def references = inputSet.references
  override def toString = s"CombineAndCount($inputSet)"
  override def newInstance() = new CombineSetsAndCountFunction(inputSet, this)
}

case class CombineSetsAndCountFunction(
@transient inputSet: Expression,
@transient base: AggregateExpression)
extends AggregateFunction {

def this() = this(null, null) // Required for serialization.

val seen = new OpenHashSet[Any]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this support null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, we will never put null into it though (we always put rows in, and furthermore count distinct semantics don't count null).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a line there explaining that we never put null into it. I think the open hash set doesn't support null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think most HashSets don't support null. scala.collection.mutable.HashSet throws an exception if you try to add null.


// Evaluates `inputSet` against the row, treats the result as an OpenHashSet, and folds every
// element into `seen`.
// NOTE(review): presumably the incoming sets never contain null, since CollectHashSetFunction
// skips rows with nulls — confirm against the planner.
override def update(input: Row): Unit = {
  val partialValues = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]].iterator
  while (partialValues.hasNext) {
    seen.add(partialValues.next)
  }
}

// The result is the number of elements accumulated in `seen`, widened to Long.
override def eval(input: Row): Any = seen.size.toLong
}

case class ApproxCountDistinctPartition(child: Expression, relativeSD: Double)
Expand Down Expand Up @@ -379,17 +455,22 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
seen.reduceLeft(base.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].plus)
}

case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression)
case class CountDistinctFunction(
@transient expr: Seq[Expression],
@transient base: AggregateExpression)
extends AggregateFunction {

def this() = this(null, null) // Required for serialization.

val seen = new scala.collection.mutable.HashSet[Any]()
val seen = new OpenHashSet[Any]()

@transient
val distinctValue = new InterpretedProjection(expr)

override def update(input: Row): Unit = {
val evaluatedExpr = expr.map(_.eval(input))
if (evaluatedExpr.map(_ != null).reduceLeft(_ && _)) {
seen += evaluatedExpr
val evaluatedExpr = distinctValue(input)
if (!evaluatedExpr.anyNull) {
seen.add(evaluatedExpr)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,21 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet

override def eval(input: Row): Any = i2(input, left, right, _.rem(_, _))
}

case class MaxOf(left: Expression, right: Expression) extends Expression {
type EvaluatedType = Any

override def nullable = left.nullable && right.nullable

override def children = left :: right :: Nil

// Union of the two children's own reference sets, as suggested in review; the previous form
// flat-mapped over each child expression and then converted the result back to a set.
override def references = left.references ++ right.references
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be left.references ++ right.references or children.flatMap(_.references).toSet ?


override def dataType = left.dataType

override def eval(input: Row): Any = {
val leftEval = left.eval(input)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code is missing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code was missing, I just added it.
On Aug 20, 2014 3:18 PM, "Reynold Xin" [email protected] wrote:

In
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala:

@@ -85,3 +85,21 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet

override def eval(input: Row): Any = i2(input, left, right, _.rem(_, _))
}
+
+case class MaxOf(left: Expression, right: Expression) extends Expression {

  • type EvaluatedType = Any
  • override def nullable = left.nullable && right.nullable
  • override def children = left :: right :: Nil
  • override def references = (left.flatMap(_.references) ++ right.flatMap(_.references)).toSet
  • override def dataType = left.dataType
  • override def eval(input: Row): Any = {
  • val leftEval = left.eval(input)

what do u mean?


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/1935/files#r16509768.

}

override def toString = s"MaxOf($left, $right)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

// Concrete, non-generic OpenHashSet subclasses. `hashSetForType` maps IntegerType and LongType
// to these so that generated code can instantiate and cast to a named set type.
class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]

/**
* A base class for generators of byte code to perform expression evaluation. Includes a set of
* helpers for referring to Catalyst types and building trees that perform evaluation of individual
Expand Down Expand Up @@ -71,7 +74,8 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
* From the Guava Docs: A Cache is similar to ConcurrentMap, but not quite the same. The most
* fundamental difference is that a ConcurrentMap persists all elements that are added to it until
* they are explicitly removed. A Cache on the other hand is generally configured to evict entries
* automatically, in order to constrain its memory footprint
* automatically, in order to constrain its memory footprint. Note that this cache does not use
* weak keys/values and thus does not respond to memory pressure.
*/
protected val cache = CacheBuilder.newBuilder()
.maximumSize(1000)
Expand Down Expand Up @@ -398,6 +402,75 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
$primitiveTerm = ${falseEval.primitiveTerm}
}
""".children

case NewSet(elementType) =>
q"""
val $nullTerm = false
val $primitiveTerm = new ${hashSetForType(elementType)}()
""".children

case AddItemToSet(item, set) =>
val itemEval = expressionEvaluator(item)
val setEval = expressionEvaluator(set)

val ArrayType(elementType, _) = set.dataType

itemEval.code ++ setEval.code ++
q"""
if (!${itemEval.nullTerm}) {
${setEval.primitiveTerm}
.asInstanceOf[${hashSetForType(elementType)}]
.add(${itemEval.primitiveTerm})
}

val $nullTerm = false
val $primitiveTerm = ${setEval.primitiveTerm}
""".children

case CombineSets(left, right) =>
val leftEval = expressionEvaluator(left)
val rightEval = expressionEvaluator(right)

val ArrayType(elementType, _) = left.dataType

leftEval.code ++ rightEval.code ++
q"""
val leftSet = ${leftEval.primitiveTerm}.asInstanceOf[${hashSetForType(elementType)}]
val rightSet = ${rightEval.primitiveTerm}.asInstanceOf[${hashSetForType(elementType)}]
val iterator = rightSet.iterator
while (iterator.hasNext) {
leftSet.add(iterator.next())
}

val $nullTerm = false
val $primitiveTerm = leftSet
""".children

case MaxOf(e1, e2) =>
val eval1 = expressionEvaluator(e1)
val eval2 = expressionEvaluator(e2)

eval1.code ++ eval2.code ++
q"""
var $nullTerm = false
var $primitiveTerm: ${termForType(e1.dataType)} = ${defaultPrimitive(e1.dataType)}

if (${eval1.nullTerm}) {
$nullTerm = ${eval2.nullTerm}
$primitiveTerm = ${eval2.primitiveTerm}
} else if (${eval2.nullTerm}) {
$nullTerm = ${eval1.nullTerm}
$primitiveTerm = ${eval1.primitiveTerm}
} else {
$nullTerm = false
if (${eval1.primitiveTerm} > ${eval2.primitiveTerm}) {
$primitiveTerm = ${eval1.primitiveTerm}
} else {
$primitiveTerm = ${eval2.primitiveTerm}
}
}
""".children

}

// If there was no match in the partial function above, we fall back on calling the interpreted
Expand Down Expand Up @@ -437,6 +510,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
protected def accessorForType(dt: DataType) = newTermName(s"get${primitiveForType(dt)}")
protected def mutatorForType(dt: DataType) = newTermName(s"set${primitiveForType(dt)}")

protected def hashSetForType(dt: DataType) = dt match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this throw an exception for count distinct of non int/longs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is disabled at a higher level.

How about adding a default case that throws an exception, so it is clear that this has been checked earlier?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, the planner will not use code generation unless the type of the count distinct is a single int or long value. Good idea though, I've added the check.

case IntegerType => typeOf[IntegerHashSet]
case LongType => typeOf[LongHashSet]
// Per the review discussion, the planner only uses code generation when the count distinct is
// over a single int or long value, so any other type reaching this point is a planner bug.
// Fail with an explicit message instead of an opaque MatchError.
case unsupportedType =>
  sys.error(s"Code generation not supported for hash set of type $unsupportedType")
}

protected def primitiveForType(dt: DataType) = dt match {
case IntegerType => "Int"
case LongType => "Long"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
..${evaluatedExpression.code}
if(${evaluatedExpression.nullTerm})
setNullAt($iLit)
else
else {
nullBits($iLit) = false
$elementName = ${evaluatedExpression.primitiveTerm}
}
}
""".children : Seq[Tree]
}
Expand Down Expand Up @@ -106,9 +108,10 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
if(value == null) {
setNullAt(i)
} else {
nullBits(i) = false
$elementName = value.asInstanceOf[${termForType(e.dataType)}]
return
}
return
}"""
}
q"final def update(i: Int, value: Any): Unit = { ..$cases; $accessorFailure }"
Expand Down Expand Up @@ -137,7 +140,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
val elementName = newTermName(s"c$i")
// TODO: The string of ifs gets pretty inefficient as the row grows in size.
// TODO: Optional null checks?
q"if(i == $i) { $elementName = value; return }" :: Nil
q"if(i == $i) { nullBits($i) = false; $elementName = value; return }" :: Nil
case _ => Nil
}

Expand Down
Loading