Commit 6fa9d92 — Implement PySpark take as limit + collect.
(1 parent: 44984a7)

File tree

3 files changed: +8 −18 lines

python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,7 @@ def take(self, num):
         >>> df.take(2)
         [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
         """
-        with SCCallSiteSync(self._sc) as css:
-            port = self._sc._jvm.org.apache.spark.sql.execution.python.EvaluatePython.takeAndServe(
-                self._jdf, num)
-        return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
+        return self.limit(num).collect()
 
     @since(1.3)
     def foreach(self, f):

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
-import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
@@ -2567,8 +2567,12 @@ class Dataset[T] private[sql](
   }
 
   private[sql] def collectToPython(): Int = {
+    EvaluatePython.registerPicklers()
     withNewExecutionId {
-      PythonRDD.collectAndServe(javaToPython.rdd)
+      val toJava: (Any) => Any = EvaluatePython.toJava(_, schema)
+      val iter = new SerDeUtil.AutoBatchedPickler(
+        queryExecution.executedPlan.executeCollect().iterator.map(toJava))
+      PythonRDD.serveIterator(iter, s"serve-DataFrame")
     }
   }
25742578

sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,15 @@ import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
 
-import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
+import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 object EvaluatePython {
-  def takeAndServe(df: DataFrame, n: Int): Int = {
-    registerPicklers()
-    df.withNewExecutionId {
-      val iter = new SerDeUtil.AutoBatchedPickler(
-        df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
-          EvaluatePython.toJava(row, df.schema)
-        })
-      PythonRDD.serveIterator(iter, s"serve-DataFrame")
-    }
-  }
 
   def needConversionInPython(dt: DataType): Boolean = dt match {
     case DateType | TimestampType => true

Comments (0)