@@ -162,6 +162,10 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -200,18 +204,31 @@ case class TakeOrderedAndProject(
     projectOutput.getOrElse(child.output)
   }
 
+  override def outputsUnsafeRows: Boolean = if (projectList.isDefined) {
[Contributor] Would it be easier to just process UnsafeRow and output UnsafeRow?

[Member Author] So I need to close this again...

+    true
+  } else {
+    child.outputsUnsafeRows
+  }
+
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def outputPartitioning: Partitioning = SinglePartition
 
   // We need to use an interpreted ordering here because generated orderings cannot be serialized
   // and this ordering needs to be created on the driver in order to be passed into Spark core code.
   private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figuring out how to clean the closure at InsertIntoHiveTable.
-  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
+  @transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
[Member] Can InterpretedProjection be replaced by UnsafeProjection?

[Member Author] I think it is ok.

[Member] Sorry, I still have a dumb question. When calling eval on each of the specified expressions, how can we know they can process unsafe rows? Why did the planner insert an unsafe->safe conversion in the original design of TakeOrderedAndProject?

[Member Author] I think it is just because not all expressions supported unsafe rows before.

[Member] So now all the expressions can support unsafe rows?

[Member Author] I think so. If there are expressions that still do not support unsafe rows, we should make them support it.

[Member] Got it. Thank you!
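For context, the planner decides where to insert row-format conversions by comparing each operator's declared capabilities with what its children emit. A simplified sketch of that rule, using only the flags shown in this diff (the method name ensureRowFormats is illustrative; the real logic lives in Spark's RowFormatConverters and also handles operators that accept both formats):

// Sketch: wrap each child whose row format the parent cannot consume.
private def ensureRowFormats(operator: SparkPlan): SparkPlan = {
  val newChildren = operator.children.map { child =>
    if (child.outputsUnsafeRows && !operator.canProcessUnsafeRows) {
      ConvertToSafe(child)    // parent understands only safe rows
    } else if (!child.outputsUnsafeRows && !operator.canProcessSafeRows) {
      ConvertToUnsafe(child)  // parent understands only unsafe rows
    } else {
      child
    }
  }
  operator.withNewChildren(newChildren)
}

Once TakeOrderedAndProject declares canProcessUnsafeRows = true and can output unsafe rows itself, this rule no longer inserts a conversion around it, which is why the test below had to change.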


   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    projection.map(data.map(_)).getOrElse(data)
+    if (projection.isDefined) {
+      val proj = projection.get
+      // An unsafe projection reuses a single result row, so each projected
+      // row must be copied before it is collected into the output array.
+      data.map(r => proj(r).copy(): InternalRow)
+    } else {
+      data
+    }
   }
 
   override def executeCollect(): Array[InternalRow] = {
@@ -38,7 +38,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
-  test("planner should insert unsafe->safe conversions when required") {
+  ignore("planner should insert unsafe->safe conversions when required") {
[Contributor] cc @marmbrus @yhuai: should we remove this test?

[Contributor] Are there any other cases where we will have the conversion? Alternatively, we could create a dummy operator that only accepts safe rows, so we can still test the logic of adding conversions.

[Member Author] Added a dummy node for it. Thanks.
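The dummy node added for the test is not visible in this hunk. A minimal sketch of such an operator, built from the same row-format flags used above (the name DummySafeOnlyNode and its exact shape are hypothetical, not necessarily the PR's actual node):

// Hypothetical pass-through operator that accepts only safe rows, forcing the
// planner to insert a ConvertToSafe between it and any child emitting unsafe rows.
case class DummySafeOnlyNode(child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def outputsUnsafeRows: Boolean = false
  override def canProcessUnsafeRows: Boolean = false
  override def canProcessSafeRows: Boolean = true
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}

Planning DummySafeOnlyNode(outputsUnsafe) should then still produce a ConvertToSafe child, so the conversion logic stays covered by the suite.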

     val plan = Limit(10, outputsUnsafe)
     val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])