@@ -301,6 +301,38 @@ def f(iterator):
301301 output = self ._run_stream (test_input , test_func , expected_output , numSlices )
302302 self .assertEqual (expected_output , output )
303303
304+ def test_countByValue_batch (self ):
305+ """Basic operation test for DStream.countByValue with batch deserializer"""
306+ test_input = [range (1 , 5 ) + range (1 ,5 ), range (5 , 7 ) + range (5 , 9 ), ["a" ] * 2 + ["b" ] + ["" ] ]
307+
308+ def test_func (dstream ):
309+ return dstream .countByValue ()
310+ expected_output = [[(1 , 2 ), (2 , 2 ), (3 , 2 ), (4 , 2 )],
311+ [(5 , 2 ), (6 , 2 ), (7 , 1 ), (8 , 1 )],
312+ [("a" , 2 ), ("b" , 1 ), ("" , 1 )]]
313+ output = self ._run_stream (test_input , test_func , expected_output )
314+ for result in (output , expected_output ):
315+ self ._sort_result_based_on_key (result )
316+ self .assertEqual (expected_output , output )
317+
318+ def test_countByValue_unbatch (self ):
319+ """Basic operation test for DStream.countByValue with unbatch deserializer"""
320+ test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
321+
322+ def test_func (dstream ):
323+ return dstream .countByValue ()
324+ expected_output = [[(1 , 1 ), (2 , 1 ), (3 , 1 )],
325+ [(1 , 2 ), ("" , 1 )],
326+ [("a" , 2 ), ("b" , 1 )]]
327+ output = self ._run_stream (test_input , test_func , expected_output )
328+ for result in (output , expected_output ):
329+ self ._sort_result_based_on_key (result )
330+ self .assertEqual (expected_output , output )
331+
332+ def _sort_result_based_on_key (self , outputs ):
333+ for output in outputs :
334+ output .sort (key = lambda x : x [0 ])
335+
304336 def _run_stream (self , test_input , test_func , expected_output , numSlices = None ):
305337 """Start stream and return the output"""
306338 # Generate input stream with user-defined input
0 commit comments