468 changes: 468 additions & 0 deletions CHANGES.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
Package: SparkR
Type: Package
Title: R frontend for Spark
Version: 1.5.0
Version: 1.5.1
Date: 2013-09-09
Author: The Apache Software Foundation
Maintainer: Shivaram Venkataraman <[email protected]>
@@ -159,16 +159,15 @@ public BoxedUnit apply() {
/**
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
public void initializeForWriting() throws IOException {
private void initializeForWriting() throws IOException {
// Note: Do not track memory for the pointer array for now because of SPARK-10474.
// In more detail, in TungstenAggregate we only reserve a page, but when we fall back to
// sort-based aggregation we try to acquire a page AND a pointer array, which inevitably
// fails if all other memory is already occupied. It should be safe to not track the array
// because its memory footprint is frequently much smaller than that of a page. This is a
// temporary hack that we should address in 1.6.0.
// TODO: track the pointer array memory!
this.writeMetrics = new ShuffleWriteMetrics();
final long pointerArrayMemory =
UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory);
if (memoryAcquired != pointerArrayMemory) {
shuffleMemoryManager.release(memoryAcquired);
throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory");
}

this.inMemSorter =
new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
this.isInMemSorterExternal = false;
@@ -187,14 +186,6 @@ public void closeCurrentPage() {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
spill(true);
}

/**
* Sort and spill the current records in response to memory pressure.
* @param shouldInitializeForWriting whether to allocate memory for writing after the spill
*/
public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -225,9 +216,7 @@ public void spill(boolean shouldInitializeForWriting) throws IOException {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

if (shouldInitializeForWriting) {
initializeForWriting();
}
initializeForWriting();
}

/**
@@ -275,14 +264,7 @@ private long freeMemory() {
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
if (inMemSorter != null) {
if (!isInMemSorterExternal) {
long sorterMemoryUsage = inMemSorter.getMemoryUsage();
memoryFreed += sorterMemoryUsage;
shuffleMemoryManager.release(sorterMemoryUsage);
}
inMemSorter = null;
}
// TODO: track in-memory sorter memory usage (SPARK-10474)
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
@@ -320,17 +302,8 @@ public void cleanupResources() {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
spill();
} else {
inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
// TODO: track the pointer array memory! (SPARK-10474)
inMemSorter.expandPointerArray();
}
}

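For context, the SPARK-10474 note in the hunk above describes the failure this backport works around: after a spill, the sorter immediately tried to re-acquire pointer-array memory from a pool that the aggregation's data page had already exhausted, and the all-or-nothing acquire had to be rolled back. A minimal sketch of that acquire/release pattern with a toy pool; ToyMemoryPool and all sizes are illustrative stand-ins, not Spark's shuffle memory manager API:

```scala
// Toy model of the SPARK-10474 failure mode: a partial grant must be rolled back,
// mirroring the tryToAcquire/release/throw sequence removed from initializeForWriting.
object PointerArrayAcquireSketch {
  final class ToyMemoryPool(private var free: Long) {
    def tryToAcquire(wanted: Long): Long = {   // grants at most what is left in the pool
      val granted = math.min(wanted, free)
      free -= granted
      granted
    }
    def release(bytes: Long): Unit = free += bytes
  }

  def main(args: Array[String]): Unit = {
    val pool = new ToyMemoryPool(64L * 1024 * 1024)        // 64 MB available to this task
    val pageBytes = pool.tryToAcquire(60L * 1024 * 1024)   // aggregation already holds a data page
    val pointerArrayBytes = 8L * 1024 * 1024               // sorter now also wants a pointer array
    val acquired = pool.tryToAcquire(pointerArrayBytes)
    if (acquired != pointerArrayBytes) {
      pool.release(acquired)                               // roll back the partial grant
      println(s"Could not acquire $pointerArrayBytes bytes of memory (page holds $pageBytes)")
    }
  }
}
```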
@@ -633,7 +633,7 @@ private[spark] object PythonRDD extends Logging {
*
* The thread will terminate after all the data are sent or any exceptions happen.
*/
private def serveIterator[T](items: Iterator[T], threadName: String): Int = {
def serveIterator[T](items: Iterator[T], threadName: String): Int = {
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
// Close the socket if no connection in 3 seconds
serverSocket.setSoTimeout(3000)
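serveIterator loses its private modifier here so that EvaluatePython.takeAndServe (further down in this diff) can reuse it to hand a result set to the Python side. A minimal, self-contained sketch of the same serve-over-a-local-socket pattern, streaming plain strings instead of pickled rows; this illustrates the mechanism only and is not PythonRDD's implementation:

```scala
import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}

// Sketch: bind an ephemeral localhost-only port, return it to the caller, and
// stream the items from a daemon thread; give up if nobody connects in 3 seconds.
object ServeIteratorSketch {
  def serve(items: Iterator[String], threadName: String): Int = {
    val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    serverSocket.setSoTimeout(3000)
    new Thread(threadName) {
      setDaemon(true)
      override def run(): Unit = {
        val sock = serverSocket.accept()
        val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
        try items.foreach(out.writeUTF) finally {
          out.close(); sock.close(); serverSocket.close()
        }
      }
    }.start()
    serverSocket.getLocalPort   // the consumer connects back to this port
  }

  def main(args: Array[String]): Unit = {
    val port = serve(Iterator("a", "b", "c"), "serve-sketch")
    val in = new DataInputStream(new Socket("localhost", port).getInputStream)
    println(Seq(in.readUTF(), in.readUTF(), in.readUTF()))   // List(a, b, c)
  }
}
```

The real method serves pickled batches, which the Python side reads via _load_from_socket, as the dataframe.py hunk further down shows.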
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -42,5 +42,5 @@ package org.apache
*/

package object spark {
val SPARK_VERSION = "1.5.0"
val SPARK_VERSION = "1.5.1"
}
4 changes: 2 additions & 2 deletions dev/create-release/generate-changelist.py
@@ -31,8 +31,8 @@
import traceback

SPARK_HOME = os.environ["SPARK_HOME"]
NEW_RELEASE_VERSION = "1.5.0"
PREV_RELEASE_GIT_TAG = "v1.4.0"
NEW_RELEASE_VERSION = "1.5.1"
PREV_RELEASE_GIT_TAG = "v1.5.0"

CHANGELIST = "CHANGES.txt"
OLD_CHANGELIST = "%s.old" % (CHANGELIST)
4 changes: 2 additions & 2 deletions docs/_config.yml
@@ -14,8 +14,8 @@ include:

# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 1.5.0
SPARK_VERSION_SHORT: 1.5.0
SPARK_VERSION: 1.5.1
SPARK_VERSION_SHORT: 1.5.1
SCALA_BINARY_VERSION: "2.10"
SCALA_VERSION: "2.10.4"
MESOS_VERSION: 0.21.0
8 changes: 5 additions & 3 deletions ec2/spark_ec2.py
@@ -51,7 +51,7 @@
raw_input = input
xrange = range

SPARK_EC2_VERSION = "1.5.0"
SPARK_EC2_VERSION = "1.5.1"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

VALID_SPARK_VERSIONS = set([
@@ -72,7 +72,8 @@
"1.3.1",
"1.4.0",
"1.4.1",
"1.5.0"
"1.5.0",
"1.5.1"
])

SPARK_TACHYON_MAP = {
@@ -87,7 +88,8 @@
"1.3.1": "0.5.0",
"1.4.0": "0.6.4",
"1.4.1": "0.6.4",
"1.5.0": "0.7.1"
"1.5.0": "0.7.1",
"1.5.1": "0.7.1"
}

DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
5 changes: 4 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -300,7 +300,10 @@ def take(self, num):
>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
"""
return self.limit(num).collect()
with SCCallSiteSync(self._sc) as css:
port = self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
self._jdf, num)
return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))

@ignore_unicode_prefix
@since(1.3)
@@ -85,7 +85,7 @@ public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
// We will use the number of elements in the map as the initialSize of the
// UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
// we will use 1 as its initial size if the map is empty.
// TODO: track pointer array memory used by this in-memory sorter!
// TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474)
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));

@@ -124,13 +124,8 @@ public UnsafeKVExternalSorter(StructType keySchema, StructType valueSchema,
pageSizeBytes,
inMemSorter);

// Note: This spill doesn't actually release any memory, so if we try to allocate a new
// pointer array immediately after the spill then we may fail to acquire sufficient space
// for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
// we have actually freed memory from our map.
sorter.spill(false /* initialize for writing */);
sorter.spill();
map.free();
sorter.initializeForWriting();
}
}

@@ -28,7 +28,8 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonRunner, PythonBroadcast, PythonRDD, SerDeUtil}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -118,6 +119,17 @@ object EvaluatePython {
def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())

def takeAndServe(df: DataFrame, n: Int): Int = {
registerPicklers()
// This is an annoying hack - we should refactor the code so executeCollect and executeTake
return InternalRow rather than Row.
val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
})
PythonRDD.serveIterator(iter, s"serve-DataFrame")
}

/**
* Helper for converting from Catalyst type to java type suitable for Pyrolite.
*/
@@ -39,22 +39,20 @@ private[sql] class ExamplePointUDT extends UserDefinedType[ExamplePoint] {

override def pyUDT: String = "pyspark.sql.tests.ExamplePointUDT"

override def serialize(obj: Any): Seq[Double] = {
override def serialize(obj: Any): GenericArrayData = {
obj match {
case p: ExamplePoint =>
Seq(p.x, p.y)
val output = new Array[Any](2)
output(0) = p.x
output(1) = p.y
new GenericArrayData(output)
}
}

override def deserialize(datum: Any): ExamplePoint = {
datum match {
case values: Seq[_] =>
val xy = values.asInstanceOf[Seq[Double]]
assert(xy.length == 2)
new ExamplePoint(xy(0), xy(1))
case values: util.ArrayList[_] =>
val xy = values.asInstanceOf[util.ArrayList[Double]].asScala
new ExamplePoint(xy(0), xy(1))
case values: ArrayData =>
new ExamplePoint(values.getDouble(0), values.getDouble(1))
}
}

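The hunk above moves ExamplePointUDT to the ArrayData-based representation: serialize now returns a GenericArrayData and deserialize reads through the ArrayData interface instead of matching on Seq or java.util.ArrayList. A hypothetical round trip in the same shape, with PointLike standing in for ExamplePoint, and assuming ArrayData/GenericArrayData live under org.apache.spark.sql.catalyst.util as on the 1.5 branch:

```scala
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// Hypothetical round trip mirroring the serialize/deserialize shape above.
object UdtRoundTripSketch {
  case class PointLike(x: Double, y: Double)   // stand-in for ExamplePoint

  def serialize(p: PointLike): GenericArrayData =
    new GenericArrayData(Array[Any](p.x, p.y))

  def deserialize(datum: Any): PointLike = datum match {
    case values: ArrayData => PointLike(values.getDouble(0), values.getDouble(1))
  }

  def main(args: Array[String]): Unit =
    println(deserialize(serialize(PointLike(1.0, 2.0))))   // PointLike(1.0,2.0)
}
```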
@@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()

withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
// 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
initialMemoryConsumption + 4096 * 16)
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}

// Add more keys to the sorter and make sure the results come out sorted.
@@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite
val sorter = map.destructAndCreateExternalSorter()

withClue(s"destructAndCreateExternalSorter should release memory used by the map") {
// 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter.
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() ===
initialMemoryConsumption + 4096 * 16)
assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption)
}

// Add more keys to the sorter and make sure the results come out sorted.
@@ -39,6 +39,8 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {

private var _failureReasons: Map[Int, String] = Map.empty

@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)

@@ -67,4 +69,12 @@ case class BatchInfo(
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum

/** Set the failure reasons corresponding to every output op in the batch */
private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
_failureReasons = reasons
}

/** Failure reasons corresponding to every output op in the batch */
private[streaming] def failureReasons = _failureReasons
}
@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}

private def handleJobCompletion(job: Job) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
job.result match {
case Success(_) =>
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
case Failure(e) =>
reportError("Error running job " + job, e)
case _ =>
}
}

@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler

import scala.collection.mutable.HashSet
import scala.util.Failure

import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils

/** Class representing a set of Jobs
* belonging to the same batch.
@@ -62,12 +64,23 @@ case class JobSet(
}

def toBatchInfo: BatchInfo = {
new BatchInfo(
val failureReasons: Map[Int, String] = {
if (hasCompleted) {
jobs.filter(_.result.isFailure).map { job =>
(job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
}.toMap
} else {
Map.empty
}
}
val binfo = new BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
if (processingStartTime >= 0) Some(processingStartTime) else None,
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
binfo
}
}
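toBatchInfo above is where the per-output-op failure map is now built and attached to BatchInfo via setFailureReason. A standalone sketch of the same derivation, with FakeJob standing in for the scheduler's Job class and getMessage used in place of Spark's Utils.exceptionString:

```scala
import scala.util.{Failure, Success, Try}

// Sketch: once the batch has completed, each failed output op contributes
// (outputOpId -> failure description), mirroring the filter/map/toMap above.
object FailureReasonsSketch {
  case class FakeJob(outputOpId: Int, result: Try[Any])   // stand-in for the scheduler's Job

  def failureReasons(jobs: Seq[FakeJob], hasCompleted: Boolean): Map[Int, String] =
    if (hasCompleted) {
      jobs.filter(_.result.isFailure).map { job =>
        job.outputOpId -> job.result.asInstanceOf[Failure[_]].exception.getMessage
      }.toMap
    } else {
      Map.empty
    }

  def main(args: Array[String]): Unit = {
    val jobs = Seq(
      FakeJob(0, Success(())),
      FakeJob(1, Failure(new RuntimeException("output op 1 failed"))))
    println(failureReasons(jobs, hasCompleted = true))   // Map(1 -> output op 1 failed)
  }
}
```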