Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ case class Limit(limit: Int, child: SparkPlan)
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition

override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

protected override def doExecute(): RDD[InternalRow] = {
Expand Down Expand Up @@ -200,18 +204,26 @@ case class TakeOrderedAndProject(
projectOutput.getOrElse(child.output)
}

override def outputsUnsafeRows: Boolean = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the projectList is None?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added few lines to check if we need do extra unsafe projection for it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you have done when projectList is None is exactly same with ConvertToUnsafe right? How about we change this to if (projectList.isDefined) true else child.outputsUnsafeRows, then our framework can insert ConvertToUnsafe if it's necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I've updated it.

override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def outputPartitioning: Partitioning = SinglePartition

// We need to use an interpreted ordering here because generated orderings cannot be serialized
// and this ordering needs to be created on the driver in order to be passed into Spark core code.
private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)

// TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
@transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
@transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InterpretedProjection can be replaced by UnsafeProjection?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I still have a dumb question. When calling the eval of each of the specified expressions, how can we know they can process unsafe rows? Why does the planner insert unsafe->safe conversion in the original design of TakeOrderedAndProject?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is just because not all expressions support unsafe before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now all the expressions can support unsafe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. If there are expressions still not supporting unsafe, we should make it support.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thank you!


private def collectData(): Array[InternalRow] = {
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
projection.map(data.map(_)).getOrElse(data)
if (projection.isDefined) {
projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need copy here? We have already copied the rows when getting data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I was thinking that it is needed to copy the returned row because it is the same object. But after I checked GenerateUnsafeProjection, looks like it will create new row every time. I've updated it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it will be a problem without this copy(). HiveCompatibilitySuite will be failed.

[info]   key    value
[info]   !== HIVE - 5 row(s) ==   == CATALYST - 5 row(s) ==
[info]   !0 val_0                 4 val_4
[info]   !0 val_0                 4 val_4
[info]   !0 val_0                 4 val_4
[info]   !2 val_2                 4 val_4
[info]    4 val_4                 4 val_4

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-checked GenerateUnsafeProjection, it will return the same unsafe row. So we should use another copy() here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry I misread the code, the copy is needed even we already copied before takeOrdered.

} else {
data
}
}

override def executeCollect(): Array[InternalRow] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(outputsUnsafe.outputsUnsafeRows)

test("planner should insert unsafe->safe conversions when required") {
ignore("planner should insert unsafe->safe conversions when required") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @marmbrus @yhuai should we remove this test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other cases that we will have the conversion? Or, we can create a dummy operator that only accepts safe rows. So, we can still test the logic of adding conversions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a dummy node for it. Thanks.

val plan = Limit(10, outputsUnsafe)
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
Expand Down