@@ -1496,45 +1496,45 @@ setMethod("zipRDD",
14961496 stop(" Can only zip RDDs which have the same number of partitions." )
14971497 }
14981498
1499- if (getSerializedMode(x ) != getSerializedMode(other ) ||
1499+ if (getSerializedMode(x ) != getSerializedMode(other ) ||
15001500 getSerializedMode(x ) == " byte" ) {
15011501 # Append the number of elements in each partition to that partition so that we can later
15021502 # check if corresponding partitions of both RDDs have the same number of elements.
15031503 #
1504- # Note that this appending also serves the purpose of reserialization, because even if
1504+ # Note that this appending also serves the purpose of reserialization, because even if
15051505 # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
15061506 # as a single byte array. For example, partitions of an RDD generated from partitionBy()
1507- # may be encoded as multiple byte arrays.
1507+ # may be encoded as multiple byte arrays.
15081508 appendLength <- function (part ) {
15091509 part [[length(part ) + 1 ]] <- length(part ) + 1
15101510 part
15111511 }
15121512 x <- lapplyPartition(x , appendLength )
15131513 other <- lapplyPartition(other , appendLength )
15141514 }
1515-
1515+
15161516 zippedJRDD <- callJMethod(getJRDD(x ), " zip" , getJRDD(other ))
15171517 # The zippedRDD's elements are of scala Tuple2 type. The serialized
15181518 # flag Here is used for the elements inside the tuples.
15191519 serializerMode <- getSerializedMode(x )
15201520 zippedRDD <- RDD(zippedJRDD , serializerMode )
1521-
1521+
15221522 partitionFunc <- function (split , part ) {
15231523 len <- length(part )
15241524 if (len > 0 ) {
15251525 if (serializerMode == " byte" ) {
15261526 lengthOfValues <- part [[len ]]
15271527 lengthOfKeys <- part [[len - lengthOfValues ]]
15281528 stopifnot(len == lengthOfKeys + lengthOfValues )
1529-
1529+
15301530 # check if corresponding partitions of both RDDs have the same number of elements.
15311531 if (lengthOfKeys != lengthOfValues ) {
15321532 stop(" Can only zip RDDs with same number of elements in each pair of corresponding partitions." )
15331533 }
1534-
1534+
15351535 if (lengthOfKeys > 1 ) {
15361536 keys <- part [1 : (lengthOfKeys - 1 )]
1537- values <- part [(lengthOfKeys + 1 ) : (len - 1 )]
1537+ values <- part [(lengthOfKeys + 1 ) : (len - 1 )]
15381538 } else {
15391539 keys <- list ()
15401540 values <- list ()
@@ -1557,7 +1557,7 @@ setMethod("zipRDD",
15571557 part
15581558 }
15591559 }
1560-
1560+
15611561 PipelinedRDD(zippedRDD , partitionFunc )
15621562 })
15631563
@@ -1585,17 +1585,16 @@ setMethod("subtract",
15851585 mapFunction <- function (e ) { list (e , NA ) }
15861586 rdd1 <- map(x , mapFunction )
15871587 rdd2 <- map(other , mapFunction )
1588-
15891588 keys(subtractByKey(rdd1 , rdd2 , numPartitions ))
15901589 })
15911590
15921591# ' Intersection of this RDD and another one.
15931592# '
15941593# ' Return the intersection of this RDD and another one.
1595- # ' The output will not contain any duplicate elements,
1594+ # ' The output will not contain any duplicate elements,
15961595# ' even if the input RDDs did. Performs a hash partition
15971596# ' across the cluster.
1598- # ' Note that this method performs a shuffle internally.
1597+ # ' Note that this method performs a shuffle internally.
15991598# '
16001599# ' @param x An RDD.
16011600# ' @param other An RDD.
@@ -1616,7 +1615,7 @@ setMethod("intersection",
16161615 function (x , other , numPartitions = SparkR :: numPartitions(x )) {
16171616 rdd1 <- map(x , function (v ) { list (v , NA ) })
16181617 rdd2 <- map(other , function (v ) { list (v , NA ) })
1619-
1618+
16201619 filterFunction <- function (elem ) {
16211620 iters <- elem [[2 ]]
16221621 all(as.vector(
0 commit comments