@@ -50,8 +50,8 @@ private[hive] class SparkExecuteStatementOperation(
with Logging {

private var result: DataFrame = _
private var resultList: Option[Array[org.apache.spark.sql.Row]] = _
Member

Write SparkRow for consistency? And initialize it to None explicitly?

Member Author

Thank you for the review, @srowen.
Sure!
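
For reference, a minimal sketch of what the suggested declaration might look like; it assumes the file's existing SparkRow alias and is illustrative, not the committed change:

```scala
// Sketch only: use the SparkRow alias for consistency and initialize to None explicitly.
// Assumes the surrounding file already has: import org.apache.spark.sql.{Row => SparkRow}
private var resultList: Option[Array[SparkRow]] = None
```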

private var iter: Iterator[SparkRow] = _
private var iterHeader: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _

@@ -103,6 +103,10 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private def useIncrementalCollect: Boolean = {
Member

Does this need to be a def? Will it ever change?

Member Author

Yep. When we use beeline, we can control this at runtime, like the following.

0: jdbc:hive2://localhost:10000> set spark.sql.thriftServer.incrementalCollect=false;
+--------------------------------------------+--------+--+
|                    key                     | value  |
+--------------------------------------------+--------+--+
| spark.sql.thriftServer.incrementalCollect  | false  |
+--------------------------------------------+--------+--+
1 row selected (0.015 seconds)

0: jdbc:hive2://localhost:10000> select * from t;
+----+--+
| a  |
+----+--+
+----+--+
No rows selected (0.054 seconds)
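
To illustrate the point, here is a self-contained sketch (not the Spark code; the mutable map merely stands in for the session's SQLConf that beeline's `set` command mutates) showing why a `def` re-reads the flag while a `val` would freeze it:

```scala
object IncrementalCollectFlagSketch {
  // Stand-in for the session's SQLConf; `set k=v;` in beeline mutates the real one at runtime.
  private val conf = scala.collection.mutable.Map(
    "spark.sql.thriftServer.incrementalCollect" -> "false")

  // A val would capture the flag once, at initialization time...
  private val frozenFlag: Boolean =
    conf("spark.sql.thriftServer.incrementalCollect").toBoolean

  // ...whereas a def re-reads the current setting on every fetch.
  private def useIncrementalCollect: Boolean =
    conf("spark.sql.thriftServer.incrementalCollect").toBoolean

  def main(args: Array[String]): Unit = {
    conf("spark.sql.thriftServer.incrementalCollect") = "true" // like `set ...=true;`
    println(frozenFlag)            // false -- does not see the runtime change
    println(useIncrementalCollect) // true  -- picks up the runtime change
  }
}
```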

sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
Contributor

Can we document this configuration flag in SQLConf?

Member Author

Oh, I see. I'll register this configuration into SQLConf explicitly.
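
For context, registering the flag in SQLConf might look roughly like the fragment below; the builder name and exact API vary across Spark versions, so treat this as an illustrative sketch rather than the committed code:

```scala
// Illustrative fragment (inside org.apache.spark.sql.internal.SQLConf);
// the exact builder API depends on the Spark version.
val THRIFTSERVER_INCREMENTAL_COLLECT =
  buildConf("spark.sql.thriftServer.incrementalCollect")
    .doc("When true, enable incremental collection for execution in Thrift Server.")
    .booleanConf
    .createWithDefault(false)
```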

}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
@@ -111,9 +115,15 @@

// Reset iter to header when fetching start from first row
if (order.equals(FetchOrientation.FETCH_FIRST)) {
val (ita, itb) = iterHeader.duplicate
iter = ita
iterHeader = itb
iter = if (useIncrementalCollect) {
resultList = None
result.toLocalIterator.asScala
} else {
if (resultList.isEmpty) {
Member

I agree that this makes the implicit buffering explicit. So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed? And in our case, we know that A will be entirely consumed? Then these are basically the same, yes.

But does that solve the problem? This now always stores the whole result set locally. Is this avoiding a second whole copy of it?

What if you always just return result.collect().iterator here -- is the problem the re-collecting of the result every time?

Member Author

Yes. The following happens with iterator.duplicate.

So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed?

And the whole-result storing happens at line 122 and lines 245-246, for spark.sql.thriftServer.incrementalCollect=false only.
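
A self-contained illustration of that Iterator.duplicate behavior (plain Scala, independent of the Spark classes discussed here):

```scala
object DuplicateBufferingSketch {
  def main(args: Array[String]): Unit = {
    val source = Iterator(1, 2, 3)   // normally traversable only once
    val (a, b) = source.duplicate    // b can replay whatever a consumes
    println(a.toList)                // List(1, 2, 3): a is fully consumed here...
    println(b.toList)                // List(1, 2, 3): ...because b buffered everything a read
  }
}
```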

resultList = Some(result.collect())
resultList.get.iterator

Member

I suppose I'm asking: why is this an improvement? Because in the new version, you also buffer the whole result into memory locally.

Member Author
@dongjoon-hyun Jan 1, 2017

Correct. There are two cases, and this PR mainly targets incrementalCollect=true.

you also buffer the whole result into memory locally.

First of all, before SPARK-16563, FETCH_FIRST was not supported correctly because the iterator can be traversed only once.

  • Case 1) incrementalCollect=false
    Creating the whole result in memory once with result.collect was the original Spark way before SPARK-16563.
    If we can create the whole result once during query processing, I think we can keep it for FETCH_FIRST with little side effect.
    So, I keep that. If this is not allowed, we have to go with Case 2.

  • Case 2) incrementalCollect=true
    In this case, by definition, we cannot materialize the whole result set in memory at any point during query processing. There is no way to find the first row with the iterator, so result.toLocalIterator.asScala should be called whenever FETCH_FIRST is used. (A short sketch of the two cases follows this list.)
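
A rough sketch of the memory trade-off between the two cases (assumes an existing SparkSession named spark; simplified, not the exact PR code):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.{Row => SparkRow}

val result = spark.range(0, 1000000).toDF("a")

// Case 1 (incrementalCollect=false): materialize the whole result on the driver once;
// FETCH_FIRST can then simply re-create an iterator over the cached array.
val cached: Array[SparkRow] = result.collect()
val rewoundIter: Iterator[SparkRow] = cached.iterator

// Case 2 (incrementalCollect=true): rows are pulled partition by partition and never cached,
// so the only way back to the first row is to ask for a fresh local iterator.
val incrementalIter: Iterator[SparkRow] = result.toLocalIterator.asScala
```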

Member

OK, I believe I get it now. I see your approach and it makes sense.
The only real change here is that you hold on to a reference to the whole data set rather than collect() it into memory. Maybe that's the right thing to do, but that's the only thing I'm wondering about. Previously, it seems like it would collect() each time anyway?

Just wondering whether it's actually simpler to avoid keeping a reference to the whole data set, or whether that defeats the purpose.

Member Author

collect() is still intended to be called only once, logically. The following is the reason why there are two collect() calls.

When useIncrementalCollect=false, collect() is called once at line 244 and resultList will not be None.

However, if a user executes a query with useIncrementalCollect=true and then changes their mind and turns it off with useIncrementalCollect=false, the next getNextRowSet(FetchOrientation.FETCH_FIRST) should check resultList and fill it by calling collect() once at line 123.

resultList = Some(result.collect())
}
resultList.get.iterator
}
}

if (!iter.hasNext) {
@@ -227,17 +237,14 @@
}
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultList = None
result.toLocalIterator.asScala
} else {
result.collect().iterator
resultList = Some(result.collect())
resultList.get.iterator
}
}
val (itra, itrb) = iter.duplicate
iterHeader = itra
iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>