@@ -150,6 +150,7 @@ def _testInputStream(self, test_inputs, numSlices=None):
150150 This implementation is inspired by QueStream implementation.
151151 Give list of RDD to generate DStream which contains the RDD.
152152 """
153+ < << << << HEAD
153154 test_rdds = list ()
154155 test_rdd_deserializers = list ()
155156 for test_input in test_inputs :
@@ -161,3 +162,38 @@ def _testInputStream(self, test_inputs, numSlices=None):
161162 jinput_stream = self ._jvm .PythonTestInputStream (self ._jssc , jtest_rdds ).asJavaDStream ()
162163
163164 return DStream (jinput_stream , self , test_rdd_deserializers [0 ])
165+ == == == =
166+ self ._jssc .checkpoint (directory )
167+
168+ def _testInputStream (self , test_inputs , numSlices = None ):
169+ """
170+ Generate multiple files to make "stream" in Scala side for test.
171+ Scala chooses one of the files and generates RDD using PythonRDD.readRDDFromFile.
172+ """
173+ numSlices = numSlices or self ._sc .defaultParallelism
174+ # Calling the Java parallelize() method with an ArrayList is too slow,
175+ # because it sends O(n) Py4J commands. As an alternative, serialized
176+ # objects are written to a file and loaded through textFile().
177+
178+ tempFiles = list ()
179+ for test_input in test_inputs :
180+ tempFile = NamedTemporaryFile (delete = False , dir = self ._sc ._temp_dir )
181+
182+ # Make sure we distribute data evenly if it's smaller than self.batchSize
183+ if "__len__" not in dir (test_input ):
184+ c = list (test_input ) # Make it a list so we can compute its length
185+ batchSize = min (len (test_input ) // numSlices , self ._sc ._batchSize )
186+ if batchSize > 1 :
187+ serializer = BatchedSerializer (self ._sc ._unbatched_serializer ,
188+ batchSize )
189+ else :
190+ serializer = self ._sc ._unbatched_serializer
191+ serializer .dump_stream (test_input , tempFile )
192+ tempFiles .append (tempFile .name )
193+
194+ jtempFiles = ListConverter ().convert (tempFiles , SparkContext ._gateway ._gateway_client )
195+ jinput_stream = self ._jvm .PythonTestInputStream (self ._jssc ,
196+ jtempFiles ,
197+ numSlices ).asJavaDStream ()
198+ return DStream (jinput_stream , self , PickleSerializer ())
199+ >> >> >> > added basic operation test cases
0 commit comments