diff --git a/bin/spark-class b/bin/spark-class index 2f0441bb3c1c2..d8b85c2fc9fb2 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -170,6 +170,9 @@ export CLASSPATH # the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM # to prepare the launch environment of this driver JVM. +export JAVA_OPTS+=" -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005" +echo $JAVA_OPTS + if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then # This is used only if the properties file actually contains these special configs # Export the environment variables needed by SparkSubmitDriverBootstrapper diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 4ac666c54fbcd..5bf78a82e8e13 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -19,32 +19,30 @@ package org.apache.spark.api.python import java.io._ import java.net._ -import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections} - -import org.apache.spark.input.PortableDataStream - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.language.existentials +import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap} import com.google.common.base.Charsets.UTF_8 - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf} +import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat} import org.apache.spark._ -import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.language.existentials + private[spark] class PythonRDD( @transient parent: RDD[_], command: Array[Byte], envVars: JMap[String, String], pythonIncludes: JList[String], - preservePartitoning: Boolean, + preservePartitioning: Boolean, pythonExec: String, broadcastVars: JList[Broadcast[PythonBroadcast]], accumulator: Accumulator[JList[Array[Byte]]]) @@ -55,9 +53,10 @@ private[spark] class PythonRDD( override def getPartitions = firstParent.partitions - override val partitioner = if (preservePartitoning) firstParent.partitioner else None + override val partitioner = if (preservePartitioning) firstParent.partitioner else None override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { + val startTime = System.currentTimeMillis val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map( diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ab7410a1f7f99..335ee51aba8f8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -44,7 +44,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, 
PoissonSampler, BernoulliCellSampler, - SamplingUtils} +SamplingUtils} /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, @@ -1378,7 +1378,7 @@ abstract class RDD[T: ClassTag]( def toDebugString: String = { // Get a debug description of an rdd without its children def debugSelf (rdd: RDD[_]): Seq[String] = { - import Utils.bytesToString + import org.apache.spark.util.Utils.bytesToString val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else "" val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info => diff --git a/examples/src/main/python/graphx/simpleGraph.py b/examples/src/main/python/graphx/simpleGraph.py new file mode 100644 index 0000000000000..6b403db28f501 --- /dev/null +++ b/examples/src/main/python/graphx/simpleGraph.py @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A simple GraphX example that loads a graph from an edge-list file. +""" + +import sys + +from pyspark import SparkContext +from pyspark.graphx import GraphLoader +from pyspark.graphx import Vertex +from pyspark.graphx import Edge + +if __name__ == "__main__": + + """ + Usage: simpleGraph filename [partitions] + """ + + sc = SparkContext(appName="PythonSimpleGraphExample") + graphFile = sys.argv[1] if len(sys.argv) > 1 else "simplegraph.edges" + partitions = int(sys.argv[2]) if len(sys.argv) > 2 else 2 + + print "Running SimpleGraph example with filename=%s partitions=%d\n" % (graphFile, partitions) + + graph = GraphLoader.edgeListFile(sc, graphFile, partitions) + vertices = graph.vertices() + edges = graph.edges() + + + + + diff --git a/graphx/pom.xml b/graphx/pom.xml index 72374aae6da9b..1c5ba0db5b617 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -50,6 +50,12 @@ scalacheck_${scala.binary.version} test + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> target/scala-${scala.binary.version}/classes diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 7372dfbd9fe98..2c1c5022f3202 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -85,7 +85,7 @@ object PartitionStrategy { } /** - * Assigns edges to partitions using only the source vertex ID, colocating edges with the same + * Assigns edges to partitions using only the source vertex ID, collocating edges with the same * source. */ case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { @@ -98,7 +98,7 @@ object PartitionStrategy { /** * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a - * random vertex cut that colocates all same-direction edges between two vertices.
+ * random vertex cut that collocates all same-direction edges between two vertices. */ case object RandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDD.scala new file mode 100644 index 0000000000000..443aa7109bb85 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDD.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.graphx.api.java + +import java.lang.{Long => JLong} +import java.util.{List => JList} + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.graphx._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +/** + * EdgeRDD['ED'] is a column-oriented edge partition RDD created from RDD[Edge[ED]]. + * JavaEdgeRDD class provides a Java API to access implementations of the EdgeRDD class + * + * @param targetStorageLevel + * @tparam ED + */ +class JavaEdgeRDD[ED]( + val edges: RDD[Edge[ED]], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + (implicit val classTag: ClassTag[ED]) + extends JavaEdgeRDDLike[ED, JavaEdgeRDD[ED], JavaRDD[(VertexId, VertexId, ED)]] { + +// /** +// * To create JavaEdgeRDD from JavaRDDs of tuples +// * (source vertex id, destination vertex id and edge property class). +// * The edge property class can be Array[Byte] +// * @param jEdges +// */ +// def this(jEdges: JavaRDD[(VertexId, VertexId, ED)]) = { +// this(jEdges.rdd.map(x => Edge[ED](x._1, x._2, x._3))) +// } + + /* Convert RDD[(PartitionID, EdgePartition[ED, VD])] to EdgeRDD[ED, VD] */ + override def edgeRDD = EdgeRDD.fromEdges(edges) + + /** + * Java Wrapper for RDD of Edges + * + * @param edgeRDD + * @return + */ + def wrapRDD(edgeRDD: RDD[Edge[ED]]): JavaRDD[Edge[ED]] = { + JavaRDD.fromRDD(edgeRDD) + } + + /** Persist RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY_SER) */ + def cache(): this.type = { + edges.cache() + this + } + + def collect(): JList[Edge[ED]] = { + import scala.collection.JavaConversions._ + val arr: java.util.Collection[Edge[ED]] = edges.collect().toSeq + new java.util.ArrayList(arr) + } + + /** + * Return a new single long element generated by counting all elements in the vertex RDD + */ + override def count(): JLong = edges.count() + + /** Return a new VertexRDD containing only the elements that satisfy a predicate. 
*/ + def filter(f: JFunction[Edge[ED], Boolean]): JavaEdgeRDD[ED] = + JavaEdgeRDD(edgeRDD.filter(x => f.call(x).booleanValue())) + + def id: JLong = edges.id.toLong + + /** Persist RDDs of this JavaEdgeRDD with the default storage level (MEMORY_ONLY_SER) */ + def persist(): this.type = { + edges.persist() + this + } + + /** Persist the RDDs of this EdgeRDD with the given storage level */ + def persist(storageLevel: StorageLevel): this.type = { + edges.persist(storageLevel) + this + } + + def unpersist(blocking: Boolean = true) : this.type = { + edgeRDD.unpersist(blocking) + this + } + + override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2] = { + JavaEdgeRDD(edgeRDD.mapValues(f)) + } + + override def reverse: JavaEdgeRDD[ED] = JavaEdgeRDD(edgeRDD.reverse) + + def innerJoin[ED2: ClassTag, ED3: ClassTag] + (other: EdgeRDD[ED2]) + (f: (VertexId, VertexId, ED, ED2) => ED3): JavaEdgeRDD[ED3] = { + JavaEdgeRDD(edgeRDD.innerJoin(other)(f)) + } + + def toRDD : RDD[Edge[ED]] = edges +} + +object JavaEdgeRDD { + + implicit def apply[ED: ClassTag](edges: JavaRDD[Edge[ED]]) : JavaEdgeRDD[ED] = { + JavaEdgeRDD(EdgeRDD.fromEdges(edges.rdd)) + } + + def toEdgeRDD[ED: ClassTag](edges: JavaEdgeRDD[ED]): RDD[Edge[ED]] = { + JavaEdgeRDD(edges.edgeRDD).toRDD + } + +// def apply[ED: ClassTag]( +// jEdges: JavaRDD[(VertexId, VertexId, ED)]): JavaEdgeRDD[ED] = { +// val edges : RDD[Edge[ED]] = jEdges.rdd.map(x => Edge(x._1, x._2, x._3)) +// new JavaEdgeRDD(edges) +// } +} + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDDLike.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDDLike.scala new file mode 100644 index 0000000000000..1148e28e77187 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaEdgeRDDLike.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.graphx.api.java + +import java.lang.{Long => JLong} +import java.util.{List => JList} + +import org.apache.spark.api.java.JavaRDDLike +import org.apache.spark.graphx._ +import org.apache.spark.{Partition, TaskContext} + +import scala.reflect.ClassTag + +trait JavaEdgeRDDLike [ED, This <: JavaEdgeRDDLike[ED, This, R], +R <: JavaRDDLike[(VertexId, VertexId, ED), R]] + extends Serializable { + + def edgeRDD: EdgeRDD[ED] + + def setName() = edgeRDD.setName("JavaEdgeRDD") + + def count() : JLong = edgeRDD.count() + + def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { + edgeRDD.compute(part, context) + } + + def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): JavaEdgeRDD[ED2] + + def reverse: JavaEdgeRDD[ED] +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaGraph.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaGraph.scala new file mode 100644 index 0000000000000..ca07f7d10ae04 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaGraph.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.graphx.api.java + +import java.lang.{Double => JDouble, Long => JLong} + +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib.PageRank +import org.apache.spark.rdd.RDD + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +class JavaGraph[@specialized VD: ClassTag, @specialized ED: ClassTag] + (vertexRDD : VertexRDD[VD], edgeRDD: EdgeRDD[ED]) { + + def vertices: JavaVertexRDD[VD] = JavaVertexRDD(vertexRDD) + def edges: JavaEdgeRDD[ED] = JavaEdgeRDD(edgeRDD) + @transient lazy val graph : Graph[VD, ED] = Graph(vertexRDD, edgeRDD) + + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): JavaGraph[VD, ED] = { + val graph = Graph(vertexRDD, edgeRDD) + JavaGraph(graph.partitionBy(partitionStrategy, numPartitions)) + } + + /** The number of edges in the graph. */ + def numEdges: JLong = edges.count() + + /** The number of vertices in the graph. 
*/ + def numVertices: JLong = vertices.count() + + def inDegrees: JavaVertexRDD[Int] = JavaVertexRDD[Int](graph.inDegrees) + + def outDegrees: JavaVertexRDD[Int] = JavaVertexRDD[Int](graph.outDegrees) + + def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) : JavaGraph[VD2, ED] = { + JavaGraph(graph.mapVertices(map)) + } + + def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): JavaGraph[VD, ED2] = { + JavaGraph(graph.mapEdges(map)) + } + + def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): JavaGraph[VD, ED2] = { + JavaGraph(graph.mapTriplets(map)) + } + + def reverse : JavaGraph[VD, ED] = JavaGraph(graph.reverse) + + def subgraph( + epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : JavaGraph[VD, ED] = { + JavaGraph(graph.subgraph(epred, vpred)) + } + + def groupEdges(merge: (ED, ED) => ED): JavaGraph[VD, ED] = { + JavaGraph(graph.groupEdges(merge)) + } + + @deprecated("use aggregateMessages", "1.2.0") + def mapReduceTriplets[A: ClassTag]( + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) : JavaVertexRDD[A] = { + JavaVertexRDD(graph.mapReduceTriplets(mapFunc, reduceFunc, activeSetOpt)) + } + + def aggregateMessages[A: ClassTag]( + sendMsg: EdgeContext[VD, ED, A] => Unit, + mergeMsg: (A, A) => A, + tripletFields: TripletFields = TripletFields.All) : JavaVertexRDD[A] = { + JavaVertexRDD(graph.aggregateMessages(sendMsg, mergeMsg, tripletFields)) + } + + def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)]) + (mapFunc: (VertexId, VD, Option[U]) => VD2) : JavaGraph[VD2, ED] = { + JavaGraph(graph.outerJoinVertices(other)(mapFunc)) + } + + def pagerank(tol: Double, resetProb: Double = 0.15) : JavaGraph[Double, Double] = + JavaGraph(PageRank.runUntilConvergence(graph, tol, resetProb)) +} + +object JavaGraph { + +// implicit def apply[VD: ClassTag, ED: ClassTag] +// (vertexRDD: RDD[(VertexId, VD)], edges: RDD[Edge[ED]]): JavaGraph[VD, ED] = { +// new JavaGraph[VD, ED](VertexRDD(vertexRDD), EdgeRDD.fromEdges(edges)) +// } + + implicit def apply[VD: ClassTag, ED: ClassTag] + (graph: Graph[VD, ED]): JavaGraph[VD, ED] = { + new JavaGraph[VD, ED](graph.vertices, EdgeRDD.fromEdges[ED, VD](graph.edges)) + } + + implicit def apply [VD: ClassTag, ED: ClassTag] + (vertices: JavaVertexRDD[VD], edges: JavaEdgeRDD[ED]): JavaGraph[VD, ED] = { + new JavaGraph(VertexRDD(vertices.toRDD), edges.edgeRDD) + } +} + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDD.scala new file mode 100644 index 0000000000000..0a5debdb175bd --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDD.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.graphx.api.java + +import java.lang.{Long => JLong} +import java.util.{List => JList} + +import org.apache.spark.{TaskContext, Partition} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.graphx.{VertexId, VertexRDD} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +import scala.language.implicitConversions +import scala.reflect._ + +/** + * A Java-friendly interface to [[org.apache.spark.graphx.VertexRDD]], the vertex + * RDD abstraction in Spark GraphX that represents a vertex class in a graph. + * Vertices can be created from existing RDDs or it can be generated from transforming + * existing VertexRDDs using operations such as `mapValues`, `pagerank`, etc. + * For operations applicable to vertices in a graph in GraphX, please refer to + * [[org.apache.spark.graphx.VertexRDD]] + */ + +class JavaVertexRDD[VD]( + val vertices: RDD[(VertexId, VD)], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + (implicit val classTag: ClassTag[VD]) + extends JavaVertexRDDLike[VD, JavaVertexRDD[VD], JavaRDD[(VertexId, VD)]] { + + override def vertexRDD = VertexRDD(vertices) + + override def wrapRDD(rdd: RDD[(VertexId, VD)]): JavaRDD[(VertexId, VD)] = { + JavaRDD.fromRDD(rdd) + } + + /** Convert [[org.apache.spark.api.java.JavaRDD]] to + * [[org.apache.spark.graphx.api.java.JavaVertexRDD]] instance */ + def asJavaVertexRDD = JavaRDD.fromRDD(this.vertexRDD) + + /** Persist RDDs of this VertexRDD with the default storage level (MEMORY_ONLY_SER) */ + def cache(): this.type = { + vertices.cache() + this + } + + def collect(): JList[(VertexId, VD)] = { + import scala.collection.JavaConversions._ + val arr: java.util.Collection[(VertexId, VD)] = vertices.collect().toSeq + new java.util.ArrayList(arr) + } + + /** Generate a VertexRDD for the given duration */ + override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = + vertexRDD.compute(part, context) + + /** + * Return a new single long element generated by counting all elements in the vertex RDD + */ + def count(): JLong = vertices.count() + + /** Return a new VertexRDD containing only the elements that satisfy a predicate. 
*/ + def filter(f: JFunction[(VertexId, VD), Boolean]): JavaVertexRDD[VD] = + JavaVertexRDD(vertexRDD.filter(x => f.call(x).booleanValue())) + + def id: JLong = vertices.id.toLong + + def innerJoin[U: ClassTag, VD2: ClassTag](other: JavaVertexRDD[U]): JavaVertexRDD[VD2] = { + def attribute_combiner(vid: VertexId, vd: VD, u: U): VD2 = { + (vd, u).asInstanceOf[VD2] + } + val t = vertexRDD.innerJoin(other.vertexRDD)(attribute_combiner) + JavaVertexRDD[VD2](t).asJavaVertexRDD + } + + /** Persist RDDs of this JavaVertexRDD with the default storage level (MEMORY_ONLY_SER) */ + def persist(): this.type = { + vertices.persist() + this + } + + /** Persist the RDDs of this VertexRDD with the given storage level */ + def persist(storageLevel: StorageLevel): this.type = { + vertices.persist(storageLevel) + this + } + + def toRDD : RDD[(VertexId, VD)] = vertices + +} + +object JavaVertexRDD { + + /** + * Convert a scala [[org.apache.spark.graphx.VertexRDD]] to a Java-friendly + * [[org.apache.spark.graphx.api.java.JavaVertexRDD]]. + */ + implicit def fromVertexRDD[VD: ClassTag](vertexRDD: VertexRDD[VD]): JavaVertexRDD[VD] = + new JavaVertexRDD[VD](vertexRDD) + + implicit def apply[VD: ClassTag](vertices: JavaRDD[(VertexId, VD)]): JavaVertexRDD[VD] = { + new JavaVertexRDD[VD](vertices) + } +} + + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDDLike.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDDLike.scala new file mode 100644 index 0000000000000..29f61cb295cce --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/java/JavaVertexRDDLike.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.graphx.api.java + +import java.util.{List => JList} + +import org.apache.spark.api.java.JavaRDDLike +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3} +import org.apache.spark.graphx._ +import org.apache.spark.graphx.impl.{EdgeRDDImpl, ShippableVertexPartition} +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, Partition, TaskContext} + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +trait JavaVertexRDDLike[VD, This <: JavaVertexRDDLike[VD, This, R], + R <: JavaRDDLike[(VertexId, VD), R]] + extends Serializable with Logging { + + implicit val classTag: ClassTag[VD] + + // The type of the RDD is (VertexId, VD) + def vertexRDD: VertexRDD[VD] + + def wrapRDD(in: RDD[(VertexId, VD)]): R + + override def toString: String = vertexRDD.toDebugString + + /** + * Return an array of the first num values + * + * @param num + * @return + */ + def take(num: Int) : Array[(VertexId, VD)] = vertexRDD.take(num) + + def setName(name: String) = vertexRDD.setName(name) + + def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = { + vertexRDD.compute(part, context) + } + + /** + * To construct a new Java interface of VertexRDD that is indexed by only the visible vertices. + * The resulting vertex RDD will be based on a different index and can no longer be quickly + * joined with this RDD. + */ + def reindex() : JavaVertexRDD[VD] = JavaVertexRDD(vertexRDD.reindex()) + + /** + * Applies a function to each `VertexPartition` of this RDD and returns a new + * [[org.apache.spark.graphx.api.java.JavaVertexRDD]] + */ + def mapVertexPartitions[VD2: ClassTag]( + f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) : JavaVertexRDD[VD2] = { + JavaVertexRDD(vertexRDD.mapVertexPartitions(f)) + } + + def mapValues[VD2: ClassTag](f: VD => VD2): JavaVertexRDD[VD2] = { + JavaVertexRDD(vertexRDD.mapValues(f)) + } + + /** Hides vertices that are the same between `this` and `other`; for vertices that are different, + * keeps the values from `other`. + */ + def diff(other: VertexRDD[VD]): JavaVertexRDD[VD] = { + JavaVertexRDD(vertexRDD.diff(other)) + } + + /** Takes a [[org.apache.spark.graphx.api.java.JavaVertexRDD]] instead of a + * [[org.apache.spark.graphx.VertexRDD]] as argument. 
+ */ + def diff(other: JavaVertexRDD[VD]): JavaVertexRDD[VD] = { + JavaVertexRDD(vertexRDD.diff(other.vertexRDD)) + } + + def leftZipJoin[VD2: ClassTag, VD3: ClassTag] + (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): JavaVertexRDD[VD3] = { + JavaVertexRDD(vertexRDD.leftZipJoin[VD2, VD3](other)(f)) + } + + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: RDD[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3) + : JavaVertexRDD[VD3] = { + JavaVertexRDD(vertexRDD.leftJoin(other)(f)) + } + + def innerZipJoin[U: ClassTag, VD2: ClassTag] + (other: VertexRDD[U]) + (f: (VertexId, VD, U) => VD2): JavaVertexRDD[VD2] = { + JavaVertexRDD(vertexRDD.innerZipJoin(other)(f)) + } + + def innerJoin[U: ClassTag, VD2: ClassTag] + (other: RDD[(VertexId, U)]) + (f: (VertexId, VD, U) => VD2): JavaVertexRDD[VD2] = { + JavaVertexRDD(vertexRDD.innerJoin(other)(f)) + } + + def aggregateUsingIndex[VD2: ClassTag] + (messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): JavaVertexRDD[VD2] = { + JavaVertexRDD(vertexRDD.aggregateUsingIndex(messages, reduceFunc)) + } + + def fromEdges[ED: ClassTag, VD: ClassTag] + (edges: EdgeRDDImpl[ED, VD], numPartitions: Int, defaultVal: VD): JavaVertexRDD[VD] = { + JavaVertexRDD(VertexRDD.fromEdges[VD] + (EdgeRDD.fromEdges[ED, VD](edges), numPartitions, defaultVal)) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonEdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonEdgeRDD.scala new file mode 100644 index 0000000000000..c353bdfdd8cca --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonEdgeRDD.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.graphx.api.python + +import java.io.{DataOutputStream, FileOutputStream} +import java.util.{List => JList, Map => JMap} + +import org.apache.spark.Accumulator +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.graphx.Edge +import org.apache.spark.graphx.api.java.JavaEdgeRDD +import org.apache.spark.storage.StorageLevel + +private[graphx] class PythonEdgeRDD( + @transient parent: JavaRDD[_], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitioning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[PythonBroadcast]], + accumulator: Accumulator[JList[Array[Byte]]], + targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + extends PythonRDD (parent, command, envVars, + pythonIncludes, preservePartitioning, + pythonExec, broadcastVars, accumulator) { + + def this(@transient parent: JavaEdgeRDD[_], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitioning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[PythonBroadcast]], + accumulator: Accumulator[JList[Array[Byte]]], + targetStorageLevel : StorageLevel) = { + this(parent.toRDD, command, envVars, pythonIncludes, + preservePartitioning, pythonExec, broadcastVars, accumulator, targetStorageLevel) + } + + val asJavaEdgeRDD = { + val jRDD = JavaRDD.fromRDD(this) + JavaEdgeRDD.apply(jRDD.asInstanceOf[JavaRDD[Edge[Array[Byte]]]]) + } + + def writeToFile[T](items: java.util.Iterator[T], filename: String) { + import scala.collection.JavaConverters._ + writeToFile(items.asScala, filename) + } + + def writeToFile[T](items: Iterator[T], filename: String) { + val file = new DataOutputStream(new FileOutputStream(filename)) + writeIteratorToStream(items, file) + file.close() + } + + /** A data stream is written to a given file so that the collect() method + * of class VertexRDD in Python can read it back in the client and + * display the contents of the VertexRDD as a list + */ + def writeIteratorToStream[T](items: Iterator[T], stream: DataOutputStream) = { + if (items.hasNext) { + val first = items.next() + val newIter = Seq(first).iterator ++ items + // Assuming the type of this RDD will always be Array[Byte] + newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes => + stream.writeInt(bytes.length) + stream.write(bytes) + } + } + } +} + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonGraph.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonGraph.scala new file mode 100644 index 0000000000000..23a5ed5f93c8e --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonGraph.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.api.python + +import org.apache.spark.annotation.DeveloperApi + +@DeveloperApi +private[graphx] class PythonGraph ( + @transient val vertexRDD: PythonVertexRDD, + @transient val edgeRDD: PythonEdgeRDD) +// extends Graph[Array[Byte], Array[Byte]] with Serializable { + extends Serializable { + + val vertices = vertexRDD.asJavaVertexRDD + val edges = edgeRDD.asJavaEdgeRDD + val asJavaGraph = (vertices, edges) + +} + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonVertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonVertexRDD.scala new file mode 100644 index 0000000000000..dd0a42ad03100 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/api/python/PythonVertexRDD.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.api.python + +import java.io.{DataOutputStream, FileOutputStream} +import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} + +import org.apache.spark.Accumulator +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.graphx.VertexId +import org.apache.spark.graphx.api.java.JavaVertexRDD +import org.apache.spark.storage.StorageLevel + +private[graphx] class PythonVertexRDD( + @transient parent: JavaRDD[_], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitioning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[PythonBroadcast]], + accumulator: Accumulator[JList[Array[Byte]]], + targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) + extends PythonRDD (parent, command, envVars, + pythonIncludes, preservePartitioning, + pythonExec, broadcastVars, accumulator) { + + def this(@transient parent: JavaVertexRDD[_], + command: Array[Byte], + envVars: JMap[String, String], + pythonIncludes: JList[String], + preservePartitioning: Boolean, + pythonExec: String, + broadcastVars: JList[Broadcast[PythonBroadcast]], + accumulator: Accumulator[JList[Array[Byte]]], + targetStorageLevel : StorageLevel) = { + this(parent.toRDD, command, envVars, pythonIncludes, + preservePartitioning, pythonExec, broadcastVars, accumulator, targetStorageLevel) + } + + val asJavaVertexRDD = { + JavaVertexRDD(JavaRDD.fromRDD(this).asInstanceOf[JavaRDD[(VertexId, Array[Byte])]]) + } + + + def writeToFile[T](items: java.util.Iterator[T], filename: String) { + import scala.collection.JavaConverters._ + writeToFile(items.asScala, filename) + } + + def writeToFile[T](items: 
Iterator[T], filename: String) { + val file = new DataOutputStream(new FileOutputStream(filename)) + writeIteratorToStream(items, file) + file.close() + } + + /** A data stream is written to a given file so that the collect() method + * of class VertexRDD in Python can read it back in the client and + * display the contents of the VertexRDD as a list + */ + def writeIteratorToStream[T](items: Iterator[T], stream: DataOutputStream) = { + if (items.hasNext) { + val first = items.next() + val newIter = Seq(first).iterator ++ items + // Assuming the type of this RDD will always be Array[Byte] + newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes => + stream.writeInt(bytes.length) + stream.write(bytes) + } + } + } + + def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): + JavaRDD[Array[Byte]] = { + PythonRDD.readRDDFromFile(sc, filename, parallelism) + } +} + +object PythonVertexRDD { + val DEFAULT_SPARK_BUFFER_SIZE = 65536 +} + diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala index aa320088f2088..a559ca35a4602 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala @@ -50,7 +50,7 @@ object ShippableVertexPartition { /** * Construct a `ShippableVertexPartition` from the given vertices with the specified routing * table, filling in missing vertices mentioned in the routing table using `defaultVal`, - * and merging duplicate vertex atrribute with mergeFunc. + * and merging duplicate vertex attribute with mergeFunc. */ def apply[VD: ClassTag]( iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD, diff --git a/graphx/src/test/java/org/apache/spark/graphx/JavaAPISuite.java b/graphx/src/test/java/org/apache/spark/graphx/JavaAPISuite.java new file mode 100644 index 0000000000000..36d528ff278b2 --- /dev/null +++ b/graphx/src/test/java/org/apache/spark/graphx/JavaAPISuite.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.graphx; + + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.graphx.api.java.JavaEdgeRDD; +import org.apache.spark.graphx.api.java.JavaVertexRDD; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import scala.Tuple2; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class JavaAPISuite implements Serializable { + + private transient JavaSparkContext ssc; + private List>> myList; + private ClassTag> classTag; + + + @Before + public void initialize() { + this.ssc = new JavaSparkContext("local", "GraphX JavaAPISuite"); + + this.myList = new ArrayList>>(); + this.myList.add(new Tuple2(1L, new VertexProperty("001", "kushal"))); + this.myList.add(new Tuple2(2L, new VertexProperty("002", "xia"))); + this.myList.add(new Tuple2(3L, new VertexProperty("003", "briton"))); + + this.classTag = ClassTag$.MODULE$.apply(VertexProperty.class); + } + + @After + public void finalize() { + ssc.stop(); + ssc = null; + } + + private class VertexProperty implements Serializable { + T1 field1; + T2 field2; + + VertexProperty(T1 field1, T2 field2) { + this.field1 = field1; + this.field2 = field2; + } + + T1 getField1() { return field1; } + T2 getField2() { return field2; } + void setField1(T1 value) { this.field1 = value; } + void setField2(T2 value) { this.field2 = value; } + } + + @Test + public void testVertexRDDCount() { + + JavaRDD>> + javaRDD = ssc.parallelize(this.myList); + + JavaVertexRDD> javaVertexRDD = + JavaVertexRDD.apply(javaRDD, this.classTag); + + assertEquals(3L, javaVertexRDD.count().intValue()); + } + + @Test + public void testEdgeRDDCount() { + + List> edgeList = new ArrayList>(); + edgeList.add(new Edge(0, 1, "abcd")); + edgeList.add(new Edge(1, 2, "defg")); + edgeList.add(new Edge(2, 3, "hijk")); + edgeList.add(new Edge(1, 3, "lmno")); + + JavaRDD> javaRDD = ssc.parallelize(edgeList); + + ClassTag classTag = ClassTag$.MODULE$.apply(String.class); + + JavaEdgeRDD javaEdgeRDD = + JavaEdgeRDD.apply(javaRDD, classTag); + + assertEquals(javaEdgeRDD.count().longValue(), 4L); + } +} diff --git a/graphx/src/test/java/org/apache/spark/graphx/JavaTestUtils.scala b/graphx/src/test/java/org/apache/spark/graphx/JavaTestUtils.scala new file mode 100644 index 0000000000000..88dea1c41a548 --- /dev/null +++ b/graphx/src/test/java/org/apache/spark/graphx/JavaTestUtils.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.graphx + +import org.apache.spark.api.java.JavaSparkContext +import java.util.{List => JList} + + +import org.apache.spark.graphx.api.java.JavaVertexRDD + +import scala.reflect.ClassTag + +object JavaTestUtils { + + def attachVertexRDD[VD]( + ssc: JavaSparkContext, + data: JList[Tuple2[Long, VD]], + numPartitions: Int) = { + + implicit val cm: ClassTag[VD] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[VD]] + + val vertices = ssc.parallelize(data) + new JavaVertexRDD(vertices) + } + +} diff --git a/pom.xml b/pom.xml index 05cb3797fc55b..7c29aecb0d20d 100644 --- a/pom.xml +++ b/pom.xml @@ -86,9 +86,9 @@ + graphx core bagel - graphx mllib tools network/common diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 9556e4718e585..07c88475b109e 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -46,10 +46,12 @@ from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer -# for back compatibility -from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row +from pyspark.graphx.vertex import VertexRDD +from pyspark.graphx.edge import EdgeRDD, Edge +from pyspark.graphx.graph import Graph __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", -] + "VertexRDD", "EdgeRDD", "Edge", "Graph"] + diff --git a/python/pyspark/graphx/__init__.py b/python/pyspark/graphx/__init__.py new file mode 100644 index 0000000000000..b196a4b9777a5 --- /dev/null +++ b/python/pyspark/graphx/__init__.py @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python bindings for GraphX. +""" + +from pyspark.graphx.vertex import VertexRDD, VertexId +from pyspark.graphx.edge import Edge, EdgeRDD +from pyspark.graphx.graph import Graph + +__all__ = ["PartitioningStrategy", "VertexRDD", "EdgeRDD", "Graph", "Edge"] diff --git a/python/pyspark/graphx/edge.py b/python/pyspark/graphx/edge.py new file mode 100644 index 0000000000000..7ea5b4c619a37 --- /dev/null +++ b/python/pyspark/graphx/edge.py @@ -0,0 +1,319 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python bindings for EdgeRDD in GraphX +""" + +import os +import itertools +from tempfile import NamedTemporaryFile +# from build.py4j.java_collections import MapConverter, ListConverter +from py4j.java_collections import ListConverter, MapConverter +from pyspark.accumulators import PStatsParam +from pyspark import RDD, StorageLevel, SparkContext +from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer, \ + NoOpSerializer +from pyspark.traceback_utils import SCCallSiteSync + +__all__ = ["EdgeRDD", "Edge"] + + +class Edge(object): + """ + Edge object contains a source vertex id, target vertex id and edge properties + """ + + def __init__(self, src_id, tgt_id, edge_property): + self._src_id = src_id + self._tgt_id = tgt_id + self._property = edge_property + + @property + def srcId(self): + return self._src_id + + @property + def tgtId(self): + return self._tgt_id + + def asTuple(self): + return (self._src_id, self._tgt_id, self._property) + + def __str__(self): + return self._src_id + self._tgt_id + self._property + + +class EdgeRDD(object): + """ + EdgeRDD class defines the edge actions and transformations. The complete list of + transformations and actions is available at + `http://spark.apache.org/docs/latest/graphx-programming-guide.html` + These operations are mapped to Scala functions defined + in `org.apache.spark.graphx.impl.EdgeRDDImpl` + """ + + def __init__(self, jrdd, jrdd_deserializer = BatchedSerializer(PickleSerializer())): + """ + Constructor + :param jrdd: A JavaRDD reference passed from the parent + RDD object + :param jrdd_deserializer: The deserializer used in Python workers + created from PythonRDD to execute a + serialized Python function and RDD + + """ + + self.name = "EdgeRDD" + self.jrdd = jrdd + self.is_cached = False + self.is_checkpointed = False + self.ctx = SparkContext._active_spark_context + self.jedge_rdd_deserializer = jrdd_deserializer + self.id = jrdd.id() + self.partitionFunc = None + self.bypass_serializer = False + self.preserve_partitioning = False + + self.jedge_rdd = self.getJavaEdgeRDD(jrdd, jrdd_deserializer) + + def __repr__(self): + return self.jedge_rdd.toString() + + def cache(self): + """ + Persist this vertex RDD with the default storage level (C{MEMORY_ONLY_SER}). 
+ """ + self.is_cached = True + self.persist(StorageLevel.MEMORY_ONLY_SER) + return self + + def checkpoint(self): + self.is_checkpointed = True + self.jedge_rdd.checkpoint() + + def count(self): + return self.jedge_rdd.count() + + def isCheckpointed(self): + """ + Return whether this RDD has been checkpointed or not + """ + return self.is_checkpointed + + def mapValues(self, f, preserves_partitioning=False): + """ + Return a new edge RDD by applying a function to each edge attribute, + preserving the index + + >>> rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c")]) + >>> edges = EdgeRDD(rdd) + >>> sorted(edges.mapValues(lambda x: (x + ":" + x)).collect()) + [(1, 'a:a'), (2, 'b:b'), (3, 'c:c')] + """ + def func(_, iterator): + return itertools.imap(lambda (k, v): (k, f(v)), iterator) + return PipelinedEdgeRDD(self, func, preserves_partitioning) + + def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER): + self.is_cached = True + java_storage_level = self.ctx._getJavaStorageLevel(storageLevel) + self.jedge_rdd.persist(java_storage_level) + return self + + # TODO: This is a hack. take() must call JavaEdgeRDD.take() + def take(self, num=10): + return self.jrdd.take(num) + + def unpersist(self, blocking = False): + self.is_cached = False + self.jedge_rdd.unpersist(blocking) + return self + + def mapEdgePartitions(self, f, preserve_partitioning=False): + def func(s, iterator): + return f(iterator) + return PipelinedEdgeRDD(self, func, preserve_partitioning) + + # TODO: The best way to do an innerJoin on edge RDDs is to use the optimized inner + # TODO: technique defined in EdgeRDDImpl. This solution does not scale + def innerJoin(self, other): + return self.jrdd.join(other.jrdd) + + def leftJoin(self, other, numPartitions=None): + return self.jrdd.leftOuterJoin(other.jrdd, numPartitions) + + def collect(self): + """ + Return a list that contains all of the elements in this RDD. + """ + with SCCallSiteSync(self.ctx) as css: + bytesInJava = self.jedge_rdd.collect().iterator() + return list(self._collect_iterator_through_file(bytesInJava)) + + def _collect_iterator_through_file(self, iterator): + # Transferring lots of data through Py4J can be slow because + # socket.readline() is inefficient. Instead, we'll dump the data to a + # file and read it back.
+ tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) + tempFile.close() + self.ctx._writeToFile(iterator, tempFile.name) + # Read the data into Python and deserialize it: + with open(tempFile.name, 'rb') as tempFile: + for item in self.jedge_rdd_deserializer.load_stream(tempFile): + yield item + os.unlink(tempFile.name) + + def getJavaEdgeRDD(self, rdd, rdd_deserializer): + if self.bypass_serializer: + self.jedge_rdd_deserializer = NoOpSerializer() + rdd_deserializer = NoOpSerializer() + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + def f(index, iterator): + return iterator + command = (f, profileStats, rdd_deserializer, + rdd_deserializer) + # the serialized command will be compressed by broadcast + ser = CloudPickleSerializer() + pickled_command = ser.dumps(command) + if len(pickled_command) > (1 << 20): # 1M + self.broadcast = self.ctx.broadcast(pickled_command) + pickled_command = ser.dumps(self.broadcast) + + # the serialized command will be compressed by broadcast + broadcast_vars = ListConverter().convert( + [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], + self.ctx._gateway._gateway_client) + self.ctx._pickled_broadcast_vars.clear() + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) + java_storage_level = self.ctx._getJavaStorageLevel(StorageLevel.MEMORY_ONLY) + prdd = self.ctx._jvm.PythonEdgeRDD(rdd._jrdd, + bytearray(pickled_command), + env, includes, self.preserve_partitioning, + self.ctx.pythonExec, + broadcast_vars, self.ctx._javaAccumulator, + java_storage_level) + self.jedge_rdd = prdd.asJavaEdgeRDD() + if enable_profile: + self.id = self.jedge_rdd.id() + self.ctx._add_profile(self.id, profileStats) + return self.jedge_rdd + + +class PipelinedEdgeRDD(EdgeRDD): + + """ + Pipelined mapValues in EdgeRDD: + + >>> rdd = sc.parallelize([(1, ("Alice", 29)), (2, ("Bob", 30)), \ + (3, ("Charlie", 31)), (4, ("Dwayne", 32))]) + >>> vertices = VertexRDD(rdd) + >>> vertices.mapValues(lambda x: x[1] * 2).cache().collect() + [(1, ("Alice", 58)), (2, ("Bob", 60)), \ + (3, ("Charlie", 62)), (4, ("Dwayne", 64))] + + Pipelined reduces in EdgeRDD: + >>> from operator import add + >>> rdd.map(lambda x: 2 * x).reduce(add) + 20 + >>> rdd.flatMap(lambda x: [x, x]).reduce(add) + 20 + """ + + def __init__(self, prev, func, preservesPartitioning=False): + if not isinstance(prev, PipelinedEdgeRDD) or not prev.is_pipelinable(): + # This transformation is the first in its stage: + self.func = func + self.preservesPartitioning = preservesPartitioning + self.prev_jedge_rdd = prev.jedge_rdd + self.prev_jedge_rdd_deserializer = prev.jedge_rdd_deserializer + else: + prev_func = prev.func + + def pipeline_func(split, iterator): + return func(split, prev_func(split, iterator)) + self.func = pipeline_func + self.preservesPartitioning = \ + prev.preservesPartitioning and preservesPartitioning + self.prev_jedge_rdd = prev.jedge_rdd + self.prev_jedge_rdd_deserializer = prev.prev_jedge_rdd_deserializer + + self.is_cached = False + self.is_checkpointed = False + self.ctx = prev.ctx + self.prev = prev + self.jerdd_val = None + self.id = None + self.jedge_rdd_deserializer = self.ctx.serializer + self.bypass_serializer = False + self.partitionFunc = prev._partitionFunc if self.preservesPartitioning else None + self.broadcast = None + + 
def __del__(self): + if self.broadcast: + self.broadcast.unpersist() + self.broadcast = None + + @property + def jedge_rdd(self): + if self.jerdd_val: + return self.jerdd_val + if self.bypass_serializer: + self.jedge_rdd_deserializer = NoOpSerializer() + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + command = (self.func, profileStats, self.prev_jedge_rdd_deserializer, + self.jedge_rdd_deserializer) + # the serialized command will be compressed by broadcast + ser = CloudPickleSerializer() + pickled_command = ser.dumps(command) + if len(pickled_command) > (1 << 20): # 1M + self.broadcast = self.ctx.broadcast(pickled_command) + pickled_command = ser.dumps(self.broadcast) + broadcast_vars = ListConverter().convert( + [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], + self.ctx._gateway._gateway_client) + self.ctx._pickled_broadcast_vars.clear() + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) + java_storage_level = self.ctx._getJavaStorageLevel(StorageLevel.MEMORY_ONLY) + python_rdd = self.ctx._jvm.PythonEdgeRDD(self.prev_jedge_rdd, + bytearray(pickled_command), + env, includes, self.preservesPartitioning, + self.ctx.pythonExec, + broadcast_vars, self.ctx._javaAccumulator, + java_storage_level) + self.jerdd_val = python_rdd.asJavaEdgeRDD() + + if enable_profile: + self.id = self.jerdd_val.id() + self.ctx._add_profile(self.id, profileStats) + return self.jerdd_val + + def id(self): + if self.id is None: + self.id = self.jedge_rdd.id() + return self.id + + def is_pipelinable(self): + return not (self.is_cached or self.is_checkpointed) diff --git a/python/pyspark/graphx/graph.py b/python/pyspark/graphx/graph.py new file mode 100644 index 0000000000000..3c65c62e7316c --- /dev/null +++ b/python/pyspark/graphx/graph.py @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+"""
+Python bindings for Graph[VertexRDD, EdgeRDD] in GraphX
+"""
+
+import itertools
+from pyspark import PickleSerializer, RDD, StorageLevel, SparkContext
+from pyspark.graphx import VertexRDD, EdgeRDD
+
+from pyspark.graphx.partitionstrategy import PartitionStrategy
+from pyspark.rdd import PipelinedRDD
+from pyspark.serializers import BatchedSerializer
+
+__all__ = ["Graph"]
+
+
+class Graph(object):
+    """
+    A property graph backed by a VertexRDD and an EdgeRDD.
+    """
+
+    def __init__(self, vertex_jrdd, edge_jrdd,
+                 partition_strategy=PartitionStrategy.EdgePartition1D):
+        self._vertex_jrdd = VertexRDD(vertex_jrdd,
+                                      BatchedSerializer(PickleSerializer()))
+        self._edge_jrdd = EdgeRDD(edge_jrdd,
+                                  BatchedSerializer(PickleSerializer()))
+        self._partition_strategy = partition_strategy
+        self._sc = vertex_jrdd.context
+
+    def persist(self, storageLevel):
+        self._vertex_jrdd.persist(storageLevel)
+        self._edge_jrdd.persist(storageLevel)
+        return
+
+    def cache(self):
+        self._vertex_jrdd.cache()
+        self._edge_jrdd.cache()
+        return
+
+    def vertices(self):
+        return self._vertex_jrdd
+
+    def edges(self):
+        return self._edge_jrdd
+
+    def numEdges(self):
+        return self._edge_jrdd.count()
+
+    def numVertices(self):
+        return self._vertex_jrdd.count()
+
+    # TODO
+    def partitionBy(self, partitionStrategy):
+        return
+
+    # TODO
+    def inDegrees(self):
+        return
+
+    # TODO
+    def outDegrees(self):
+        return
+
+    # TODO
+    def degrees(self):
+        return
+
+    def triplets(self):
+        if isinstance(self._sc, SparkContext):
+            pyGraph = self._sc._jvm.org.apache.spark.PythonGraph()
+            return pyGraph.triplets()
+
+    # TODO
+    def unpersistVertices(self, blocking=True):
+        return
+
+    def mapVertices(self, f):
+        return self._vertex_jrdd.mapValues(f)
+
+    def mapEdges(self, f):
+        return self._edge_jrdd.mapValues(f)
+
+    # TODO
+    def mapTriplets(self, f):
+        return
+
+    # TODO
+    def subgraph(self, epred, pred):
+        return
+
+    # TODO
+    def groupEdges(self, mergeFunc):
+        return
+
+    # TODO
+    def joinVertices(self, mapFunc):
+        return
+
+    # TODO
+    def outerJoinVertices(self, mapFunc):
+        return
+
+    # TODO
+    def collectNeighborIds(self, edgeDirection):
+        return
+
+    # TODO
+    def collectNeighbors(self, edgeDirection):
+        return
+
+    # TODO
+    def mapReduceTriplets(self, mapFunc, reduceFunc):
+        return
+
+    def pagerank(self, num_iterations, reset_probability=0.15):
+        """
+        Run PageRank on this graph; it requires valid vertex and edge RDDs.
+        Users specify the terminating condition as a number of iterations,
+        together with the random reset probability (alpha).
+
+        :param num_iterations: number of iterations after which the
+                               algorithm terminates
+        :param reset_probability: random reset probability (alpha)
+        """
+        py_graph = self._sc._jvm.org.apache.spark.PythonGraph.pagerank(num_iterations, reset_probability)
+        return py_graph.asJavaRDD()
+
+    def connected_components(self):
+        py_graph = self._sc._jvm.org.apache.spark.PythonGraph.connectedComponents()
+        return py_graph.asJavaRDD()
+
+    def reverse(self):
+        py_graph = self._sc._jvm.org.apache.spark.PythonGraph.reverse()
+        return py_graph.asJavaRDD()
+
+    def apply(self, f):
+        def func(iterator):
+            return itertools.imap(f, iterator)
+        py_graph = self._sc._jvm.org.apache.spark.PythonGraph.apply(func)
+        return py_graph.asJavaRDD()
+
+    # TODO
+    def triangleCount(self):
+        return
+
+    # TODO
+    def stronglyConnectedComponents(self, iterations):
+        return
+
+    def Pregel(self, initial_message, vertex_program, send_message, combine_message):
+        return
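For reference, a rough usage sketch of the Graph binding added above. This is illustrative only: Graph comes from this patch (exposed via pyspark.graphx), but the (src, dst, attr) edge layout and the application name are assumptions made for the example, not something the diff specifies.

    from pyspark import SparkContext
    from pyspark.graphx import Graph

    sc = SparkContext(appName="graphx-sketch")
    # (vertex id, attribute) pairs, as in the vertex.py doctests
    vertex_rdd = sc.parallelize([(1, ("Alice", 29)), (2, ("Bob", 30))])
    # (src id, dst id, attribute) triples -- assumed layout for illustration
    edge_rdd = sc.parallelize([(1, 2, "follows")])

    graph = Graph(vertex_rdd, edge_rdd)
    print graph.numVertices(), graph.numEdges()
    # mapVertices forwards to VertexRDD.mapValues, transforming attributes only
    names = graph.mapVertices(lambda attr: attr[0])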
diff --git a/python/pyspark/graphx/graphloader.py b/python/pyspark/graphx/graphloader.py
new file mode 100644
index 0000000000000..bfcb65acf1ae0
--- /dev/null
+++ b/python/pyspark/graphx/graphloader.py
@@ -0,0 +1,16 @@
+
+
+from pyspark import SparkContext
+from pyspark.graphx import Graph, EdgeRDD, VertexRDD
+
+
+class GraphLoader(object):
+
+    @staticmethod
+    def edgeListFile(sc, filename, partitions, edgeStorageLevel, vertexStorageLevel):
+        # Delegate to the JVM-side PythonGraphLoader, which reads the edge
+        # list file and builds the graph.
+        graph_loader = sc._jvm.org.apache.spark.PythonGraphLoader
+        graph = graph_loader.edgeListFile(sc, filename, partitions,
+                                          edgeStorageLevel, vertexStorageLevel)
+        return graph
diff --git a/python/pyspark/graphx/partitionstrategy.py b/python/pyspark/graphx/partitionstrategy.py
new file mode 100644
index 0000000000000..6071d5218b44a
--- /dev/null
+++ b/python/pyspark/graphx/partitionstrategy.py
@@ -0,0 +1,9 @@
+
+class PartitionStrategy(object):
+    """Partition strategies for distributing graph edges."""
+    EdgePartition1D = 1
+    EdgePartition2D = 2
+    RandomVertexCut = 3
diff --git a/python/pyspark/graphx/tests.py b/python/pyspark/graphx/tests.py
new file mode 100644
index 0000000000000..b24a1ef49ed3e
--- /dev/null
+++ b/python/pyspark/graphx/tests.py
@@ -0,0 +1,203 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +import unittest + +from pyspark.context import SparkConf, SparkContext, RDD +from pyspark.graphx.vertex import VertexRDD + + +class PyVertexRDDTestCase(unittest.TestCase): + """ + Test collect, take, count, mapValues, diff, + filter, mapVertexPartitions, innerJoin and leftJoin + for VertexRDD + """ + + def setUp(self): + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.default.parallelism", 1) + self.sc = SparkContext(appName=class_name, conf=conf) + self.sc.setCheckpointDir("/tmp") + + def tearDown(self): + self.sc.stop() + + def collect(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.take(1) + self.assertEqual(results, [(3, ("rxin", "student"))]) + + def take(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, [(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + + def count(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.count() + self.assertEqual(results, 2) + + def mapValues(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.mapValues(lambda x: x + ":" + x) + self.assertEqual(results, [(3, ("rxin:rxin", "student:student")), + (7, ("jgonzal:jgonzal", "postdoc:postdoc"))]) + + def innerJoin(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.innerJoin(vertices1).collect() + self.assertEqual(results, []) + + def leftJoin(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.diff(vertices1) + self.assertEqual(results, 2) + + +class PyEdgeRDDTestCase(unittest.TestCase): + """ + Test collect, take, count, mapValues, + filter and innerJoin for EdgeRDD + """ + + def setUp(self): + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.default.parallelism", 1) + self.sc = SparkContext(appName=class_name, conf=conf) + self.sc.setCheckpointDir("/tmp") + + def tearDown(self): + self.sc.stop() + + # TODO + def collect(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, [(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + + # TODO + def take(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, [(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + + # TODO + def count(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, 2) + + # TODO + def mapValues(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), 
(7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, 2) + + # TODO + def filter(self): + return + + # TODO + def innerJoin(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.diff(vertices1) + self.assertEqual(results, 2) + + +class PyGraphXTestCase(unittest.TestCase): + """ + Test vertices, edges, partitionBy, numEdges, numVertices, + inDegrees, outDegrees, degrees, triplets, mapVertices, + mapEdges, mapTriplets, reverse, subgraph, groupEdges, + joinVertices, outerJoinVertices, collectNeighborIds, + collectNeighbors, mapReduceTriplets, triangleCount for Graph + """ + + def setUp(self): + class_name = self.__class__.__name__ + conf = SparkConf().set("spark.default.parallelism", 1) + self.sc = SparkContext(appName=class_name, conf=conf) + self.sc.setCheckpointDir("/tmp") + + def tearDown(self): + self.sc.stop() + + def collect(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, [(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + + def take(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, [(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + + def count(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, 2) + + def mapValues(self): + vertexData = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertices = VertexRDD(vertexData) + results = vertices.collect() + self.assertEqual(results, 2) + + def diff(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.diff(vertices1) + self.assertEqual(results, 2) + + def innerJoin(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.diff(vertices1) + self.assertEqual(results, 2) + + def leftJoin(self): + vertexData0 = self.sc.parallelize([(3, ("rxin", "student")), (7, ("jgonzal", "postdoc"))]) + vertexData1 = self.sc.parallelize([(1, ("rxin", "student")), (2, ("jgonzal", "postdoc"))]) + vertices0 = VertexRDD(vertexData0) + vertices1 = VertexRDD(vertexData1) + results = vertices0.diff(vertices1) + self.assertEqual(results, 2) diff --git a/python/pyspark/graphx/vertex.py b/python/pyspark/graphx/vertex.py new file mode 100644 index 0000000000000..44e7eaf0ef147 --- /dev/null +++ b/python/pyspark/graphx/vertex.py @@ -0,0 +1,330 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python bindings for VertexRDD in GraphX +""" + +import itertools +import os +from tempfile import NamedTemporaryFile +from numpy.numarray.numerictypes import Long + +from py4j.java_collections import MapConverter, ListConverter +import operator +from pyspark.accumulators import PStatsParam +from pyspark.rdd import PipelinedRDD +from pyspark.serializers import CloudPickleSerializer, NoOpSerializer, AutoBatchedSerializer +from pyspark import RDD, PickleSerializer, StorageLevel, SparkContext +from pyspark.traceback_utils import SCCallSiteSync + + +__all__ = ["VertexRDD", "VertexId"] + + +""" +The default type of vertex id is Long +A separate VertexId type is defined +here so that other types can be used +for vertex ids in future +""" +VertexId = Long + + +class VertexRDD(object): + """ + VertexRDD class defines the vertex actions and transformations. The complete list of + transformations and actions for vertices are available at + `http://spark.apache.org/docs/latest/graphx-programming-guide.html` + These operations are mapped to Scala functions defined + in `org.apache.spark.graphx.impl.VertexRDDImpl` + """ + + def __init__(self, jrdd, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())): + """ + Constructor + :param jrdd: A JavaRDD reference passed from the parent + RDD object + :param jrdd_deserializer: The deserializer used in Python workers + created from PythonRDD to execute a + serialized Python function and RDD + """ + + self.name = "VertexRDD" + self.jrdd = jrdd + self.is_cached = False + self.is_checkpointed = False + self.ctx = SparkContext._active_spark_context + self.jvertex_rdd_deserializer = jrdd_deserializer + self.id = jrdd.id() + self.partitionFunc = None + self.bypass_serializer = False + self.preserve_partitioning = False + + self.jvertex_rdd = self.getJavaVertexRDD(jrdd, jrdd_deserializer) + + def __repr__(self): + return self.jvertex_rdd.toString() + + def cache(self): + """ + Persist this vertex RDD with the default storage level (C{MEMORY_ONLY_SER}). + """ + self.is_cached = True + self.persist(StorageLevel.MEMORY_ONLY_SER) + return self + + def checkpoint(self): + self.is_checkpointed = True + self.jvertex_rdd.checkpoint() + + def count(self): + return self.jvertex_rdd.count() + + def diff(self, other, numPartitions=2): + """ + Hides vertices that are the same between `this` and `other`. + For vertices that are different, keeps the values from `other`. 
+ + TODO: give an example + """ + if (isinstance(other, RDD)): + vs = self.map(lambda (k, v): (k, (1, v))) + ws = other.map(lambda (k, v): (k, (2, v))) + return vs.union(ws).groupByKey(numPartitions).mapValues(lambda x: x.diff(x.__iter__())) + + def isCheckpointed(self): + """ + Return whether this RDD has been checkpointed or not + """ + return self.is_checkpointed + + def mapValues(self, f, preserves_partitioning=False): + """ + Return a new vertex RDD by applying a function to each vertex attributes, + preserving the index + + >>> rdd = sc.parallelize([(1, "b"), (2, "a"), (3, "c")]) + >>> vertices = VertexRDD(rdd) + >>> sorted(vertices.mapValues(lambda x: (x + ":" + x)).collect()) + [(1, 'a:a'), (2, 'b:b'), (3, 'c:c')] + """ + def func(_, iterator): + return itertools.imap(lambda (k, v): (k, f(v)), iterator) + return PipelinedVertexRDD(self, func, preserves_partitioning) + + def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER): + self.is_cached = True + java_storage_level = self.ctx._getJavaStorageLevel(storageLevel) + self.jvertex_rdd.persist(java_storage_level) + return self + + # TODO: This is a hack. take() must call JavaVertexRDD.take() + def take(self, num=10): + return self.jrdd.take(num) + + def unpersist(self, blocking = False): + self.is_cached = False + self.jvertex_rdd.unpersist(blocking) + return self + + def mapVertexPartitions(self, f, preserve_partitioning=False): + def func(s, iterator): + return f(iterator) + return PipelinedVertexRDD(self, func, preserve_partitioning) + + # TODO + def filter(self, f): + """ + Return a new vertex RDD containing only the elements that satisfy a predicate. + + >>> rdd = sc.parallelize([(1, "b"), (2, "a"), (3, "c")]) + >>> vertices = VertexRDD(rdd) + >>> vertices.filter(lambda x: x._1 % 2 == 0).collect() + [2] + """ + return self.jvertex_rdd.filter(f) + + # TODO: The best way to do an innerJoin on vertex RDDs is to use the optimized inner + # TODO: technique defined in VertexRDDImpl. This solution does not scale + def innerJoin(self, other): + return self.jrdd.join(other.jrdd) + + def leftJoin(self, other, numPartitions=None): + return self.jrdd.leftOuterJoin(other.jrdd, numPartitions) + + def collect(self): + """ + Return a list that contains all of the elements in this RDD. + """ + with SCCallSiteSync(self.ctx) as css: + bytesInJava = self.jvertex_rdd.collect().iterator() + return list(self._collect_iterator_through_file(bytesInJava)) + + def _collect_iterator_through_file(self, iterator): + # Transferring lots of data through Py4J can be slow because + # socket.readline() is inefficient. Instead, we'll dump the data to a + # file and read it back. 
+ tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) + tempFile.close() + self.ctx._writeToFile(iterator, tempFile.name) + # Read the data into Python and deserialize it: + with open(tempFile.name, 'rb') as tempFile: + for item in self.jvertex_rdd_deserializer.load_stream(tempFile): + yield item + os.unlink(tempFile.name) + + def getJavaVertexRDD(self, rdd, rdd_deserializer): + if self.bypass_serializer: + self.jvertex_rdd_deserializer = NoOpSerializer() + rdd_deserializer = NoOpSerializer() + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + def f(index, iterator): + return iterator + command = (f, profileStats, rdd_deserializer, + rdd_deserializer) + # the serialized command will be compressed by broadcast + ser = CloudPickleSerializer() + pickled_command = ser.dumps(command) + if len(pickled_command) > (1 << 20): # 1M + self.broadcast = self.ctx.broadcast(pickled_command) + pickled_command = ser.dumps(self.broadcast) + + # the serialized command will be compressed by broadcast + broadcast_vars = ListConverter().convert( + [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], + self.ctx._gateway._gateway_client) + self.ctx._pickled_broadcast_vars.clear() + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) + target_storage_level = StorageLevel.MEMORY_ONLY + java_storage_level = self.ctx._getJavaStorageLevel(target_storage_level) + prdd = self.ctx._jvm.PythonVertexRDD(rdd._jrdd, + bytearray(pickled_command), + env, includes, self.preserve_partitioning, + self.ctx.pythonExec, + broadcast_vars, self.ctx._javaAccumulator, + java_storage_level) + self.jvertex_rdd = prdd.asJavaVertexRDD() + if enable_profile: + self.id = self.jvertex_rdd.id() + self.ctx._add_profile(self.id, profileStats) + return self.jvertex_rdd + + +class PipelinedVertexRDD(VertexRDD): + + """ + Pipelined mapValues in VertexRDD: + + >>> rdd = sc.parallelize([(1, ("Alice", 29)), (2, ("Bob", 30)), \ + (3, ("Charlie", 31)), (4, ("Dwayne", 32))]) + >>> vertices = VertexRDD(rdd) + >>> vertices.mapValues(lambda x: x[1] * 2).cache().collect() + [(1, ("Alice", 58)), (2, ("Bob", 60)), \ + (3, ("Charlie", 62)), (4, ("Dwayne", 64))] + + Pipelined reduces in VertexRDD: + >>> from operator import add + >>> rdd.map(lambda x: 2 * x).reduce(add) + 20 + >>> rdd.flatMap(lambda x: [x, x]).reduce(add) + 20 + """ + + def __init__(self, prev, func, preservesPartitioning=False): + if not isinstance(prev, PipelinedVertexRDD) or not prev.is_pipelinable(): + # This transformation is the first in its stage: + self.func = func + self.preservesPartitioning = preservesPartitioning + self.prev_jvertex_rdd = prev.jvertex_rdd + self.prev_jvertex_rdd_deserializer = prev.jvertex_rdd_deserializer + else: + prev_func = prev.func + + def pipeline_func(split, iterator): + return func(split, prev_func(split, iterator)) + self.func = pipeline_func + self.preservesPartitioning = \ + prev.preservesPartitioning and preservesPartitioning + self.prev_jvertex_rdd = prev.jvertex_rdd + self.prev_jvertex_rdd_deserializer = prev.prev_jvertex_rdd_deserializer + + self.is_cached = False + self.is_checkpointed = False + self.ctx = prev.ctx + self.prev = prev + self.jvrdd_val = None + self.id = None + self.jvertex_rdd_deserializer = self.ctx.serializer + self.bypass_serializer = False + self.partitionFunc = 
prev._partitionFunc if self.preservesPartitioning else None + self.broadcast = None + + def __del__(self): + if self.broadcast: + self.broadcast.unpersist() + self.broadcast = None + + @property + def jvertex_rdd(self): + if self.jvrdd_val: + return self.jvrdd_val + if self.bypass_serializer: + self.jvertex_rdd_deserializer = NoOpSerializer() + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + command = (self.func, profileStats, self.prev_jvertex_rdd_deserializer, + self.jvertex_rdd_deserializer) + # the serialized command will be compressed by broadcast + ser = CloudPickleSerializer() + pickled_command = ser.dumps(command) + if len(pickled_command) > (1 << 20): # 1M + self.broadcast = self.ctx.broadcast(pickled_command) + pickled_command = ser.dumps(self.broadcast) + broadcast_vars = ListConverter().convert( + [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], + self.ctx._gateway._gateway_client) + self.ctx._pickled_broadcast_vars.clear() + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) + java_storage_level = self.ctx._getJavaStorageLevel(StorageLevel.MEMORY_ONLY) + python_rdd = self.ctx._jvm.PythonVertexRDD(self.prev_jvertex_rdd, + bytearray(pickled_command), + env, includes, self.preservesPartitioning, + self.ctx.pythonExec, + broadcast_vars, self.ctx._javaAccumulator, + java_storage_level) + self.jvrdd_val = python_rdd.asJavaVertexRDD() + + if enable_profile: + self.id = self.jvrdd_val.id() + self.ctx._add_profile(self.id, profileStats) + return self.jvrdd_val + + def id(self): + if self.id is None: + self.id = self.jvertex_rdd.id() + return self.id + + def is_pipelinable(self): + return not (self.is_cached or self.is_checkpointed) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index a975dc19cb78e..c45003e2f7e8d 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -110,6 +110,7 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") + java_import(gateway.jvm, "org.apache.spark.graphx.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 1e24da7f5f60c..95995949b3d45 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -24,7 +24,7 @@ public abstract class LocalJavaStreamingContext { - protected transient JavaStreamingContext ssc; + protected transient JavaStreamingContext ssc; @Before public void setUp() {