Skip to content

Commit cba3934

Browse files
author
Davies Liu
committed
output UnsafeRow from Hive
1 parent 94fcd01 commit cba3934

3 files changed

Lines changed: 16 additions & 9 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -257,16 +257,16 @@ case class Window(
257257
* @return the final resulting projection.
258258
*/
259259
private[this] def createResultProjection(
260-
expressions: Seq[Expression]): MutableProjection = {
260+
expressions: Seq[Expression]): UnsafeProjection = {
261261
val references = expressions.zipWithIndex.map{ case (e, i) =>
262262
// Results of window expressions will be on the right side of child's output
263263
BoundReference(child.output.size + i, e.dataType, e.nullable)
264264
}
265265
val unboundToRefMap = expressions.zip(references).toMap
266266
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
267-
newMutableProjection(
267+
UnsafeProjection.create(
268268
projectList ++ patchedWindowExpression,
269-
child.output)()
269+
child.output)
270270
}
271271

272272
protected override def doExecute(): RDD[InternalRow] = {

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -132,11 +132,17 @@ case class HiveTableScan(
132132
}
133133
}
134134

135-
protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
136-
hadoopReader.makeRDDForTable(relation.hiveQlTable)
137-
} else {
138-
hadoopReader.makeRDDForPartitionedTable(
139-
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
135+
protected override def doExecute(): RDD[InternalRow] = {
136+
val rdd = if (!relation.hiveQlTable.isPartitioned) {
137+
hadoopReader.makeRDDForTable(relation.hiveQlTable)
138+
} else {
139+
hadoopReader.makeRDDForPartitionedTable(
140+
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
141+
}
142+
rdd.mapPartitionsInternal { iter =>
143+
val proj = UnsafeProjection.create(schema)
144+
iter.map(proj)
145+
}
140146
}
141147

142148
override def output: Seq[Attribute] = attributes

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -213,7 +213,8 @@ case class ScriptTransformation(
213213

214214
child.execute().mapPartitions { iter =>
215215
if (iter.hasNext) {
216-
processIterator(iter)
216+
val proj = UnsafeProjection.create(schema)
217+
processIterator(iter).map(proj)
217218
} else {
218219
// If the input iterator has no rows then do not launch the external script.
219220
Iterator.empty

0 commit comments

Comments (0)