@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.columnar

import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, RowOrdering}
+import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -323,18 +324,31 @@ private[columnar] final class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats {
}

private[columnar] final class ObjectColumnStats(dataType: DataType) extends ColumnStats {
+  protected var upper: Any = null
+  protected var lower: Any = null
+
  val columnType = ColumnType(dataType)
+  val ordering = dataType match {
+    case x if RowOrdering.isOrderable(dataType) && x != NullType =>
+      Option(TypeUtils.getInterpretedOrdering(x))
+    case _ => None
Member: Since this class is only for "orderable" types, maybe we don't need an Option here and ordering can just be Ordering[T].

Author: This is for DataTypes that could be orderable, since Arrays and Structs may have child data types that aren't.
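To make the author's point concrete, a small illustration (a sketch assuming Spark's usual RowOrdering semantics, not output from this PR): isOrderable recurses into child types, so a container type is orderable only when all of its children are.

```scala
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.types._

// Containers are orderable only if every child type is orderable.
RowOrdering.isOrderable(ArrayType(IntegerType))                       // true
RowOrdering.isOrderable(MapType(IntegerType, StringType))             // false: maps are never orderable
RowOrdering.isOrderable(ArrayType(MapType(IntegerType, StringType)))  // false: the element type is a map
```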
+  }

  override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
    if (!row.isNullAt(ordinal)) {
      val size = columnType.actualSize(row, ordinal)
      sizeInBytes += size
      count += 1
+      ordering.foreach { order =>
Member (@kiszk, Jan 11, 2019): Do we ever have more than one element in ordering? If not, can we write this without foreach? It could achieve better performance.
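A foreach-free variant along the lines the reviewer suggests might look like the sketch below (a drop-in replacement for the block that follows; ordering holds at most one element, so a pattern match avoids going through Option.foreach's closure on every row):

```scala
ordering match {
  case Some(order) =>
    val value = row.get(ordinal, dataType)
    if (upper == null || order.gt(value, upper)) upper = value
    if (lower == null || order.lt(value, lower)) lower = value
  case None => // non-orderable type: no bounds to maintain
}
```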
+        val value = row.get(ordinal, dataType)
+        if (upper == null || order.gt(value, upper)) upper = value
+        if (lower == null || order.lt(value, lower)) lower = value
Member: For unsafe rows and arrays, don't we need to copy the value? The added test can't catch this because the random rows are all individual instances, but during query evaluation it can be the same reused instance of an unsafe row or array.

Author: Yes, thanks for catching this.
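One possible shape for that fix, sketched (an editor's sketch, not this PR's eventual change; UnsafeRow, UnsafeArrayData, and UnsafeMapData all expose a copy() method):

```scala
import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}

// Copy unsafe-backed values before retaining them as bounds, since the same
// mutable instance can be reused across input rows during query evaluation.
val value = row.get(ordinal, dataType) match {
  case v: UnsafeRow => v.copy()
  case v: UnsafeArrayData => v.copy()
  case v: UnsafeMapData => v.copy()
  case v => v
}
```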
+      }
    } else {
      gatherNullStats
    }
  }

  override def collectedStatistics: Array[Any] =
-    Array[Any](null, null, nullCount, count, sizeInBytes)
+    Array[Any](lower, upper, nullCount, count, sizeInBytes)
}
@@ -18,18 +18,35 @@
package org.apache.spark.sql.execution.columnar

import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._

class ColumnStatsSuite extends SparkFunSuite {
-  testColumnStats(classOf[BooleanColumnStats], BOOLEAN, Array(true, false, 0))
-  testColumnStats(classOf[ByteColumnStats], BYTE, Array(Byte.MaxValue, Byte.MinValue, 0))
-  testColumnStats(classOf[ShortColumnStats], SHORT, Array(Short.MaxValue, Short.MinValue, 0))
-  testColumnStats(classOf[IntColumnStats], INT, Array(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[LongColumnStats], LONG, Array(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[FloatColumnStats], FLOAT, Array(Float.MaxValue, Float.MinValue, 0))
-  testColumnStats(classOf[DoubleColumnStats], DOUBLE, Array(Double.MaxValue, Double.MinValue, 0))
-  testColumnStats(classOf[StringColumnStats], STRING, Array(null, null, 0))
-  testDecimalColumnStats(Array(null, null, 0))
+  testColumnStats(classOf[BooleanColumnStats], BOOLEAN, Array(true, false, 0, 0, 0))
Member: Those changes to testColumnStats seem unnecessary?

Author: The column statistics have 5 fields in their array, so the zip comparison against the initial stats would silently drop the final two.
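The truncation the author describes comes from zip stopping at the shorter collection; for example:

```scala
val collected = Array[Any](null, null, 0L, 0L, 0L) // lower, upper, nullCount, count, sizeInBytes
val expected  = Array[Any](null, null, 0)          // the old 3-element expectation
collected.zip(expected).length                     // 3: count and sizeInBytes are never compared
```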
+  testColumnStats(classOf[ByteColumnStats], BYTE, Array(Byte.MaxValue, Byte.MinValue, 0, 0, 0))
+  testColumnStats(classOf[ShortColumnStats], SHORT, Array(Short.MaxValue, Short.MinValue, 0, 0, 0))
+  testColumnStats(classOf[IntColumnStats], INT, Array(Int.MaxValue, Int.MinValue, 0, 0, 0))
+  testColumnStats(classOf[LongColumnStats], LONG, Array(Long.MaxValue, Long.MinValue, 0, 0, 0))
+  testColumnStats(classOf[FloatColumnStats], FLOAT, Array(Float.MaxValue, Float.MinValue, 0, 0, 0))
+  testColumnStats(
+    classOf[DoubleColumnStats], DOUBLE,
+    Array(Double.MaxValue, Double.MinValue, 0, 0, 0)
+  )
+  testColumnStats(classOf[StringColumnStats], STRING, Array(null, null, 0, 0, 0))
+  testDecimalColumnStats(Array(null, null, 0, 0, 0))
+  testObjectColumnStats(ArrayType(IntegerType), orderable = true, Array(null, null, 0, 0, 0))
+  testObjectColumnStats(
+    StructType(Array(StructField("test", DataTypes.StringType))),
+    orderable = true,
+    Array(null, null, 0, 0, 0)
+  )
+  testObjectColumnStats(
+    MapType(IntegerType, StringType),
+    orderable = false,
+    Array(null, null, 0, 0, 0)
+  )


  def testColumnStats[T <: AtomicType, U <: ColumnStats](
      columnStatsClass: Class[U],
@@ -103,4 +120,43 @@ class ColumnStatsSuite extends SparkFunSuite {
      }
    }
  }

+  def testObjectColumnStats(
+      dataType: DataType, orderable: Boolean, initialStatistics: Array[Any]): Unit = {
+    assert(!(orderable ^ RowOrdering.isOrderable(dataType)))
+    val columnType = ColumnType(dataType)
+
+    test(s"${dataType.typeName}: empty") {
+      val objectStats = new ObjectColumnStats(dataType)
+      objectStats.collectedStatistics.zip(initialStatistics).foreach {
+        case (actual, expected) => assert(actual === expected)
+      }
+    }
+
+    test(s"${dataType.typeName}: non-empty") {
+      import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
+      val objectStats = new ObjectColumnStats(dataType)
+      val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
Member: Because we don't reuse the unsafe array/row here, we don't actually exercise the copying in the corresponding column statistics. Can we have the test data reuse the unsafe structures, to test the array and struct column statistics?

Author: Yep, will do.
+      rows.foreach(objectStats.gatherStats(_, 0))
+
+      val stats = objectStats.collectedStatistics
+      if (orderable) {
+        val values = rows.take(10).map(_.get(0, columnType.dataType))
+        val ordering = TypeUtils.getInterpretedOrdering(dataType)
+
+        assertResult(values.min(ordering), "Wrong lower bound")(stats(0))
+        assertResult(values.max(ordering), "Wrong upper bound")(stats(1))
+      } else {
+        assertResult(null, "Wrong lower bound")(stats(0))
+        assertResult(null, "Wrong upper bound")(stats(1))
+      }
+      assertResult(10, "Wrong null count")(stats(2))
+      assertResult(20, "Wrong row count")(stats(3))
+      assertResult(stats(4), "Wrong size in bytes") {
+        rows.map { row =>
+          if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
+        }.sum
+      }
+    }
+  }
}
@@ -21,9 +21,9 @@ import scala.collection.immutable.HashSet
import scala.util.Random

import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
-import org.apache.spark.sql.types.{AtomicType, Decimal}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData, UnsafeMapData, UnsafeProjection}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{AtomicType, DataType, Decimal, IntegerType, MapType, StringType, StructField, StructType}
Member: If it imports more than 5, a wildcard can be used as well, per the style guide.

import org.apache.spark.unsafe.types.UTF8String

object ColumnarTestUtils {
@@ -54,12 +54,22 @@
      case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale)
      case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale)
      case STRUCT(_) =>
-       new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10))))
+       val schema = StructType(Array(StructField("test", StringType)))
+       val converter = UnsafeProjection.create(schema)
+       converter(InternalRow(Array(UTF8String.fromString(Random.nextString(10))): _*))
      case ARRAY(_) =>
-       new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt()))
+       UnsafeArrayData.fromPrimitiveArray(Array(Random.nextInt(), Random.nextInt()))
      case MAP(_) =>
-       ArrayBasedMapData(
-         Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32)))))
+       val unsafeConverter =
+         UnsafeProjection.create(Array[DataType](MapType(IntegerType, StringType)))
+       val row = new GenericInternalRow(1)
+       def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = {
+         row.update(0, map)
+         val unsafeRow = unsafeConverter.apply(row)
+         unsafeRow.getMap(0).copy
+       }
+       toUnsafeMap(ArrayBasedMapData(
+         Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32))))))
Member: The above changes to the data generation seem unnecessary too?

Author: The ColumnType implementations for Maps/Structs/Arrays all end up casting to their Unsafe structures to get the size for the statistics, so the test data needs to reflect that as well.

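For context, a rough paraphrase of what the author is pointing at (an editor's sketch of the STRUCT case in ColumnType's actualSize, assuming the usual catalyst internals rather than quoting the source):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// The struct field is read back and cast to UnsafeRow to measure its size,
// so a GenericInternalRow produced by the old test helper would fail the cast.
def structActualSize(row: InternalRow, ordinal: Int, numFields: Int): Int = {
  val unsafe = row.getStruct(ordinal, numFields).asInstanceOf[UnsafeRow]
  4 + unsafe.getSizeInBytes // plus a 4-byte length header in the column buffer
}
```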
      case _ => throw new IllegalArgumentException(s"Unknown column type $columnType")
    }).asInstanceOf[JvmType]
  }
@@ -30,7 +30,6 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
-import org.apache.spark.util.Utils

class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
  import testImplicits._
@@ -527,4 +526,10 @@
      }
    }
  }
+
+  test("SPARK-23819: Complex type pruning should utilize proper statistics") {
+    val df = Seq((Array(1), (1, 1))).toDF("arr", "struct").cache()
+    assert(df.where("arr <=> array(1)").count() === 1)
+    assert(df.where("struct <=> named_struct('_1', 1, '_2', 1)").count() === 1)
+  }
}