@@ -117,8 +117,8 @@ class BasicOperationsSuite extends TestSuiteBase {
117117 test(" groupByKey" ) {
118118 testOperation(
119119 Seq ( Seq (" a" , " a" , " b" ), Seq (" " , " " ), Seq () ),
120- (s : DStream [String ]) => s.map(x => (x, 1 )).groupByKey(),
121- Seq ( Seq ((" a" , Seq (1 , 1 ).toIterator ), (" b" , Seq (1 ).toIterator )), Seq ((" " , Seq (1 , 1 ).toIterator )), Seq () ),
120+ (s : DStream [String ]) => s.map(x => (x, 1 )).groupByKey().mapValues(_.toSeq) ,
121+ Seq ( Seq ((" a" , Seq (1 , 1 )), (" b" , Seq (1 ))), Seq ((" " , Seq (1 , 1 ))), Seq () ),
122122 true
123123 )
124124 }
@@ -245,13 +245,13 @@ class BasicOperationsSuite extends TestSuiteBase {
245245 val inputData1 = Seq ( Seq (" a" , " a" , " b" ), Seq (" a" , " " ), Seq (" " ), Seq () )
246246 val inputData2 = Seq ( Seq (" a" , " a" , " b" ), Seq (" b" , " " ), Seq (), Seq () )
247247 val outputData = Seq (
248- Seq ( (" a" , (Seq (1 , 1 ).toIterator , Seq (" x" , " x" ).toIterator )), (" b" , (Seq (1 ).toIterator , Seq (" x" ).toIterator )) ),
249- Seq ( (" a" , (Seq (1 ).toIterator , Seq ().toIterator )), (" b" , (Seq ().toIterator , Seq (" x" ).toIterator )), (" " , (Seq (1 ).toIterator , Seq (" x" ).toIterator )) ),
250- Seq ( (" " , (Seq (1 ).toIterator , Seq ().toIterator )) ),
248+ Seq ( (" a" , (Seq (1 , 1 ), Seq (" x" , " x" ))), (" b" , (Seq (1 ), Seq (" x" ))) ),
249+ Seq ( (" a" , (Seq (1 ), Seq ())), (" b" , (Seq (), Seq (" x" ))), (" " , (Seq (1 ), Seq (" x" ))) ),
250+ Seq ( (" " , (Seq (1 ), Seq ())) ),
251251 Seq ( )
252252 )
253253 val operation = (s1 : DStream [String ], s2 : DStream [String ]) => {
254- s1.map(x => (x,1 )).cogroup(s2.map(x => (x, " x" )))
254+ s1.map(x => (x,1 )).cogroup(s2.map(x => (x, " x" ))).mapValues(x => (x._1.toSeq, x._2.toSeq))
255255 }
256256 testOperation(inputData1, inputData2, operation, outputData, true )
257257 }
0 commit comments