```diff
@@ -162,6 +162,10 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {
```
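These three overrides are the row-format contract a physical operator advertises to the planner: whether it emits UnsafeRows, and which input formats it can consume. A minimal sketch of how a preparation rule can consult them, assuming the `ConvertToSafe`/`ConvertToUnsafe` operators from RowFormatConverters (this is a simplification for illustration, not Spark's actual `EnsureRowFormats` rule):

```scala
import org.apache.spark.sql.execution.{ConvertToSafe, ConvertToUnsafe, SparkPlan}

// Simplified, assumed shape of a format-reconciliation rule: wrap any child
// whose output format the parent cannot consume in a conversion operator.
def ensureRowFormats(op: SparkPlan): SparkPlan =
  op.withNewChildren(op.children.map { child =>
    if (child.outputsUnsafeRows && !op.canProcessUnsafeRows) {
      ConvertToSafe(child)   // parent understands only safe rows
    } else if (!child.outputsUnsafeRows && !op.canProcessSafeRows) {
      ConvertToUnsafe(child) // parent understands only unsafe rows
    } else {
      child                  // formats already compatible
    }
  })
```

Because Limit now declares it can process both formats and simply passes its child's format through, no conversion needs to be inserted around it.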
```diff
@@ -200,18 +204,31 @@ case class TakeOrderedAndProject(
     projectOutput.getOrElse(child.output)
   }
 
+  override def outputsUnsafeRows: Boolean = if (projectList.isDefined) {
```
> **Contributor:** Would it be easy to just process UnsafeRow and output UnsafeRow?
>
> **Member Author:** So I need to close this again...

```diff
+    true
+  } else {
+    child.outputsUnsafeRows
+  }
+
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
   override def outputPartitioning: Partitioning = SinglePartition
 
   // We need to use an interpreted ordering here because generated orderings cannot be serialized
   // and this ordering needs to be created on the driver in order to be passed into Spark core code.
   private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
-  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
+  @transient private val projection = projectList.map(UnsafeProjection.create(_, child.output))
```
> **Member:** Can InterpretedProjection be replaced by UnsafeProjection?
>
> **Member Author:** I think it is OK.
>
> **Member:** Sorry, I still have a dumb question. When calling eval on each of the specified expressions, how can we know they can process unsafe rows? Why did the planner insert an unsafe->safe conversion in the original design of TakeOrderedAndProject?
>
> **Member Author:** I think it is just because not all expressions supported unsafe rows before.
>
> **Member:** So now all the expressions can support unsafe rows?
>
> **Member Author:** I think so. If there are expressions that still do not support unsafe rows, we should make them support it.
>
> **Member:** Got it. Thank you!
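For readers following this thread: the reason one compiled projection can consume either format is that expressions read their input through the `InternalRow` interface (e.g. `row.getInt(ordinal)`), which both `UnsafeRow` and `GenericInternalRow` implement, while `UnsafeProjection` always emits UnsafeRows. A hedged sketch; the helper below is illustrative, not from this patch:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection, UnsafeRow}

// Illustrative helper (an assumption, not part of the patch): the same
// compiled projection accepts safe or unsafe input rows and always yields
// UnsafeRows. Note the returned row object is reused across calls, which is
// exactly what the copy() discussion below is about.
def toUnsafe(input: Iterator[InternalRow],
             exprs: Seq[Expression],
             inputSchema: Seq[Attribute]): Iterator[UnsafeRow] = {
  val projection = UnsafeProjection.create(exprs, inputSchema)
  input.map(row => projection(row))
}
```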


```diff
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
-    projection.map(data.map(_)).getOrElse(data)
+    if (projection.isDefined) {
+      projection.map(p => data.map(p(_).copy().asInstanceOf[InternalRow])).get
```
> **Contributor:** Do we need a copy here? We have already copied the rows when getting `data`.
>
> **Member Author:** OK. I was thinking the returned row had to be copied because it is the same object. But after checking GenerateUnsafeProjection, it looks like it creates a new row every time. I've updated it.
>
> **Member Author:** It seems there will be a problem without this copy(): HiveCompatibilitySuite fails.
>
> ```
> [info]   key    value
> [info]   !== HIVE - 5 row(s) ==   == CATALYST - 5 row(s) ==
> [info]   !0 val_0                 4 val_4
> [info]   !0 val_0                 4 val_4
> [info]   !0 val_0                 4 val_4
> [info]   !2 val_2                 4 val_4
> [info]    4 val_4                 4 val_4
> ```
>
> **Member Author:** I re-checked GenerateUnsafeProjection: it returns the same unsafe row each time, so we do need another copy() here.
>
> **Contributor:** Oh sorry, I misread the code; the copy is needed even though we already copied before takeOrdered.

```diff
+    } else {
+      data
+    }
   }
```
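The failure pasted above is the classic symptom of a reused buffer: every slot in the collected array ends up aliasing the projection's single output row, so all five results show the last row's values (`4 val_4`). A minimal sketch of the pitfall, reusing the names from `collectData` (illustrative only):

```scala
// Assume data: Array[InternalRow] and projectList/child as in the operator.
val projection = UnsafeProjection.create(projectList.get, child.output)

// Wrong: every element aliases the projection's single reused buffer,
// so the whole array ends up holding the last row's values.
val aliased = data.map(row => projection(row))

// Right: copy() gives each result its own backing bytes.
val materialized = data.map(row => projection(row).copy())
```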

```diff
   override def executeCollect(): Array[InternalRow] = {
```
```diff
@@ -571,6 +571,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       mapData.collect().take(1).map(Row.fromTuple).toSeq)
   }
 
+  test("sort and limit") {
+    checkAnswer(
+      sql("SELECT * FROM arrayData ORDER BY data[0] ASC LIMIT 1"),
```
> **Contributor:** Will this test fail without your change?
>
> **Member Author:** This should be removed; I just added it to test locally.

```diff
+      arrayData.collect().sortBy(_.data(0)).map(Row.fromTuple).take(1).toSeq)
+  }
+
   test("CTE feature") {
     checkAnswer(
       sql("with q1 as (select * from testData limit 10) select * from q1"),
```
```diff
@@ -38,7 +38,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
   private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
-  test("planner should insert unsafe->safe conversions when required") {
+  ignore("planner should insert unsafe->safe conversions when required") {
```
> **Contributor:** cc @marmbrus @yhuai — should we remove this test?
>
> **Contributor:** Are there any other cases where we will have the conversion? Or we could create a dummy operator that only accepts safe rows, so we can still test the logic of adding conversions.
>
> **Member Author:** Added a dummy node for it. Thanks.

```diff
     val plan = Limit(10, outputsUnsafe)
     val preparedPlan = sqlContext.prepareForExecution.execute(plan)
     assert(preparedPlan.children.head.isInstanceOf[ConvertToSafe])
```
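The dummy node the author added is not shown in this excerpt; a hedged sketch of what such an operator could look like (the name and details are assumptions):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}

// Hypothetical pass-through operator that refuses unsafe input, forcing
// prepareForExecution to insert a ConvertToSafe under it whenever its child
// outputs UnsafeRows.
case class DummySafeOnlyNode(child: SparkPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override def outputsUnsafeRows: Boolean = false
  override def canProcessUnsafeRows: Boolean = false
  override def canProcessSafeRows: Boolean = true
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}
```

Rewriting the ignored test against such a node, e.g. `sqlContext.prepareForExecution.execute(DummySafeOnlyNode(outputsUnsafe))`, would still exercise the conversion-insertion logic without depending on Limit's row-format flags.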