File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -1854,6 +1854,24 @@ def test_collect_functions(self):
18541854 sorted (df .select (functions .collect_list (df .value ).alias ('r' )).collect ()[0 ].r ),
18551855 ["1" , "2" , "2" , "2" ])
18561856
def test_limit_and_take(self):
    """Check that ``take(1)`` and ``limit(1).collect()`` each run a single task.

    A 10-partition range DataFrame is created, and each action is executed
    under its own job group so the status tracker can report how many
    jobs, stages and tasks that action triggered.
    """
    df = self.spark.range(1, 1000, numPartitions=10)

    def assert_runs_only_one_job_stage_and_task(job_group_name, f):
        """Run ``f`` inside ``job_group_name`` and assert 1 job / 1 stage / 1 task."""
        status = self.sc.statusTracker()
        # Tag everything launched by ``f`` so it can be looked up afterwards.
        self.sc.setJobGroup(job_group_name, description="")
        f()

        job_ids = status.getJobIdsForGroup(job_group_name)
        self.assertEqual(1, len(job_ids))

        stage_ids = status.getJobInfo(job_ids[0]).stageIds
        self.assertEqual(1, len(stage_ids))

        stage_info = status.getStageInfo(stage_ids[0])
        self.assertEqual(1, stage_info.numTasks)

    regression_cases = (
        # Regression test for SPARK-10731: take should delegate to the Scala implementation.
        ("take", lambda: df.take(1)),
        # Regression test for SPARK-17514: limit(n).collect() should perform the same as take(n).
        ("collect_limit", lambda: df.limit(1).collect()),
    )
    for group_name, action in regression_cases:
        assert_runs_only_one_job_stage_and_task(group_name, action)
1874+
18571875
18581876if __name__ == "__main__" :
18591877 from pyspark .sql .tests import *
You can’t perform that action at this time.
0 commit comments