Skip to content

Commit 8663e84

Browse files
Use BigInt for statistics; logical leaf nodes throw an exception by default.
Also includes cleanups and scaladoc fixes per review comments.
1 parent 2f2fb89 commit 8663e84

6 files changed

Lines changed: 37 additions & 37 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.trees
2626
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
2727
self: Product =>
2828

29-
// TODO: handle overflow?
3029
/**
3130
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
3231
* corresponding statistic produced by the children. To override this behavior, override
@@ -40,7 +39,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
4039
* defaults to the product of children's `sizeInBytes`.
4140
*/
4241
case class Statistics(
43-
sizeInBytes: Long
42+
sizeInBytes: BigInt
4443
)
4544
lazy val statistics: Statistics = Statistics(
4645
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
@@ -112,7 +111,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
112111
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
113112
self: Product =>
114113

115-
override lazy val statistics = Statistics(sizeInBytes = 1L)
114+
override lazy val statistics: Statistics =
115+
throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics")
116116

117117
// Leaf nodes by definition cannot reference any input attributes.
118118
override def references = Set.empty

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@ import java.util.Properties
2222
import scala.collection.JavaConverters._
2323

2424
/**
25-
* A trait that enables the setting and getting of mutable config parameters/hints. The central
26-
* location for storing them is uniquely located in the same-name private companion object.
27-
* Therefore, all classes that mix in this trait share all the hints.
25+
* A trait that enables the setting and getting of mutable config parameters/hints.
2826
*
29-
* In the presence of a SQLContext, these can be set and queried either by passing SET commands
30-
* into Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can
27+
* In the presence of a SQLContext, these can be set and queried by passing SET commands
28+
* into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
3129
* modify the hints by programmatically calling the setters and getters of this trait.
3230
*
33-
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
31+
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
3432
*/
3533
trait SQLConf {
3634
import SQLConf._

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,9 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
8282
}
8383

8484
@transient override lazy val statistics = Statistics(
85-
sizeInBytes = {
86-
alreadyPlanned match {
87-
// TODO: Instead of returning a default value here, find a way to return a meaningful
88-
// size estimate for RDDs. See PR 1238 for more discussions.
89-
case e: ExistingRdd => sqlContext.statsDefaultSizeInBytes
90-
case _ => 1L // TODO: consider adding statistics to physical plans as well.
91-
}
92-
}
85+
// TODO: Instead of returning a default value here, find a way to return a meaningful size
86+
// estimate for RDDs. See PR 1238 for more discussions.
87+
sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes)
9388
)
9489

9590
}

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution
1919

20+
import scala.util.Try
21+
2022
import org.apache.spark.sql.{SQLContext, execution}
2123
import org.apache.spark.sql.catalyst.expressions._
2224
import org.apache.spark.sql.catalyst.planning._
@@ -72,19 +74,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
7274

7375
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
7476
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
75-
if sqlContext.autoConvertJoinSize > 0 &&
76-
right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
77-
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
77+
if Try(sqlContext.autoConvertJoinSize > 0 &&
78+
right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
79+
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
7880

7981
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
80-
if sqlContext.autoConvertJoinSize > 0 &&
81-
left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
82+
if Try(sqlContext.autoConvertJoinSize > 0 &&
83+
left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
8284
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
8385

8486
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
8587
val buildSide =
86-
if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) BuildRight
87-
else BuildLeft
88+
if (Try(right.statistics.sizeInBytes <= left.statistics.sizeInBytes).getOrElse(false)) {
89+
BuildRight
90+
} else {
91+
BuildLeft
92+
}
8893
val hashJoin =
8994
execution.ShuffledHashJoin(
9095
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
2828
import org.apache.hadoop.hive.serde2.Deserializer
2929

3030
import org.apache.spark.annotation.DeveloperApi
31-
import org.apache.spark.sql.Logging
31+
import org.apache.spark.sql.{SQLContext, Logging}
3232
import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
3333
import org.apache.spark.sql.catalyst.expressions._
3434
import org.apache.spark.sql.catalyst.plans.logical
@@ -66,8 +66,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
6666
// Since HiveQL is case insensitive for table names we make them all lowercase.
6767
MetastoreRelation(
6868
databaseName, tblName, alias)(
69-
table.getTTable, partitions.map(part => part.getTPartition))(
70-
hive.hiveconf, table.getPath)
69+
table.getTTable, partitions.map(part => part.getTPartition))(hive)
7170
}
7271

7372
def createTable(
@@ -252,7 +251,7 @@ object HiveMetastoreTypes extends RegexParsers {
252251
private[hive] case class MetastoreRelation
253252
(databaseName: String, tableName: String, alias: Option[String])
254253
(val table: TTable, val partitions: Seq[TPartition])
255-
(@transient hiveConf: HiveConf, @transient path: Path)
254+
(@transient sqlContext: SQLContext)
256255
extends LeafNode {
257256

258257
self: Product =>
@@ -270,15 +269,17 @@ private[hive] case class MetastoreRelation
270269
}
271270

272271
@transient override lazy val statistics = Statistics(
273-
// TODO: check if this estimate is valid for tables after partition pruning.
274272
sizeInBytes = {
275-
// NOTE: kind of hacky, but this should be relatively cheap if parameters for the table are
276-
// populated into the metastore. An alternative would be going through Hadoop's FileSystem
277-
// API, which can be expensive if a lot of RPCs are involved. Besides `totalSize`, there are
278-
// also `numFiles`, `numRows`, `rawDataSize` keys we can look at in the future.
279-
val sizeMaybeFromMetastore =
280-
Option(hiveQlTable.getParameters.get("totalSize")).map(_.toLong).getOrElse(-1L)
281-
math.max(sizeMaybeFromMetastore, 1L)
273+
// TODO: check if this estimate is valid for tables after partition pruning.
274+
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
275+
// relatively cheap if parameters for the table are populated into the metastore. An
276+
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
277+
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
278+
// `rawDataSize` keys that we can look at in the future.
279+
BigInt(
280+
Option(hiveQlTable.getParameters.get("totalSize"))
281+
.map(_.toLong)
282+
.getOrElse(sqlContext.statsDefaultSizeInBytes))
282283
}
283284
)
284285

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class StatisticsSuite extends QueryTest {
3232
mr.statistics.sizeInBytes
3333
}
3434
assert(sizes.size === 1)
35-
assert(sizes(0) == 5812, s"expected exact size 5812 for test table 'src', got ${sizes(0)}")
35+
assert(sizes(0).equals(BigInt(5812)),
36+
s"expected exact size 5812 for test table 'src', got: ${sizes(0)}")
3637
}
3738

3839
test("auto converts to broadcast hash join, by size estimate of a relation") {

0 commit comments

Comments
 (0)