Skip to content

Commit 601c649

Browse files
Alice authored and rxin committed
[SPARK-16563][SQL] fix spark sql thrift server FetchResults bug
## What changes were proposed in this pull request? Add a constant iterator that points to the head of the result. This header iterator is used to reset the result iterator when results are fetched from the first row repeatedly. JIRA ticket: https://issues.apache.org/jira/browse/SPARK-16563 ## How was this patch tested? This bug was found when using Cloudera HUE to connect to the Spark SQL Thrift server: currently, the result of a SQL statement can be fetched only once. The fix was tested manually with Cloudera HUE. With this fix, HUE can fetch Spark SQL results repeatedly through the Thrift server. Author: Alice <[email protected]> Author: Alice <[email protected]> Closes #14218 from alicegugu/SparkSQLFetchResultsBug. (cherry picked from commit e17a76e) Signed-off-by: Reynold Xin <[email protected]>
1 parent 6fc54b7 commit 601c649

2 files changed

Lines changed: 60 additions & 0 deletions

File tree

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ private[hive] class SparkExecuteStatementOperation(
5151

5252
private var result: DataFrame = _
5353
private var iter: Iterator[SparkRow] = _
54+
private var iterHeader: Iterator[SparkRow] = _
5455
private var dataTypes: Array[DataType] = _
5556
private var statementId: String = _
5657

@@ -110,6 +111,14 @@ private[hive] class SparkExecuteStatementOperation(
110111
assertState(OperationState.FINISHED)
111112
setHasResultSet(true)
112113
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
114+
115+
// Reset iter to the header when fetching starts from the first row
116+
if (order.equals(FetchOrientation.FETCH_FIRST)) {
117+
val (ita, itb) = iterHeader.duplicate
118+
iter = ita
119+
iterHeader = itb
120+
}
121+
113122
if (!iter.hasNext) {
114123
resultRowSet
115124
} else {
@@ -228,6 +237,9 @@ private[hive] class SparkExecuteStatementOperation(
228237
result.collect().iterator
229238
}
230239
}
240+
val (itra, itrb) = iter.duplicate
241+
iterHeader = itra
242+
iter = itrb
231243
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
232244
} catch {
233245
case e: HiveSQLException =>

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import org.apache.hive.service.auth.PlainSaslHelper
3636
import org.apache.hive.service.cli.GetInfoType
3737
import org.apache.hive.service.cli.thrift.TCLIService.Client
3838
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
39+
import org.apache.hive.service.cli.FetchOrientation
40+
import org.apache.hive.service.cli.FetchType
3941
import org.apache.thrift.protocol.TBinaryProtocol
4042
import org.apache.thrift.transport.TSocket
4143
import org.scalatest.BeforeAndAfterAll
@@ -91,6 +93,52 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
9193
}
9294
}
9395

96+
test("SPARK-16563 ThriftCLIService FetchResults repeat fetching result") {
97+
withCLIServiceClient { client =>
98+
val user = System.getProperty("user.name")
99+
val sessionHandle = client.openSession(user, "")
100+
101+
withJdbcStatement { statement =>
102+
val queries = Seq(
103+
"DROP TABLE IF EXISTS test_16563",
104+
"CREATE TABLE test_16563(key INT, val STRING)",
105+
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563")
106+
107+
queries.foreach(statement.execute)
108+
val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
109+
val operationHandle = client.executeStatement(
110+
sessionHandle,
111+
"SELECT * FROM test_16563",
112+
confOverlay)
113+
114+
// Fetch result first time
115+
assertResult(5, "Fetching result first time from next row") {
116+
117+
val rows_next = client.fetchResults(
118+
operationHandle,
119+
FetchOrientation.FETCH_NEXT,
120+
1000,
121+
FetchType.QUERY_OUTPUT)
122+
123+
rows_next.numRows()
124+
}
125+
126+
// Fetch result second time from first row
127+
assertResult(5, "Repeat fetching result from first row") {
128+
129+
val rows_first = client.fetchResults(
130+
operationHandle,
131+
FetchOrientation.FETCH_FIRST,
132+
1000,
133+
FetchType.QUERY_OUTPUT)
134+
135+
rows_first.numRows()
136+
}
137+
statement.executeQuery("DROP TABLE IF EXISTS test_16563")
138+
}
139+
}
140+
}
141+
94142
test("JDBC query execution") {
95143
withJdbcStatement { statement =>
96144
val queries = Seq(

0 commit comments

Comments
 (0)