@@ -73,6 +73,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     }
   }
 
-  override lazy val statistics =
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
     Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length)
 }
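As a rough illustration of the leaf-node estimate above (the column types and row count are made up for illustration): LocalRelation prices each row at the sum of its columns' defaultSizes.

import org.apache.spark.sql.types.{IntegerType, LongType}

// Illustrative only: an Int column (defaultSize = 4) plus a Long column
// (defaultSize = 8) gives a 12-byte row; 10 local rows => 120 bytes.
val rowWidth = IntegerType.defaultSize + LongType.defaultSize  // 12
val sizeInBytes = rowWidth * 10                                // 120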
@@ -87,11 +87,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   *
   * [[LeafNode]]s must override this.
   */
-  def statistics: Statistics = {
+  def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
+    // def statistics(): Statistics = {
     if (children.isEmpty) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
-    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
+    Statistics(sizeInBytes = children.map(
+      _.statistics(Option(this +: parents.getOrElse(Seq.empty[LogicalPlan]))).sizeInBytes).product)
   }
 
   /**
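The new signature threads the chain of ancestor plans down the tree through an implicit parameter defaulting to None, so existing plan.statistics call sites keep compiling. A minimal, self-contained sketch of the same pattern (Node, size, Leaf, and Inner are hypothetical stand-ins for LogicalPlan and statistics):

abstract class Node {
  def children: Seq[Node]

  // Each node prepends itself to the ancestor chain before recursing, so a
  // child can inspect who is asking. Callers that don't care pass nothing.
  def size(implicit parents: Option[Seq[Node]] = None): BigInt =
    children
      .map(c => c.size(Option(this +: parents.getOrElse(Seq.empty[Node]))))
      .product
}

case class Leaf(bytes: BigInt) extends Node {
  def children: Seq[Node] = Nil
  override def size(implicit parents: Option[Seq[Node]] = None): BigInt = bytes
}

case class Inner(children: Node*) extends Node

// Inner(Leaf(3), Leaf(4)).size == 3 * 4 == 12, computed bottom-up with the
// ancestor chain threaded implicitly.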
@@ -306,20 +308,24 @@ abstract class UnaryNode extends LogicalPlan {

   override protected def validConstraints: Set[Expression] = child.constraints
 
-  override def statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     // There should be some overhead in Row object, the size should not be zero when there is
     // no columns, this help to prevent divide-by-zero error.
     val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
     val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
+    val childStats = child.statistics(
+      Option(this +: parents.getOrElse(Seq.empty[LogicalPlan]))
+    )
+    val childNumRows = childStats.sizeInBytes / childRowSize
     // Assume there will be the same number of rows as child has.
-    var sizeInBytes = (child.statistics.sizeInBytes * outputRowSize) / childRowSize
+    var sizeInBytes = (childNumRows * outputRowSize)
     if (sizeInBytes == 0) {
       // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
       // (product of children).
       sizeInBytes = 1
     }
 
-    child.statistics.copy(sizeInBytes = sizeInBytes)
+    childStats.copy(sizeInBytes = sizeInBytes)
   }
 }

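Concretely, the rewritten UnaryNode estimate derives a row count from the child's size and re-scales it by this node's own row width. A worked sketch with made-up numbers:

// Row width = sum of column defaultSizes + 8 bytes of Row overhead,
// matching childRowSize/outputRowSize above. Numbers are illustrative.
val childSizeInBytes = BigInt(1000)
val childRowSize = 20                               // child rows: 20 bytes wide
val outputRowSize = 12                              // this node keeps fewer columns
val childNumRows = childSizeInBytes / childRowSize  // 1000 / 20 = 50 rows
val sizeInBytes = childNumRows * outputRowSize      // 50 * 12 = 600 bytes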
@@ -159,7 +159,7 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
     }
   }
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val leftSize = left.statistics.sizeInBytes
     val rightSize = right.statistics.sizeInBytes
     val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
@@ -184,7 +184,7 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le
     left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
       duplicateResolved
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     left.statistics.copy()
   }
 }
@@ -224,7 +224,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
     children.length > 1 && childrenResolved && allChildrenCompatible
   }
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
     Statistics(sizeInBytes = sizeInBytes)
   }
@@ -333,15 +333,16 @@ case class Join(
     case _ => resolvedExceptNatural
   }
 
-  override lazy val statistics: Statistics = joinType match {
-    case LeftAnti | LeftSemi =>
-      // LeftSemi and LeftAnti won't ever be bigger than left
-      left.statistics.copy()
-    case _ =>
-      // make sure we don't propagate isBroadcastable in other joins, because
-      // they could explode the size.
-      super.statistics.copy(isBroadcastable = false)
-  }
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
+    joinType match {
+      case LeftAnti | LeftSemi =>
+        // LeftSemi and LeftAnti won't ever be bigger than left
+        left.statistics.copy()
+      case _ =>
+        // make sure we don't propagate isBroadcastable in other joins, because
+        // they could explode the size.
+        super.statistics.copy(isBroadcastable = false)
+    }
 }
 
 /**
@@ -351,7 +352,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   // set isBroadcastable to true so the child will be broadcasted
-  override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
+    super.statistics.copy(isBroadcastable = true)
 }
 
 case class InsertIntoTable(
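For context (not part of this diff): user code typically introduces a BroadcastHint via the broadcast() function, which is why its statistics pin isBroadcastable = true regardless of the size estimate. A sketch, assuming two pre-existing DataFrames largeDf and smallDf:

import org.apache.spark.sql.functions.broadcast

// broadcast() wraps smallDf's plan in a BroadcastHint node, so the planner
// prefers a broadcast join even if smallDf's size estimate is large.
val joined = largeDf.join(broadcast(smallDf), "key")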
@@ -450,7 +452,7 @@ case class Range(

   override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
   }
@@ -487,7 +489,7 @@ case class Aggregate(
       child.constraints.union(getAliasedConstraints(nonAgg))
   }
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     if (groupingExpressions.isEmpty) {
       super.statistics.copy(sizeInBytes = 1)
     } else {
@@ -587,7 +589,7 @@ case class Expand(
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val sizeInBytes = super.statistics.sizeInBytes * projections.length
     Statistics(sizeInBytes = sizeInBytes)
   }
@@ -659,7 +661,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
       case _ => None
     }
   }
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val limit = limitExpr.eval().asInstanceOf[Int]
     val sizeInBytes = if (limit == 0) {
       // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
@@ -680,7 +682,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
       case _ => None
     }
   }
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val limit = limitExpr.eval().asInstanceOf[Int]
     val sizeInBytes = if (limit == 0) {
       // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
@@ -719,7 +721,7 @@ case class Sample(

   override def output: Seq[Attribute] = child.output
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     val ratio = upperBound - lowerBound
     // BigInt can't multiply with Double
     var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
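The * 100 / 100 trick above approximates multiplying a BigInt by a Double sampling ratio using only integer arithmetic. A sketch with illustrative numbers:

// BigInt has no *(Double) overload, so scale the ratio to an Int percentage
// first: a ratio of 0.25 becomes 25, and 1000 * 25 / 100 = 250 bytes.
val childSize = BigInt(1000)
val ratio = 0.25
val sampledSize = childSize * (ratio * 100).toInt / 100  // 250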
@@ -766,5 +768,6 @@ case object OneRowRelation extends LeafNode {
   *
   * [[LeafNode]]s must override this.
   */
-  override lazy val statistics: Statistics = Statistics(sizeInBytes = 1)
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
+    Statistics(sizeInBytes = 1)
 }
@@ -95,7 +95,11 @@ case class ExternalRDD[T](

   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  @transient override lazy val statistics: Statistics = Statistics(
+  override def producedAttributes: AttributeSet = outputSet
+
+  @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None):
+      Statistics =
+    Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
@@ -147,7 +151,8 @@ case class LogicalRDD(

   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  @transient override lazy val statistics: Statistics = Statistics(
+  @transient override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None):
+      Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
@@ -114,8 +114,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   private def canBroadcast(plan: LogicalPlan): Boolean = {
-    plan.statistics.isBroadcastable ||
-      plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold
+    val stats = plan.statistics
+    stats.isBroadcastable || stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
   }
 
   /**
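Caching plan.statistics in a local val matters now that statistics is a def rather than a lazy val: the estimate would otherwise be recomputed for each of the two checks. The size-based half of the check is governed by a standard knob (not introduced by this PR); spark below is an existing SparkSession:

// Raising the threshold makes more plans broadcast-eligible; -1 disables the
// size-based path entirely (explicit broadcast hints still apply).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)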
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.CollectionAccumulator
@@ -73,7 +73,7 @@ case class InMemoryRelation(

   @transient val partitionStatistics = new PartitionStatistics(output)
 
-  override lazy val statistics: Statistics = {
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics = {
     if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
@@ -72,9 +72,8 @@ case class LogicalRelation(
   // expId can be different but the relation is still the same.
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
-  @transient override lazy val statistics: Statistics = Statistics(
-    sizeInBytes = BigInt(relation.sizeInBytes)
-  )
+  override def statistics(implicit parents: Option[Seq[LogicalPlan]] = None): Statistics =
+    Statistics(sizeInBytes = BigInt(relation.sizeInBytes))
 
   /** Used to lookup original attribute capitalization */
   val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o)))
@@ -123,6 +123,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ENABLE_PRUNED_PARTITION_STATS =
+    SQLConfigBuilder("spark.sql.statistics.prunedPartitionStats")
+      .doc("When enabled, Spark SQL tries to calculate stats from the sizes of only the " +
+        "partitions selected by the filter condition. For example, for a query like " +
+        "`select * from src s, dest d where s.partition = 'partition1' and s.key = d.key`, " +
+        "instead of using table src's full totalSize/rawSize stats, Spark considers only " +
+        "the size of partition1. Currently the optimization applies only to equality " +
+        "checks, and is skipped if the partition column appears in any other kind of " +
+        "check or in operators other than AND and OR. In those cases stat calculation " +
+        "falls back to totalSize/rawSize.")
+      .booleanConf
+      .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
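A sketch of how the new flag would be exercised from a session, reusing the hypothetical src/dest tables from the doc string above (spark is an existing SparkSession):

// Opt in to partition-pruned size estimates.
spark.conf.set("spark.sql.statistics.prunedPartitionStats", "true")

// With the flag on, only partition1's size feeds src's statistics, which can
// let the broadcast-join threshold apply where the full table size would not.
spark.sql(
  "select * from src s, dest d where s.partition = 'partition1' and s.key = d.key")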
@@ -629,6 +642,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
+  def prunedPartitionStatsEnabled: Boolean = getConf(ENABLE_PRUNED_PARTITION_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)