2020
2121import java .io .IOException ;
2222import java .net .URI ;
23- import java .util .*;
23+ import java .util .List ;
24+ import java .util .ArrayList ;
25+ import java .util .Date ;
2426
2527import org .apache .hadoop .conf .Configuration ;
2628import org .apache .hadoop .conf .Configured ;
3133import org .apache .hadoop .io .WritableComparable ;
3234import org .apache .hadoop .mapred .ClusterStatus ;
3335import org .apache .hadoop .mapred .JobClient ;
34- import org .apache .hadoop .mapreduce .*;
36+ import org .apache .hadoop .mapreduce .Job ;
37+ import org .apache .hadoop .mapreduce .Mapper ;
38+ import org .apache .hadoop .mapreduce .Reducer ;
39+ import org .apache .hadoop .mapreduce .InputFormat ;
40+ import org .apache .hadoop .mapreduce .OutputFormat ;
3541import org .apache .hadoop .mapreduce .lib .input .FileInputFormat ;
3642import org .apache .hadoop .mapreduce .lib .input .SequenceFileInputFormat ;
3743import org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
5460 * [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
5561 * <i>in-dir</i> <i>out-dir</i>
5662 */
57- public class Sort <K ,V > extends Configured implements Tool {
63+ public class Sort <K , V > extends Configured implements Tool {
5864 public static final String REDUCES_PER_HOST =
59- "mapreduce.sort.reducesperhost" ;
65+ "mapreduce.sort.reducesperhost" ;
6066 private Job job = null ;
6167
6268 static int printUsage () {
@@ -85,17 +91,17 @@ public int run(String[] args) throws Exception {
8591 int num_reduces = (int ) (cluster .getMaxReduceTasks () * 0.9 );
8692 String sort_reduces = conf .get (REDUCES_PER_HOST );
8793 if (sort_reduces != null ) {
88- num_reduces = cluster .getTaskTrackers () *
94+ num_reduces = cluster .getTaskTrackers () *
8995 Integer .parseInt (sort_reduces );
9096 }
9197 Class <? extends InputFormat > inputFormatClass =
92- SequenceFileInputFormat .class ;
98+ SequenceFileInputFormat .class ;
9399 Class <? extends OutputFormat > outputFormatClass =
94- SequenceFileOutputFormat .class ;
100+ SequenceFileOutputFormat .class ;
95101 Class <? extends WritableComparable > outputKeyClass = BytesWritable .class ;
96102 Class <? extends Writable > outputValueClass = BytesWritable .class ;
97103 List <String > otherArgs = new ArrayList <String >();
98- InputSampler .Sampler <K ,V > sampler = null ;
104+ InputSampler .Sampler <K , V > sampler = null ;
99105 for (int i =0 ; i < args .length ; ++i ) {
100106 try {
101107 if ("-r" .equals (args [i ])) {
@@ -116,9 +122,11 @@ public int run(String[] args) throws Exception {
116122 double pcnt = Double .parseDouble (args [++i ]);
117123 int numSamples = Integer .parseInt (args [++i ]);
118124 int maxSplits = Integer .parseInt (args [++i ]);
119- if (0 >= maxSplits ) maxSplits = Integer .MAX_VALUE ;
125+ if (0 >= maxSplits ) {
126+ maxSplits = Integer .MAX_VALUE ;
127+ }
120128 sampler =
121- new InputSampler .RandomSampler <K ,V >(pcnt , numSamples , maxSplits );
129+ new InputSampler .RandomSampler <K , V >(pcnt , numSamples , maxSplits );
122130 } else {
123131 otherArgs .add (args [i ]);
124132 }
@@ -164,7 +172,7 @@ public int run(String[] args) throws Exception {
164172 inputDir = inputDir .makeQualified (fs .getUri (), fs .getWorkingDirectory ());
165173 Path partitionFile = new Path (inputDir , "_sortPartitioning" );
166174 TotalOrderPartitioner .setPartitionFile (conf , partitionFile );
167- InputSampler .<K ,V >writePartitionFile (job , sampler );
175+ InputSampler .<K , V >writePartitionFile (job , sampler );
168176 URI partitionUri = new URI (partitionFile .toString () +
169177 "#" + "_sortPartitioning" );
170178 job .addCacheFile (partitionUri );
0 commit comments