Skip to content

Commit db82040

Browse files
committed
Support bucketing for Hive tables
1 parent f4f1468 commit db82040

1 file changed

Lines changed: 53 additions & 0 deletions

File tree

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,59 @@ case class HiveTableScanExec(
166166

167167
// Attributes produced by this scan; simply exposes `attributes`.
override def output: Seq[Attribute] = attributes
168168

169+
/*
 * Output partitioning and ordering advertised by this scan, derived from the bucket spec of
 * the table in `relation.catalogTable`.
 *
 * Partitioning
 * ------------
 * `HashPartitioning(bucketColumns, numBuckets)` is reported only when the table has a bucket
 * spec and ALL of its bucketing columns are being read (i.e. are present in `output`).
 * In every other case `UnknownPartitioning(0)` is reported.
 *
 * Ordering
 * --------
 * No ordering is reported. With bucketing it is possible to have multiple files belonging to
 * the same bucket. Each of those files is locally sorted, but the files combined are not
 * globally sorted, so an RDD partition is not guaranteed to be sorted even when the table
 * declares sort columns. Reporting a sort order is only sound after verifying that every
 * bucket is backed by a single file, and this scan performs no such verification.
 *
 * TODO: once the single-file-per-bucket check is available, report ascending order over the
 * longest prefix of the spec's sort columns that is being read. E.g. when (col0, col2, col3)
 * are read: sort columns (col0, col1) give ordering (col0); sort columns (col1, col0) give an
 * empty ordering because the first sort column is not read.
 * NOTE(review): only sort column *names* are consulted here; confirm the sort direction
 * recorded for the table before assuming ascending order.
 * NOTE(review): `HashPartitioning` presumes each RDD partition produced by this scan holds
 * exactly one bucket; confirm against the code that builds the RDD.
 */
override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
  // Resolves a table column name to the attribute this scan actually outputs, if it is read.
  def toAttribute(colName: String): Option[Attribute] =
    output.find(_.name == colName)

  val bucketedPartitioning: Option[Partitioning] =
    relation.catalogTable.bucketSpec.flatMap { spec =>
      val bucketColumns = spec.bucketColumnNames.flatMap(name => toAttribute(name))
      // A size mismatch means at least one bucketing column is not read by this scan, in
      // which case the rows cannot be described as hash-partitioned on the bucket columns.
      if (bucketColumns.size == spec.bucketColumnNames.size) {
        Some(HashPartitioning(bucketColumns, spec.numBuckets))
      } else {
        None
      }
    }

  // Ordering is deliberately empty: see the "Ordering" section above.
  (bucketedPartitioning.getOrElse(UnknownPartitioning(0)), Nil)
}
221+
169222
override def sameResult(plan: SparkPlan): Boolean = plan match {
170223
case other: HiveTableScanExec =>
171224
val thisPredicates = partitionPruningPred.map(cleanExpression)

0 commit comments

Comments
 (0)