@@ -535,6 +535,15 @@ def test_oldhadoop(self):
535535 "org.apache.hadoop.io.MapWritable" ).collect ())
536536 self .assertEqual (result , dict_data )
537537
538+ conf = {
539+ "mapred.output.format.class" : "org.apache.hadoop.mapred.SequenceFileOutputFormat" ,
540+ "mapred.output.key.class" : "org.apache.hadoop.io.IntWritable" ,
541+ "mapred.output.value.class" : "org.apache.hadoop.io.MapWritable" ,
542+ "mapred.output.dir" : basepath + "/olddataset/" }
543+ self .sc .parallelize (dict_data ).saveAsHadoopDataset (conf )
544+ old_dataset = sorted (self .sc .sequenceFile (basepath + "/olddataset/" ).collect ())
545+ self .assertEqual (old_dataset , dict_data )
546+
538547 def test_newhadoop (self ):
539548 basepath = self .tempdir .name
540549 # use custom ArrayWritable types and converters to handle arrays
@@ -555,6 +564,19 @@ def test_newhadoop(self):
             valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
         self.assertEqual(result, array_data)
 
+        conf = {"mapreduce.outputformat.class":
+                "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
+                "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
+                "mapred.output.value.class": "org.apache.spark.api.python.DoubleArrayWritable",
+                "mapred.output.dir": basepath + "/newdataset/"}
+        self.sc.parallelize(array_data).saveAsNewAPIHadoopDataset(conf,
+            valueConverter="org.apache.spark.api.python.DoubleArrayToWritableConverter")
+        new_dataset = sorted(self.sc.sequenceFile(
+            basepath + "/newdataset/",
+            valueClass="org.apache.spark.api.python.DoubleArrayWritable",
+            valueConverter="org.apache.spark.api.python.WritableToDoubleArrayConverter").collect())
+        self.assertEqual(new_dataset, array_data)
+
     def test_newolderror(self):
         basepath = self.tempdir.name
         rdd = self.sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
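
For reference, a minimal stand-alone sketch of the round trip the added test_oldhadoop lines exercise: save an RDD through the old-API saveAsHadoopDataset using a job conf dict, then read the resulting SequenceFile back. It assumes a local SparkContext and a hypothetical writable path /tmp/olddataset/; the conf keys mirror the ones in the diff above.

    from pyspark import SparkContext

    sc = SparkContext("local", "save-as-hadoop-dataset-sketch")
    dict_data = [(1, {}), (1, {"row1": 1.0}), (2, {"row2": 2.0})]

    # Old-API output format, key/value Writable classes, and the output directory
    # are all passed through the job conf (the path /tmp/olddataset/ is hypothetical).
    conf = {
        "mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
        "mapred.output.key.class": "org.apache.hadoop.io.IntWritable",
        "mapred.output.value.class": "org.apache.hadoop.io.MapWritable",
        "mapred.output.dir": "/tmp/olddataset/"}
    sc.parallelize(dict_data).saveAsHadoopDataset(conf)

    # Read the SequenceFile back; keys and values come back as Python ints and dicts.
    result = sorted(sc.sequenceFile("/tmp/olddataset/").collect())
    assert result == dict_data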