
Commit e11a0da

Add more API functions
1 parent 6a030a9 commit e11a0da

1 file changed

Lines changed: 100 additions & 37 deletions


docs/programming-guide.md

@@ -674,7 +674,7 @@ by a key.
674674
In Scala, these operations are automatically available on RDDs containing
675675
[Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) objects
676676
(the built-in tuples in the language, created by simply writing `(a, b)`), as long as you
677-
`import org.apache.spark.SparkContext._` in your program to enable Spark's implicit
677+
import `org.apache.spark.SparkContext._` in your program to enable Spark's implicit
678678
conversions. The key-value pair operations are available in the
679679
[PairRDDFunctions](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) class,
680680
which automatically wraps around an RDD of tuples if you import the conversions.
@@ -688,7 +688,7 @@ val pairs = lines.map(s => (s, 1))
688688
val counts = pairs.reduceByKey((a, b) => a + b)
689689
{% endhighlight %}
690690

691-
We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally
691+
We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
692692
`counts.collect()` to bring them back to the driver program as an array of objects.
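
Putting these pieces together, a minimal sketch of the whole pipeline might look as follows (assuming an existing `SparkContext` named `sc` and a `data.txt` input file, as in the example above):

{% highlight scala %}
import org.apache.spark.SparkContext._  // enables the implicit pair RDD conversions

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

// Sort the (line, count) pairs alphabetically by key and bring them back to the driver
val sorted: Array[(String, Int)] = counts.sortByKey().collect()
sorted.foreach(println)
{% endhighlight %}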
693693

694694
</div>
@@ -716,11 +716,11 @@ many times each line of text occurs in a file:
716716

717717
{% highlight java %}
718718
JavaRDD<String> lines = sc.textFile("data.txt");
719-
JavaPairRDD<String, Integer> pairs = lines.map(s -> new Tuple2(s, 1));
719+
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
720720
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
721721
{% endhighlight %}
722722

723-
We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally
723+
We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
724724
`counts.collect()` to bring them back to the driver program as an array of objects.
725725

726726

@@ -745,8 +745,8 @@ pairs = lines.map(lambda s: (s, 1))
745745
counts = pairs.reduceByKey(lambda a, b: a + b)
746746
{% endhighlight %}
747747

748-
We could also use `counts.sortByKey()`, for example, to sort the pairs by word, and finally
749-
`counts.collect()` to bring them back to the driver program as an array of objects.
748+
We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
749+
`counts.collect()` to bring them back to the driver program as a list of objects.
750750

751751
</div>
752752

@@ -755,7 +755,15 @@ We could also use `counts.sortByKey()`, for example, to sort the pairs by word,
755755

756756
### Transformations
757757

758-
The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD) for details):
758+
The following table lists some of the common transformations supported by Spark. Refer to the
759+
RDD API doc
760+
([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),
761+
[Java](api/java/org/apache/spark/api/java/JavaRDD.html),
762+
[Python](api/python/pyspark.rdd.RDD-class.html))
763+
and pair RDD functions doc
764+
([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),
765+
[Java](api/java/org/apache/spark/api/java/JavaPairRDD.html))
766+
for details.
759767

760768
<table class="table">
761769
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
@@ -774,12 +782,12 @@ The following tables list the transformations and actions currently supported (s
774782
<tr>
775783
<td> <b>mapPartitions</b>(<i>func</i>) </td>
776784
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
777-
Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
785+
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
778786
</tr>
779787
<tr>
780788
<td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
781789
<td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
782-
the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
790+
the partition, so <i>func</i> must be of type (Int, Iterator&lt;T&gt;) => Iterator&lt;U&gt; when running on an RDD of type T.
783791
</td>
784792
</tr>
785793
<tr>
@@ -790,18 +798,23 @@ The following tables list the transformations and actions currently supported (s
790798
<td> <b>union</b>(<i>otherDataset</i>) </td>
791799
<td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td>
792800
</tr>
801+
<tr>
802+
<td> <b>intersection</b>(<i>otherDataset</i>) </td>
803+
<td> Return a new RDD that contains the intersection of elements in the source dataset and the argument. </td>
804+
</tr>
793805
<tr>
794806
<td> <b>distinct</b>([<i>numTasks</i>])) </td>
795807
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
796808
</tr>
797809
<tr>
798810
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
799-
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br />
800-
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
801-
average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
802-
performance.
803-
<br />
804-
<b>Note:</b> By default, if the RDD already has a partitioner, the task number is decided by the partition number of the partitioner, or else relies on the value of <code>spark.default.parallelism</code> if the property is set , otherwise depends on the partition number of the RDD. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
811+
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
812+
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
813+
average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
814+
performance.
815+
<br />
816+
<b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD.
817+
You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
805818
</td>
806819
</tr>
807820
<tr>
@@ -814,22 +827,47 @@ The following tables list the transformations and actions currently supported (s
814827
</tr>
815828
<tr>
816829
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
817-
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. </td>
830+
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
831+
Outer joins are also supported through <code>leftOuterJoin</code> and <code>rightOuterJoin</code>.
832+
</td>
818833
</tr>
819834
<tr>
820835
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
821-
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called <code>groupWith</code>. </td>
836+
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable&lt;V&gt;, Iterable&lt;W&gt;) tuples. This operation is also called <code>groupWith</code>. </td>
822837
</tr>
823838
<tr>
824839
<td> <b>cartesian</b>(<i>otherDataset</i>) </td>
825840
<td> When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). </td>
826841
</tr>
842+
<tr>
843+
<td> <b>pipe</b>(<i>command</i>, <i>[envVars]</i>) </td>
844+
<td> Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the
845+
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
846+
</tr>
847+
<tr>
848+
<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
849+
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
850+
after filtering down a large dataset. </td>
851+
</tr>
852+
<tr>
853+
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
854+
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
855+
This always shuffles all data over the network. </td>
856+
</tr>
827857
</table>
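
As an illustrative sketch of a few of the transformations above (assuming an existing `SparkContext` named `sc`; the data and partition counts are made up):

{% highlight scala %}
// Assumes an existing SparkContext `sc`; data and partition counts are illustrative
val a = sc.parallelize(1 to 1000000, 100)
val b = sc.parallelize(500000 to 1500000, 100)

// Elements that appear in both datasets
val common = a.intersection(b)

// After filtering most elements away, shrink the number of partitions
val small = common.filter(_ % 1000 == 0).coalesce(10)

// Or reshuffle into more partitions, balancing data across them (always shuffles)
val rebalanced = small.repartition(20)
{% endhighlight %}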
828858

829-
A complete list of transformations is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD).
830-
831859
### Actions
832860

861+
The following table lists some of the common actions supported by Spark. Refer to the
862+
RDD API doc
863+
([Scala](api/scala/index.html#org.apache.spark.rdd.RDD),
864+
[Java](api/java/org/apache/spark/api/java/JavaRDD.html),
865+
[Python](api/python/pyspark.rdd.RDD-class.html))
866+
and pair RDD functions doc
867+
([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions),
868+
[Java](api/java/org/apache/spark/api/java/JavaPairRDD.html))
869+
for details.
870+
833871
<table class="table">
834872
<tr><th>Action</th><th>Meaning</th></tr>
835873
<tr>
@@ -856,49 +894,57 @@ A complete list of transformations is available in the [RDD API doc](api/scala/i
856894
<td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, <i>seed</i>) </td>
857895
<td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, using the given random number generator seed. </td>
858896
</tr>
897+
<tr>
898+
<td> <b>takeOrdered</b>(<i>n</i>, <i>[ordering]</i>) </td>
899+
<td> Return the first <i>n</i> elements of the RDD using either their natural order or a custom comparator. </td>
900+
</tr>
859901
<tr>
860902
<td> <b>saveAsTextFile</b>(<i>path</i>) </td>
861903
<td> Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. </td>
862904
</tr>
863905
<tr>
864-
<td> <b>saveAsSequenceFile</b>(<i>path</i>) </td>
865-
<td> Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). </td>
906+
<td> <b>saveAsSequenceFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
907+
<td> Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also
908+
available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). </td>
909+
</tr>
910+
<tr>
911+
<td> <b>saveAsObjectFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
912+
<td> Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using
913+
<code>SparkContext.objectFile()</code>. </td>
866914
</tr>
867915
<tr>
868916
<td> <b>countByKey</b>() </td>
869-
<td> Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key. </td>
917+
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
870918
</tr>
871919
<tr>
872920
<td> <b>foreach</b>(<i>func</i>) </td>
873921
<td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. </td>
874922
</tr>
875923
</table>
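
For example, a short sketch of some of these actions (assuming an existing `SparkContext` named `sc`; the data and the output path are illustrative):

{% highlight scala %}
import org.apache.spark.SparkContext._  // enables pair RDD functions such as countByKey

// Assumes an existing SparkContext `sc`; data and paths are illustrative
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3), ("a", 4)))

// First two elements by the natural ordering of (String, Int) tuples
val firstTwo = pairs.takeOrdered(2)

// Number of elements per key, returned to the driver
val perKey = pairs.countByKey()

// Round-trip through Java serialization (Java and Scala only)
pairs.saveAsObjectFile("/tmp/pairs")
val reloaded = sc.objectFile[(String, Int)]("/tmp/pairs")
{% endhighlight %}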
876924

877-
A complete list of actions is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD).
878-
879925
## RDD Persistence
880926

881927
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
882-
across operations. When you persist an RDD, each node stores any slices of it that it computes in
928+
across operations. When you persist an RDD, each node stores any partitions of it that it computes in
883929
memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
884-
future actions to be much faster (often by more than 10x). Caching is a key tool for building
885-
iterative algorithms with Spark and for interactive use from the interpreter.
930+
future actions to be much faster (often by more than 10x). Caching is a key tool for
931+
iterative algorithms and fast interactive use.
886932

887933
You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
888-
it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant --
934+
it is computed in an action, it will be kept in memory on the nodes. Spark's cache is fault-tolerant --
889935
if any partition of an RDD is lost, it will automatically be recomputed using the transformations
890936
that originally created it.
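
For example, a minimal sketch of marking an RDD for reuse (assuming an existing `SparkContext` named `sc`; the input file and follow-up actions are illustrative):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

// Assumes an existing SparkContext `sc`; the input path is illustrative
val parsed = sc.textFile("data.txt").map(_.split(" "))

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
parsed.cache()
// Alternatively, pick a level explicitly, e.g.: parsed.persist(StorageLevel.MEMORY_ONLY_SER)

// The first action computes and caches the partitions; later actions reuse them
val totalLines = parsed.count()
val longLines  = parsed.filter(_.length > 10).count()
{% endhighlight %}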
891937

892938
In addition, each persisted RDD can be stored using a different *storage level*, allowing you, for example,
893939
to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space),
894940
replicate it across nodes, or store it off-heap in [Tachyon](http://tachyon-project.org/).
895-
These levels are chosen by passing a
941+
These levels are set by passing a
896942
`StorageLevel` object ([Scala](api/scala/index.html#org.apache.spark.storage.StorageLevel),
897943
[Java](api/java/org/apache/spark/storage/StorageLevel.html),
898944
[Python](api/python/pyspark.storagelevel.StorageLevel-class.html))
899945
to `persist()`. The `cache()` method is a shorthand for using the default storage level,
900-
which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of
901-
available storage levels is:
946+
which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The full set of
947+
storage levels is:
902948

903949
<table class="table">
904950
<tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
@@ -942,9 +988,9 @@ available storage levels is:
942988
</tr>
943989
</table>
944990

945-
**Note:** In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.
991+
**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
946992

947-
Spark also automatically persists intermediate results in shuffle operatons (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` if they plan to re-use an RDD iteratively.
993+
Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
948994

949995
### Which Storage Level to Choose?
950996

@@ -958,7 +1004,7 @@ efficiency. We recommend going through the following process to select one:
9581004
make the objects much more space-efficient, but still reasonably fast to access.
9591005

9601006
* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
961-
a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from
1007+
a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from
9621008
disk.
9631009

9641010
* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve
@@ -972,6 +1018,12 @@ mode has several advantages:
9721018
* It significantly reduces garbage collection costs.
9731019
* Cached data is not lost if individual executors crash.
9741020

1021+
### Removing Data
1022+
1023+
Spark automatically monitors cache usage on each node and drops out old data partitions in a
1024+
least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for
1025+
it to fall out of the cache, use the `RDD.unpersist()` method.
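
A brief sketch (assuming an existing `SparkContext` named `sc`; the data is illustrative):

{% highlight scala %}
// Assumes an existing SparkContext `sc`; the data is illustrative
val cached = sc.parallelize(1 to 100000).cache()
cached.count()       // first action materializes the cache

// Free the cached partitions explicitly instead of waiting for LRU eviction
cached.unpersist()
{% endhighlight %}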
1026+
9751027
# Shared Variables
9761028

9771029
Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
@@ -1044,7 +1096,7 @@ MapReduce) or sums. Spark natively supports accumulators of numeric types, and p
10441096
can add support for new types.
10451097

10461098
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
1047-
running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala / Python).
1099+
running on the cluster can then add to it using the `add` method or the `+=` operator (in Scala and Python).
10481100
However, they cannot read its value.
10491101
Only the driver program can read the accumulator's value, using its `value` method.
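
A minimal sketch of this pattern (assuming an existing `SparkContext` named `sc`):

{% highlight scala %}
// Assumes an existing SparkContext `sc`
val accum = sc.accumulator(0)

// Tasks running on the cluster can only add to the accumulator ...
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

// ... while only the driver program can read its value
println(accum.value)  // 10
{% endhighlight %}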
10501102

@@ -1200,10 +1252,21 @@ cluster mode. The cluster location will be found based on HADOOP_CONF_DIR.
12001252
# Where to Go from Here
12011253

12021254
You can see some [example Spark programs](http://spark.apache.org/examples.html) on the Spark website.
1203-
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `bin/run-example` script included in Spark; for example:
1255+
In addition, Spark includes several samples in the `examples` directory
1256+
([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
1257+
[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
1258+
[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)).
1259+
Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what was changed to make the program run on a cluster.
1260+
You can run Java and Scala examples by passing the class name to Spark's `bin/run-example` script; for instance:
12041261

12051262
./bin/run-example SparkPi
12061263

1207-
For help on optimizing your program, the [configuration](configuration.html) and
1264+
For Python examples, use `spark-submit` instead:
1265+
1266+
./bin/spark-submit examples/src/main/python/pi.py
1267+
1268+
For help on optimizing your programs, the [configuration](configuration.html) and
12081269
[tuning](tuning.html) guides provide information on best practices. They are especially important for
12091270
making sure that your data is stored in memory in an efficient format.
1271+
For help on deploying, the [cluster mode overview](cluster-overview.html) describes the components involved
1272+
in distributed operation and supported cluster managers.
