@@ -224,6 +224,60 @@ class FileSuite extends FunSuite with LocalSparkContext {
224224 assert(output.map(_.toString).collect().toList === List (" (1,a)" , " (2,aa)" , " (3,aaa)" ))
225225 }
226226
test("byte stream input") {
  sc = new SparkContext("local", "test")
  val outputDir = new File(tempDir, "output")
  // NOTE(review): nothing else in this test creates the output directory, and
  // FileOutputStream does not create missing parent directories — make sure it
  // exists before opening a stream inside it.
  outputDir.mkdirs()
  val outFile = new File(outputDir, "part-00000.bin")
  val outFileName = outFile.getAbsolutePath

  // Fixture: six known bytes written straight to disk through an NIO channel.
  val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
  val file = new java.io.FileOutputStream(outFile)
  try {
    file.getChannel.write(java.nio.ByteBuffer.wrap(testOutput))
  } finally {
    // Closing the stream also closes the channel obtained from it.
    file.close()
  }

  // Read the file back as a binary-file RDD of (path, contents) pairs and
  // check that both the reported path and the bytes round-trip.
  val inRdd = sc.binaryFiles(outFileName)
  val (infile: String, indata: Array[Byte]) = inRdd.first

  assert(infile === outFileName)
  assert(indata === testOutput)
}
250+
test("fixed length byte stream input") {
  // Each record is a fixed length of 6 bytes.
  sc = new SparkContext("local", "test")

  val outputDir = new File(tempDir, "output")
  // NOTE(review): nothing else in this test creates the output directory —
  // FileOutputStream would fail without it, so create it explicitly.
  outputDir.mkdirs()
  val outFile = new File(outputDir, "part-00000.bin")
  val outFileName = outFile.getAbsolutePath

  // Fixture: the same 6-byte record written back-to-back `testOutputCopies`
  // times. A fresh ByteBuffer is wrapped on every iteration: reusing one
  // buffer across writes would leave its position at the limit after the
  // first write, so every later write would emit zero bytes and the file
  // would contain only a single record.
  val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
  val testOutputCopies = 10
  val file = new java.io.FileOutputStream(outFile)
  try {
    val channel = file.getChannel
    for (_ <- 1 to testOutputCopies) {
      channel.write(java.nio.ByteBuffer.wrap(testOutput))
    }
  } finally {
    // Closing the stream also closes the channel obtained from it.
    file.close()
  }

  // Tell the fixed-length record reader how long each record is.
  sc.hadoopConfiguration.setInt("recordLength", testOutput.length)

  val inRdd = sc.binaryRecords(outFileName)
  // Every written copy should come back as its own record.
  assert(inRdd.count === testOutputCopies)

  // All records are identical, so comparing the first one is sufficient.
  val indata: Array[Byte] = inRdd.first
  assert(indata === testOutput)
}
280+
227281 test(" file caching" ) {
228282 sc = new SparkContext (" local" , " test" )
229283 val out = new FileWriter (tempDir + " /input" )
0 commit comments