diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index dd3eed8affe3..70c7474a936d 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program.
- * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+ * @param sc org.apache.spark.SparkContext to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
- * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+ * @param partitioner org.apache.spark.Partitioner partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
+ * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
- * [[org.apache.spark.HashPartitioner]] and default storage level
+ * org.apache.spark.HashPartitioner and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
- * default [[org.apache.spark.HashPartitioner]]
+ * default org.apache.spark.HashPartitioner
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * default [[org.apache.spark.HashPartitioner]],
+ * default org.apache.spark.HashPartitioner,
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {
/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * the default [[org.apache.spark.HashPartitioner]]
+ * the default org.apache.spark.HashPartitioner
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
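
For context on the doc changes above (and the similar ones in the Scala files below): [[...]] scaladoc wiki links are demoted to plain fully qualified names, presumably because links whose targets the doc tool cannot resolve, for example when generating the Java API docs, produce "Could not find any member to link for" warnings. A minimal before/after sketch with hypothetical method names:

    import org.apache.spark.Partitioner

    /** Before: a wiki link; if scaladoc cannot resolve the target it emits a
      * "Could not find any member to link for" warning.
      * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
      */
    def runLinked(partitioner: Partitioner): Unit = ()

    /** After: the same fully qualified name as plain text, so the sentence still
      * reads the same but no link resolution is attempted.
      * @param partitioner org.apache.spark.Partitioner partitions values by key
      */
    def runPlain(partitioner: Partitioner): Unit = ()
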
diff --git a/core/pom.xml b/core/pom.xml
index ebc178a10541..a333bff28c24 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -225,7 +225,7 @@
true
-
+
@@ -238,7 +238,7 @@
-
+
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1f5334f3dbb4..da778aa851cd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
- * @param inputFormatClass Class of the [[InputFormat]]
+ * @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
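
The parameter list documented here maps directly onto a call through the old org.apache.hadoop.mapred API. A minimal sketch, where the path, input format and split count are illustrative choices rather than anything this patch prescribes:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext

    // Read a text file through the old MapReduce API and keep only the line text.
    def linesViaHadoopRDD(sc: SparkContext, path: String) = {
      val conf = new JobConf()                  // conf: JobConf for setting up the dataset
      FileInputFormat.setInputPaths(conf, path)
      sc.hadoopRDD(conf,
        classOf[TextInputFormat],               // inputFormatClass
        classOf[LongWritable],                  // keyClass
        classOf[Text],                          // valueClass
        2)                                      // minSplits
        .map(_._2.toString)
    }
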
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 9d75d7c4ad69..006e2a333542 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
/**
* Create a log file for one job
* @param jobID ID of the job
- * @exception FileNotFoundException Fail to create log file
+ * @throws FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
try {
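
In both javadoc and scaladoc the standard tag for documenting exceptions is @throws; @exception is an older javadoc alias, which is presumably why it is replaced here. A small sketch of the resulting style, with an illustrative body rather than JobLogger's actual one:

    import java.io.{FileNotFoundException, PrintWriter}

    /**
     * Create a log file for one job.
     * @param jobID ID of the job
     * @throws FileNotFoundException if the log file cannot be created
     */
    @throws(classOf[FileNotFoundException])
    def createLogWriter(jobID: Int): PrintWriter =
      new PrintWriter("job-" + jobID + ".log")
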
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index bf71882ef770..c539d2f708f9 100644
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config
/**
- * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
+ * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
* This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see [[org.apache.spark.executor.Executor]]).
+ * (see org.apache.spark.executor.Executor)
*/
object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
index 5b0d2c36510b..f837dc7ccc86 100644
--- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala
+++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util
/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on
- * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
- * Welford and Chan's algorithms for running variance]].
+ * numerically robust way. Includes support for merging two StatCounters. Based on Welford
+ * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
+ * for running variance.
*
* @constructor Initialize the StatCounter with the given values.
*/
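
For context on the algorithms the rewritten comment links to: Welford's update folds one value at a time into a running mean and sum of squared deviations, and Chan et al.'s formula merges two partial results, which is what lets such counters be combined across partitions. A minimal sketch with illustrative field names, not StatCounter's actual internals:

    // Running count, mean and sum of squared deviations from the mean (m2).
    class RunningStats(var n: Long = 0L, var mean: Double = 0.0, var m2: Double = 0.0) {

      // Welford's single-pass update for one new value.
      def add(x: Double): this.type = {
        n += 1
        val delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)   // second factor uses the updated mean
        this
      }

      // Chan et al.'s merge of two partial results.
      def merge(other: RunningStats): this.type = {
        if (other.n > 0) {
          val delta = other.mean - mean
          val total = n + other.n
          mean = (n * mean + other.n * other.mean) / total
          m2 += other.m2 + delta * delta * n * other.n / total
          n = total
        }
        this
      }

      def variance: Double = if (n == 0) Double.NaN else m2 / n
    }
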
diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala
index d437c055f33d..dc4b8f253f25 100644
--- a/core/src/main/scala/org/apache/spark/util/Vector.scala
+++ b/core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {
/**
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
- * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
+ * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
Vector(length, _ => random.nextDouble())
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 20232e9fbb8d..aa5079c15983 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) {
else if (a < b) return 1;
else return 0;
}
- };
+ }
+ @SuppressWarnings("unchecked")
@Test
public void sparkContextUnion() {
// Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}
+ @SuppressWarnings("unchecked")
@Test
public void lookup() {
JavaPairRDD categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public Boolean call(Integer x) {
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
}
+ @SuppressWarnings("unchecked")
@Test
public void cogroup() {
JavaPairRDD categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public void cogroup() {
cogrouped.collect();
}
+ @SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
JavaPairRDD rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}
+ @SuppressWarnings("unchecked")
@Test
public void foldByKey() {
List> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
}
+ @SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
List> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public void approximateResults() {
public void take() {
JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Assert.assertEquals(1, rdd.first().intValue());
- List firstTwo = rdd.take(2);
- List sample = rdd.takeSample(false, 2, 42);
+ rdd.take(2);
+ rdd.takeSample(false, 2, 42);
}
@Test
@@ -359,8 +365,8 @@ public Boolean call(Double x) {
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
- Double first = rdd.first();
- List take = rdd.take(5);
+ rdd.first();
+ rdd.take(5);
}
@Test
@@ -438,11 +444,11 @@ public Iterable call(String s) {
return lengths;
}
});
- Double x = doubles.first();
- Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
+ Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}
+ @SuppressWarnings("unchecked")
@Test
public void mapsFromPairsToPairs() {
List> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public void repartition() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void persist() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public Tuple2 call(Tuple2 pair) {
Assert.assertEquals(pairs, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void writeWithNewAPIHadoopFile() {
File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public String call(Tuple2 x) {
}).collect().toString());
}
+ @SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public void objectFilesOfInts() {
Assert.assertEquals(expected, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void objectFilesOfComplexTypes() {
File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() {
Assert.assertEquals(pairs, readRDD.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void hadoopFile() {
File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public String call(Tuple2 x) {
}).collect().toString());
}
+ @SuppressWarnings("unchecked")
@Test
public void hadoopFileCompressed() {
File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public Float zero(Float initialValue) {
}
};
- final Accumulator floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
+ final Accumulator floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(new VoidFunction() {
public void call(Integer x) {
floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public void checkpointAndRestore() {
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
+ @SuppressWarnings("unchecked")
@Test
public void mapOnPairRDD() {
JavaRDD rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public Tuple2 call(Tuple2 in) throws Excepti
}
+ @SuppressWarnings("unchecked")
@Test
public void collectPartitions() {
JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,7 +983,7 @@ public void countApproxDistinctByKey() {
@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
- JavaRDD rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+ JavaRDD rdd = sc.parallelize(Arrays.asList(1));
JavaPairRDD pairRDD = rdd.map(new PairFunction() {
@Override
public Tuple2 call(Integer x) throws Exception {
@@ -976,6 +991,6 @@ public Tuple2 call(Integer x) throws Exception {
}
});
pairRDD.collect(); // Works fine
- Map map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
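
Two things recur in the test changes above and in the streaming suite further below: @SuppressWarnings("unchecked") is added to tests that build lists of parameterized Tuple2 values through Arrays.asList, presumably because the varargs call triggers an unchecked generic-array-creation warning, and unused local variables are dropped so only the calls under test remain. The last test is the SPARK-1040 regression check; a minimal sketch of the scenario it guards, written in Scala with illustrative names and data:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit pair-RDD functions such as collectAsMap

    // Collect a pair RDD whose values are primitive arrays into a Map; this used to
    // fail with a ClassCastException before the SPARK-1040 fix.
    def collectArrayValuesAsMap(sc: SparkContext): scala.collection.Map[Int, Array[Int]] = {
      val pairs = sc.parallelize(Seq(1)).map(x => (x, Array(x)))
      pairs.collect()       // always worked
      pairs.collectAsMap()  // the call the regression test exercises
    }
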
diff --git a/pom.xml b/pom.xml
index 4f1e8398d9b8..1d97dd3402b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -613,12 +613,13 @@
org.apache.maven.pluginsmaven-compiler-plugin
- 2.5.1
+ 3.1${java.version}${java.version}UTF-81024m
+ true
@@ -633,7 +634,7 @@
org.scalatestscalatest-maven-plugin
- 1.0-M2
+ 1.0-RC2${project.build.directory}/surefire-reports.
diff --git a/repl/pom.xml b/repl/pom.xml
index 73597f635b9e..4c5f9720c802 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -98,7 +98,7 @@
true
-
+
@@ -111,7 +111,7 @@
-
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 4dcd0e4c51ec..2c7ff87744d7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]]
+ * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
@@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* thepartitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
@@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W](
other: JavaPairDStream[K, W],
@@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def leftOuterJoin[W](
other: JavaPairDStream[K, W],
@@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W](
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 2268160dccc1..b082bb058529 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[T](
dstreams: JList[JavaDStream[_]],
@@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using
* [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream().
* In the transform function, convert the JavaRDD corresponding to that JavaDStream to
- * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD().
+ * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD().
*/
def transform[K, V](
dstreams: JList[JavaDStream[_]],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f3c58aede092..247349694936 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+ * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
@@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
@@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
- * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
@@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
+ * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs.
*/
def cogroup[W: ClassTag](
other: DStream[(K, W)],
@@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
- * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
+ * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
def join[W: ClassTag](
other: DStream[(K, W)],
@@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def leftOuterJoin[W: ClassTag](
@@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
/**
* Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
- * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * `other` DStream. The supplied org.apache.spark.Partitioner is used to control
* the partitioning of each RDD.
*/
def rightOuterJoin[W: ClassTag](
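
Several of the comments above describe updateStateByKey with an explicit partitioner: the update function receives the new values and the previous state for a key and returns the next state, or None to drop that key. A minimal usage sketch of that overload; the running-count function, the key type and the partition count are illustrative, and the import paths are assumed from the streaming API of this era:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.StreamingContext._   // implicit pair DStream functions
    import org.apache.spark.streaming.dstream.DStream

    // Keep a running count per key, with partitioning controlled by a HashPartitioner.
    def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Int)] =
      pairs.updateStateByKey[Int](
        (newValues: Seq[Int], state: Option[Int]) => Some(state.getOrElse(0) + newValues.sum),
        new HashPartitioner(4))
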
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4fbbce9b8b90..54a0791d04ea 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -19,7 +19,6 @@
import scala.Tuple2;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
@@ -30,7 +29,6 @@
import com.google.common.io.Files;
import com.google.common.collect.Sets;
-import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -38,6 +36,7 @@
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -45,6 +44,8 @@
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @SuppressWarnings("unchecked")
@Test
public void testCount() {
List> inputData = Arrays.asList(
@@ -64,6 +65,7 @@ public void testCount() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMap() {
List> inputData = Arrays.asList(
@@ -87,6 +89,7 @@ public Integer call(String s) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindow() {
List> inputData = Arrays.asList(
@@ -108,6 +111,7 @@ public void testWindow() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testWindowWithSlideDuration() {
List> inputData = Arrays.asList(
@@ -132,6 +136,7 @@ public void testWindowWithSlideDuration() {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFilter() {
List> inputData = Arrays.asList(
@@ -155,13 +160,16 @@ public Boolean call(String s) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionMorePartitions() {
List> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
- JavaDStream repartitioned = stream.repartition(4);
+ JavaDStream stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+ JavaDStreamLike,JavaRDD> repartitioned =
+ stream.repartition(4);
JavaTestUtils.attachTestOutputStream(repartitioned);
List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -172,13 +180,16 @@ public void testRepartitionMorePartitions() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testRepartitionFewerPartitions() {
List> inputData = Arrays.asList(
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
- JavaDStream repartitioned = stream.repartition(2);
+ JavaDStream stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+ JavaDStreamLike,JavaRDD> repartitioned =
+ stream.repartition(2);
JavaTestUtils.attachTestOutputStream(repartitioned);
List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
Assert.assertEquals(2, result.size());
@@ -188,6 +199,7 @@ public void testRepartitionFewerPartitions() {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testGlom() {
List> inputData = Arrays.asList(
@@ -206,6 +218,7 @@ public void testGlom() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testMapPartitions() {
List> inputData = Arrays.asList(
@@ -217,16 +230,17 @@ public void testMapPartitions() {
Arrays.asList("YANKEESRED SOCKS"));
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() {
- @Override
- public Iterable call(Iterator in) {
- String out = "";
- while (in.hasNext()) {
- out = out + in.next().toUpperCase();
- }
- return Lists.newArrayList(out);
- }
- });
+ JavaDStream mapped = stream.mapPartitions(
+ new FlatMapFunction, String>() {
+ @Override
+ public Iterable call(Iterator in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
JavaTestUtils.attachTestOutputStream(mapped);
List> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -247,6 +261,7 @@ public Integer call(Integer i1, Integer i2) throws Exception {
}
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduce() {
List> inputData = Arrays.asList(
@@ -267,6 +282,7 @@ public void testReduce() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByWindow() {
List> inputData = Arrays.asList(
@@ -289,6 +305,7 @@ public void testReduceByWindow() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testQueueStream() {
List> expected = Arrays.asList(
@@ -312,6 +329,7 @@ public void testQueueStream() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransform() {
List> inputData = Arrays.asList(
@@ -344,6 +362,7 @@ public Integer call(Integer i) throws Exception {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransform() {
// tests whether all variations of transform can be called from Java
@@ -423,6 +442,7 @@ public JavaRDD call(JavaRDD in) throws Exception {
}
+ @SuppressWarnings("unchecked")
@Test
public void testTransformWith() {
List>> stringStringKVStream1 = Arrays.asList(
@@ -492,6 +512,7 @@ public JavaPairRDD> call(
}
+ @SuppressWarnings("unchecked")
@Test
public void testVariousTransformWith() {
// tests whether all variations of transformWith can be called from Java
@@ -591,6 +612,7 @@ public JavaPairRDD call(JavaPairRDD rdd1, JavaP
);
}
+ @SuppressWarnings("unchecked")
@Test
public void testStreamingContextTransform(){
List> stream1input = Arrays.asList(
@@ -658,6 +680,7 @@ public Tuple2 call(Integer i) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMap() {
List> inputData = Arrays.asList(
@@ -683,6 +706,7 @@ public Iterable call(String x) {
assertOrderInvariantEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairFlatMap() {
List> inputData = Arrays.asList(
@@ -718,22 +742,24 @@ public void testPairFlatMap() {
new Tuple2(9, "s")));
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() {
- @Override
- public Iterable> call(String in) throws Exception {
- List> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2(in.length(), letter));
- }
- return out;
- }
- });
+ JavaPairDStream flatMapped = stream.flatMap(
+ new PairFlatMapFunction() {
+ @Override
+ public Iterable> call(String in) throws Exception {
+ List> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2(in.length(), letter));
+ }
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUnion() {
List> inputData1 = Arrays.asList(
@@ -778,6 +804,7 @@ public static > void assertOrderInvariantEquals(
// PairDStream Functions
+ @SuppressWarnings("unchecked")
@Test
public void testPairFilter() {
List> inputData = Arrays.asList(
@@ -810,7 +837,8 @@ public Boolean call(Tuple2 in) throws Exception {
Assert.assertEquals(expected, result);
}
- List>> stringStringKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List>> stringStringKVStream = Arrays.asList(
Arrays.asList(new Tuple2("california", "dodgers"),
new Tuple2("california", "giants"),
new Tuple2("new york", "yankees"),
@@ -820,7 +848,8 @@ public Boolean call(Tuple2 in) throws Exception {
new Tuple2("new york", "rangers"),
new Tuple2("new york", "islanders")));
- List>> stringIntKVStream = Arrays.asList(
+ @SuppressWarnings("unchecked")
+ private List>> stringIntKVStream = Arrays.asList(
Arrays.asList(
new Tuple2("california", 1),
new Tuple2("california", 3),
@@ -832,6 +861,7 @@ public Boolean call(Tuple2 in) throws Exception {
new Tuple2("new york", 3),
new Tuple2("new york", 1)));
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap() { // Maps pair -> pair of different type
List>> inputData = stringIntKVStream;
@@ -864,6 +894,7 @@ public Tuple2 call(Tuple2 in) throws Exception
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMapPartitions() { // Maps pair -> pair of different type
List>> inputData = stringIntKVStream;
@@ -901,6 +932,7 @@ public Iterable> call(Iterator>
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairMap2() { // Maps pair -> single
List>> inputData = stringIntKVStream;
@@ -925,6 +957,7 @@ public Integer call(Tuple2 in) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
List>> inputData = Arrays.asList(
@@ -967,6 +1000,7 @@ public Iterable> call(Tuple2 in) throws
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairGroupByKey() {
List>> inputData = stringStringKVStream;
@@ -989,6 +1023,7 @@ public void testPairGroupByKey() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairReduceByKey() {
List>> inputData = stringIntKVStream;
@@ -1013,6 +1048,7 @@ public void testPairReduceByKey() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCombineByKey() {
List>> inputData = stringIntKVStream;
@@ -1043,6 +1079,7 @@ public Integer call(Integer i) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValue() {
List> inputData = Arrays.asList(
@@ -1068,6 +1105,7 @@ public void testCountByValue() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testGroupByKeyAndWindow() {
List>> inputData = stringIntKVStream;
@@ -1113,6 +1151,7 @@ private Tuple2> convert(Tuple2> t
return new Tuple2>(tuple._1(), new HashSet(tuple._2()));
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindow() {
List>> inputData = stringIntKVStream;
@@ -1136,6 +1175,7 @@ public void testReduceByKeyAndWindow() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testUpdateStateByKey() {
List>> inputData = stringIntKVStream;
@@ -1171,6 +1211,7 @@ public Optional call(List values, Optional state) {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testReduceByKeyAndWindowWithInverse() {
List>> inputData = stringIntKVStream;
@@ -1194,6 +1235,7 @@ public void testReduceByKeyAndWindowWithInverse() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCountByValueAndWindow() {
List> inputData = Arrays.asList(
@@ -1227,6 +1269,7 @@ public void testCountByValueAndWindow() {
Assert.assertEquals(expected, unorderedResult);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairTransform() {
List>> inputData = Arrays.asList(
@@ -1271,6 +1314,7 @@ public JavaPairRDD call(JavaPairRDD in) thro
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testPairToNormalRDDTransform() {
List>> inputData = Arrays.asList(
@@ -1312,6 +1356,8 @@ public Integer call(Tuple2 in) {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
+ @Test
public void testMapValues() {
List>> inputData = stringStringKVStream;
@@ -1342,6 +1388,7 @@ public String call(String s) throws Exception {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testFlatMapValues() {
List>> inputData = stringStringKVStream;
@@ -1386,6 +1433,7 @@ public Iterable call(String in) {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCoGroup() {
List>> stringStringKVStream1 = Arrays.asList(
@@ -1429,6 +1477,7 @@ public void testCoGroup() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testJoin() {
List>> stringStringKVStream1 = Arrays.asList(
@@ -1472,6 +1521,7 @@ public void testJoin() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testLeftOuterJoin() {
List>> stringStringKVStream1 = Arrays.asList(
@@ -1503,6 +1553,7 @@ public void testLeftOuterJoin() {
Assert.assertEquals(expected, result);
}
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List> inputData = Arrays.asList(
@@ -1541,7 +1592,8 @@ public Integer call(String s) throws Exception {
}
- /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @SuppressWarnings("unchecked")
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
List> inputData = Arrays.asList(
@@ -1581,16 +1633,14 @@ public void testSocketTextStream() {
@Test
public void testSocketString() {
class Converter extends Function> {
- public Iterable call(InputStream in) {
+ public Iterable call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List out = new ArrayList();
- try {
- while (true) {
- String line = reader.readLine();
- if (line == null) { break; }
- out.add(line);
- }
- } catch (IOException e) { }
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
return out;
}
}
diff --git a/yarn/pom.xml b/yarn/pom.xml
index e7eba36ba351..c0e133dd603b 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -133,7 +133,7 @@
true
-
+
@@ -146,7 +146,7 @@
-
+