@@ -1275,6 +1275,43 @@ setMethod("aggregateRDD",
12751275 Reduce(combOp , partitionList , zeroValue )
12761276 })
12771277
#' Pipes elements to a forked external process.
#'
#' The same as 'pipe()' in Spark.
#'
#' @param rdd The RDD whose elements are piped to the forked external process.
#' @param command The command to fork an external process.
#' @param env A named list to set environment variables of the external process.
#' @return A new RDD created by piping all elements to a forked external process.
#' @rdname pipeRDD
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' collect(pipeRDD(rdd, "more"))
#' # Output: c("1", "2", ..., "10")
#'}
setGeneric("pipeRDD", function(rdd, command, env = list()) {
  # The name given to standardGeneric() must match the generic's own name
  # exactly, otherwise dispatch fails at call time.
  standardGeneric("pipeRDD")
})
1298+
#' @rdname pipeRDD
#' @aliases pipeRDD,RDD,character-method
setMethod("pipeRDD",
          signature(rdd = "RDD", command = "character"),
          function(rdd, command, env = list()) {
            # Applied to every partition: feeds the partition's elements to
            # the external command on stdin and returns its stdout lines.
            func <- function(part) {
              # Render an element as a single string and strip any trailing
              # carriage returns / newlines.
              trimTrailing <- function(x) {
                sub("[\r\n]*$", "", toString(x))
              }

              # system2() expects `env` as a character vector of "name=value"
              # strings. A named list passed directly loses its names and the
              # bare values end up prepended to the command line, so named
              # entries are rewritten as "name=value" here. Unnamed entries
              # (e.g. strings that are already "name=value") pass through.
              # NOTE: values are not shell-quoted; callers must quote values
              # containing spaces or shell metacharacters themselves.
              envVars <- vapply(env, toString, character(1))
              keys <- names(env)
              if (!is.null(keys)) {
                named <- !is.na(keys) & nzchar(keys)
                envVars[named] <- paste0(keys[named], "=", envVars[named])
              }
              envVars <- unname(envVars)

              input <- unlist(lapply(part, trimTrailing))
              res <- system2(command, stdout = TRUE, input = input,
                             env = envVars)
              lapply(res, trimTrailing)
            }
            lapplyPartition(rdd, func)
          })
1314+
12781315# TODO: Consider caching the name in the RDD's environment
12791316# ' Return an RDD's name.
12801317# '
0 commit comments