2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag](
}
}

-private object PipedRDD {
+private[spark] object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
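For context (not part of the diff): PipedRDD.tokenize splits the command string on whitespace via java.util.StringTokenizer, so widening its visibility to private[spark] lets the SQL execution code reuse it. A minimal sketch of the assumed behavior:

    // Assumed behavior of PipedRDD.tokenize (plain whitespace split, no shell-style quoting):
    PipedRDD.tokenize("wc -l")        // Seq("wc", "-l")
    PipedRDD.tokenize("grep 'a b'")   // Seq("grep", "'a", "b'") -- quotes are not interpreted

Because quoting is not handled, the new Dataset.pipe API is best suited to simple commands without quoted arguments.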
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
@@ -585,3 +585,29 @@ case class CoGroup(
outputObjAttr: Attribute,
left: LogicalPlan,
right: LogicalPlan) extends BinaryNode with ObjectProducer

object PipeElements {
  def apply[T : Encoder](
      command: String,
      child: LogicalPlan): LogicalPlan = {
    val deserialized = CatalystSerde.deserialize[T](child)
    implicit val encoder = Encoders.STRING
    val piped = PipeElements(
      implicitly[Encoder[T]].clsTag.runtimeClass,
      implicitly[Encoder[T]].schema,
      CatalystSerde.generateObjAttr[String],
      command,
      deserialized)
    CatalystSerde.serialize[String](piped)
  }
}

/**
* A relation produced by piping elements to a forked external process.
*/
case class PipeElements[T](
    argumentClass: Class[_],
    argumentSchema: StructType,
    outputObjAttr: Attribute,
    command: String,
    child: LogicalPlan) extends ObjectConsumer with ObjectProducer
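To illustrate how the factory method composes the plan (an illustration, not part of the diff): PipeElements.apply deserializes the child's rows into objects of T, pipes them, and serializes the resulting strings back into rows. For a Dataset[Long] and the command "cat", the resolved plan would look roughly like:

    // Rough plan shape produced by PipeElements[Long]("cat", child) -- illustrative only
    SerializeFromObject            // from CatalystSerde.serialize[String]
    +- PipeElements("cat")         // runs the external process once per partition
       +- DeserializeToObject      // from CatalystSerde.deserialize[Long]
          +- child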
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2889,6 +2889,24 @@ class Dataset[T] private[sql](
flatMap(func)(encoder)
}

/**
* Returns a new Dataset of strings created by piping elements to a forked external process.
* The resulting Dataset is computed by executing the given process once per partition.
* All elements of each input partition are written to the process's stdin as lines of input
* separated by newlines. The resulting partition consists of the process's stdout output, with
* each line of stdout becoming one element of the output partition. A process is invoked
* even for empty partitions.
*
* @param command the command to run in the forked process.
*
* @group typedrel
* @since 3.2.0
*/
def pipe(command: String): Dataset[String] = {
Review comment (Member):

An open question: should we expose other params in the API:

    def pipe(
        command: Seq[String],
        env: Map[String, String] = Map(),
        printPipeContext: (String => Unit) => Unit = null,
        printRDDElement: (T, String => Unit) => Unit = null,
        separateWorkingDir: Boolean = false,
        bufferSize: Int = 8192,
        encoding: String = Codec.defaultCharsetCodec.name): RDD[String]

I believe pipe(command: String) will be the most common API, but I'm not sure how many scenarios need the other params (the environment variables seem useful).

Reply (@viirya, Member, Author, Jan 23, 2021):

Yeah, for now I think a simple command parameter covers the common case and should be enough. If there is a need for other parameters, we can add them later.

  implicit val stringEncoder = Encoders.STRING
  withTypedPlan[String](PipeElements[T](command, logicalPlan))
}
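A minimal usage sketch of the new API (assuming a local SparkSession named spark and that cat/wc are available on the executors; not taken from the PR itself):

    import org.apache.spark.sql.Dataset

    // Each element is written to the process's stdin as one line; each stdout line
    // becomes one element of the resulting Dataset[String].
    val echoed: Dataset[String] = spark.range(4).pipe("cat")
    echoed.show()   // "0", "1", "2", "3"

    // The process runs once per partition, so "wc -l" yields one count per partition.
    val counts: Dataset[String] = spark.range(4).pipe("wc -l")
    counts.show()

Regarding the open question above about extra parameters such as environment variables: until they are exposed here, one possible workaround (a sketch, assuming dropping to the RDD API is acceptable; MY_VAR is a hypothetical variable name) is to use RDD.pipe directly, which already accepts them:

    import spark.implicits._

    val withEnv = spark.range(4)
      .map(_.toString)
      .rdd
      .pipe(command = Seq("cat"), env = Map("MY_VAR" -> "1"))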

/**
* Applies a function `f` to all rows.
*
@@ -666,6 +666,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.PipeElements(_, _, objAttr, command, child) =>
execution.PipeElementsExec(objAttr, command, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
@@ -25,7 +25,7 @@ import scala.language.existentials
import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.api.r._
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PipedRDD, RDD}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.r.SQLUtils._
import org.apache.spark.sql.catalyst.InternalRow
@@ -624,3 +624,32 @@ case class CoGroupExec(
}
}
}

/**
* Piping elements to a forked external process.
* The output of its child must be a single-field row containing the input object.
*/
case class PipeElementsExec(
    outputObjAttr: Attribute,
    command: String,
    child: SparkPlan)
  extends ObjectConsumerExec with ObjectProducerExec {

  override protected def doExecute(): RDD[InternalRow] = {
    val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
    val printRDDElement: (InternalRow, String => Unit) => Unit = (row, printFunc) => {
      printFunc(getObject(row).toString)
    }

    child.execute()
      .pipe(command = PipedRDD.tokenize(command), printRDDElement = printRDDElement)
      .mapPartitionsInternal { iter =>
        val outputObject = ObjectOperator.wrapObjectToRow(outputObjectType)
        iter.map(ele => outputObject(ele))
Review comment (Member):

ele -> e?

Reply (@viirya, Member, Author, Jan 22, 2021):

Changed the variable name.
      }
  }

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  override def outputPartitioning: Partitioning = child.outputPartitioning
}
24 changes: 23 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.Assertions._
import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.TableDrivenPropertyChecks._

-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext, TestUtils}
import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
@@ -2007,6 +2007,28 @@ class DatasetSuite extends QueryTest

checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
}

test("SPARK-34205: Pipe Dataset") {
assume(TestUtils.testCommandAvailable("cat"))

val nums = spark.range(4)
val piped = nums.pipe("cat").toDF

checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil)

val piped2 = nums.pipe("wc -l").toDF.collect()
assert(piped2.size == 2)
assert(piped2(0).getString(0).trim == "2")
assert(piped2(1).getString(0).trim == "2")
}

test("SPARK-34205: pipe Dataset with empty partition") {
Review comment (Member):

Thank you for making sure of this!

  val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
  val piped = data.pipe("wc -l")
  assert(piped.count == 8)
  val lineCounts = piped.map(_.trim.toInt).collect().toSet
  assert(Set(0, 1, 1) == lineCounts)
}
}

case class Bar(a: Int)
@@ -1264,6 +1264,20 @@ class StreamSuite extends StreamTest {
}
}
}

test("SPARK-34205: Pipe Streaming Dataset") {
  assume(TestUtils.testCommandAvailable("cat"))

  val inputData = MemoryStream[Int]
  val piped = inputData.toDS()
    .pipe("cat").toDF

  testStream(piped)(
    AddData(inputData, 1, 2, 3),
    CheckAnswer(Row("1"), Row("2"), Row("3")),
    AddData(inputData, 4),
    CheckAnswer(Row("1"), Row("2"), Row("3"), Row("4")))
Review comment (Contributor):

I'd rather check CheckNewAnswer(Row("4")) to ensure inputs from the previous batch do not carry over into the next batch's answer.

Reply (Member, Author):

Modified. Thanks.
}
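For reference, a sketch of how the assertions might read after the suggested change (assuming the same MemoryStream setup; the exact committed code may differ):

    testStream(piped)(
      AddData(inputData, 1, 2, 3),
      CheckAnswer(Row("1"), Row("2"), Row("3")),
      AddData(inputData, 4),
      CheckNewAnswer(Row("4")))   // verifies only the rows produced by the latest batch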
}

abstract class FakeSource extends StreamSourceProvider {