Skip to content

Commit 3e5d77c

Browse files
WIP: giant and messy WIP.
1 parent a92ed0c commit 3e5d77c

4 files changed

Lines changed: 72 additions & 7 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {
4545
* that schema.
4646
*
4747
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
48-
* each time an input row is added. This significatly reduces the cost of calcuating the
49-
* projection, but means that it is not safe
48+
* each time an input row is added. This significantly reduces the cost of calculating the
49+
* projection, but means that it is not safe ...?
5050
*/
5151
case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
5252
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.execution
1919

20+
import org.apache.hadoop.fs.FileSystem
2021
import org.apache.spark.sql.{SQLContext, execution}
2122
import org.apache.spark.sql.catalyst.expressions._
2223
import org.apache.spark.sql.catalyst.planning._
@@ -38,6 +39,26 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
3839

3940
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
4041

42+
// case HashFilteredJoin(
43+
// Inner,
44+
// leftKeys,
45+
// rightKeys,
46+
// condition,
47+
// left,
48+
// right @ PhysicalOperation(_, _, b: MetastoreRelation))
49+
// if tableRawSizeBelowThreshold(left) =>
50+
// // TODO: these will be used
51+
//// import org.apache.hadoop.fs.ContentSummary
52+
//// import org.apache.hadoop.fs.FileSystem
53+
//// import org.apache.hadoop.fs.Path
54+
//
55+
// FileSystem.get()
56+
//
57+
// val hashJoin =
58+
// execution.BroadcastHashJoin(
59+
// leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
60+
// condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
61+
4162
case HashFilteredJoin(
4263
Inner,
4364
leftKeys,
@@ -129,8 +150,25 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
129150
}
130151
}
131152

153+
// // FIXME(zongheng): WIP
154+
// object AutoBroadcastHashJoin extends Strategy {
155+
// def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
156+
// case logical.Join(left, right, joinType, condition) =>
157+
//
158+
// execution.BroadcastHashJoin()
159+
//
160+
// execution.BroadcastNestedLoopJoin(
161+
// planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil
162+
// case _ => Nil
163+
// }
164+
// }
165+
132166
object BroadcastNestedLoopJoin extends Strategy {
133167
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
168+
169+
// FIXME: WIP -- auto broadcast hash join
170+
case logical.Join
171+
134172
case logical.Join(left, right, joinType, condition) =>
135173
execution.BroadcastNestedLoopJoin(
136174
planLater(left), planLater(right), joinType, condition)(sparkContext) :: Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ trait HashJoin {
109109
/**
110110
* Searches the streamed iterator for the next row that has at least one match in hashtable.
111111
*
112-
* @return true if the search is successful, and false the streamed iterator runs out of
112+
* @return true if the search is successful, and false if the streamed iterator runs out of
113113
* tuples.
114114
*/
115115
private final def fetchNext(): Boolean = {
@@ -136,7 +136,7 @@ trait HashJoin {
136136

137137
/**
138138
* :: DeveloperApi ::
139-
* Performs and inner hash join of two child relations by first shuffling the data using the join
139+
* Performs an inner hash join of two child relations by first shuffling the data using the join
140140
* keys.
141141
*/
142142
@DeveloperApi
@@ -163,9 +163,10 @@ case class ShuffledHashJoin(
163163

164164
/**
165165
* :: DeveloperApi ::
166-
* Performs an inner hash join of two child relations. When the operator is constructed, a Spark
167-
* job is asynchronously started to calculate the values for the broadcasted relation. This data
168-
* is then placed in a Spark broadcast variable. The streamed relation is not shuffled.
166+
* Performs an inner hash join of two child relations. When the output RDD of this operator is
167+
* being constructed, a Spark job is asynchronously started to calculate the values for the
168+
* broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed
169+
* relation is not shuffled.
169170
*/
170171
@DeveloperApi
171172
case class BroadcastHashJoin(

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.hive
1919

20+
import org.apache.hadoop.fs.FileSystem
21+
22+
import org.apache.spark.sql
2023
import org.apache.spark.sql.SQLContext
2124
import org.apache.spark.sql.catalyst.expressions._
2225
import org.apache.spark.sql.catalyst.planning._
@@ -32,6 +35,29 @@ private[hive] trait HiveStrategies {
3235

3336
val hiveContext: HiveContext
3437

38+
// FIXME(zongheng): WIP
39+
object HashJoin extends Strategy {
40+
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
41+
case HashFilteredJoin(
42+
Inner,
43+
leftKeys,
44+
rightKeys,
45+
condition,
46+
left,
47+
right @ PhysicalOperation(_, _, b: MetastoreRelation)) =>
48+
49+
val path = b.hiveQlTable.getPath
50+
val fs = path.getFileSystem(hiveContext.hiveconf)
51+
val size = fs.getContentSummary(path).getLength // TODO: in bytes?
52+
53+
54+
val hashJoin =
55+
sql.execution.BroadcastHashJoin(
56+
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))(sparkContext)
57+
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
58+
}
59+
}
60+
3561
object Scripts extends Strategy {
3662
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
3763
case logical.ScriptTransformation(input, script, output, child) =>

0 commit comments

Comments (0)