6 changes: 3 additions & 3 deletions core/pom.xml
@@ -356,12 +356,12 @@
             <phase>generate-resources</phase>
             <configuration>
               <!-- Execute the shell script to generate the spark build information. -->
-              <tasks>
+              <target>
                 <exec executable="${project.basedir}/../build/spark-build-info">
                   <arg value="${project.build.directory}/extra-resources"/>
-                  <arg value="${pom.version}"/>
+                  <arg value="${project.version}"/>
                 </exec>
-              </tasks>
+              </target>
             </configuration>
             <goals>
               <goal>run</goal>
@@ -1602,13 +1602,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
   }

   test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
-    val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
-      override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
-      override def zero(initialValue: Int): Int = 0
-      override def addInPlace(r1: Int, r2: Int): Int = {
-        throw new DAGSchedulerSuiteDummyException
-      }
-    })
+    val acc = new LongAccumulator {
+      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
+      override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
+    }
+    sc.register(acc)

     // Run this on executors
     sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
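For readers new to the AccumulatorV2 API this change migrates to: a custom accumulator now subclasses `org.apache.spark.util.AccumulatorV2` and must be registered with the `SparkContext` by hand (the `sc.register(acc)` line above), while the built-in helpers register for you. A minimal, self-contained sketch — the object and app names are illustrative, not part of this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-sketch").setMaster("local[2]"))

    // sc.longAccumulator both creates and registers the accumulator.
    val evens = sc.longAccumulator("evens")
    sc.parallelize(1 to 100, 4).foreach { i => if (i % 2 == 0) evens.add(1) }
    println(evens.value) // 50

    // A hand-constructed AccumulatorV2 needs explicit registration,
    // exactly as the test above does with sc.register(acc).
    val acc = new LongAccumulator
    sc.register(acc, "manual")
    sc.parallelize(1 to 10, 2).foreach(_ => acc.add(1))
    println(acc.value) // 10

    sc.stop()
  }
}
```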
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.{Duration, SECONDS}
+import scala.language.existentials
 import scala.reflect.ClassTag

 import org.scalactic.TripleEquals
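For reference, the `scala.language.existentials` import silences the `-feature` warning scalac emits when it infers an existential type that wildcards cannot express; presumably the accumulator rewrite in this suite now leads the compiler to infer such a type. A tiny sketch of the kind of code that trips the warning — illustrative, not from this PR:

```scala
import scala.language.existentials

// The inferred result type is List[(t, t)] forSome { type t }: the same
// unknown type appears twice, so it cannot be written with wildcards, and
// without the import above a -feature build reports it.
def zipSelf(xs: List[_]) = xs.zip(xs)
```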
@@ -146,14 +146,13 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
   test("accumulators are updated on exception failures") {
     // This means use 1 core and 4 max task failures
     sc = new SparkContext("local[1,4]", "test")
-    val param = AccumulatorParam.LongAccumulatorParam
     // Create 2 accumulators, one that counts failed values and another that doesn't
-    val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
-    val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", true)
+    val acc2 = AccumulatorSuite.createLongAccum("y", false)
     // Fail first 3 attempts of every task. This means each task should be run 4 times.
     sc.parallelize(1 to 10, 10).map { i =>
-      acc1 += 1
-      acc2 += 1
+      acc1.add(1)
+      acc2.add(1)
       if (TaskContext.get.attemptNumber() <= 2) {
         throw new Exception("you did something wrong")
       } else {
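`countFailedValues` decides whether updates from failed task attempts still count toward the final value. Registering with that flag goes through a `private[spark]` overload, which is presumably why the test calls the `AccumulatorSuite.createLongAccum` helper rather than public API. A sketch of what such a helper could plausibly look like — the `register` signature used here is an assumption, and the code only compiles inside the `org.apache.spark` package:

```scala
package org.apache.spark

import org.apache.spark.util.LongAccumulator

object LongAccumHelper {
  /** Create and register a named LongAccumulator (assumed helper, not PR code). */
  def createLongAccum(
      name: String,
      countFailedValues: Boolean = false)(implicit sc: SparkContext): LongAccumulator = {
    val acc = new LongAccumulator
    // register(sc, name, countFailedValues) is private[spark] on AccumulatorV2.
    acc.register(sc, name = Some(name), countFailedValues = countFailedValues)
    acc
  }
}
```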
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution

 import scala.collection.mutable.HashSet

-import org.apache.spark.{Accumulator, AccumulatorParam}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -28,7 +27,7 @@
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

 /**
  * Contains methods for debugging query execution.
@@ -108,26 +107,27 @@ package object debug {
 private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   def output: Seq[Attribute] = child.output

-  implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
-    def zero(initialValue: HashSet[String]): HashSet[String] = {
-      initialValue.clear()
-      initialValue
-    }
-
-    def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
-      v1 ++= v2
-      v1
+  class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
+    private val _set = new HashSet[T]()
+    override def isZero: Boolean = _set.isEmpty
+    override def copy(): AccumulatorV2[T, HashSet[T]] = {
+      val newAcc = new SetAccumulator[T]()
+      newAcc._set ++= _set
+      newAcc
     }
+    override def reset(): Unit = _set.clear()
+    override def add(v: T): Unit = _set += v
+    override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
+    override def value: HashSet[T] = _set
   }

   /**
    * A collection of metrics for each column of output.
-   *
-   * @param elementTypes the actual runtime types for the output. Useful when there are bugs
-   *        causing the wrong data to be projected.
    */
-  case class ColumnMetrics(
-      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+  case class ColumnMetrics() {
+    val elementTypes = new SetAccumulator[String]
+    sparkContext.register(elementTypes)
+  }

   val tupleCount: LongAccumulator = sparkContext.longAccumulator
@@ -155,7 +155,7 @@
     while (i < numColumns) {
       val value = currentRow.get(i, output(i).dataType)
       if (value != null) {
-        columnStats(i).elementTypes += HashSet(value.getClass.getName)
+        columnStats(i).elementTypes.add(value.getClass.getName)
       }
       i += 1
     }
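As a sanity check on the new class, here is a hand-driven walk through the `AccumulatorV2` contract that Spark normally drives itself: ship a reset copy to each task, let tasks call `add`, then `merge` the copies back on the driver. The mirror of `SetAccumulator` is repeated only so the sketch is self-contained outside the `sql.execution.debug` package:

```scala
import scala.collection.mutable.HashSet
import org.apache.spark.util.AccumulatorV2

// Same shape as the SetAccumulator added above.
class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
  private val _set = new HashSet[T]()
  override def isZero: Boolean = _set.isEmpty
  override def copy(): AccumulatorV2[T, HashSet[T]] = {
    val newAcc = new SetAccumulator[T]()
    newAcc._set ++= _set
    newAcc
  }
  override def reset(): Unit = _set.clear()
  override def add(v: T): Unit = _set += v
  override def merge(other: AccumulatorV2[T, HashSet[T]]): Unit = _set ++= other.value
  override def value: HashSet[T] = _set
}

object SetAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val driverAcc = new SetAccumulator[String]
    // copyAndReset is what Spark serializes into each task.
    val task1 = driverAcc.copyAndReset()
    val task2 = driverAcc.copyAndReset()
    task1.add("java.lang.Integer") // per-task updates happen on executors
    task2.add("java.lang.String")
    driverAcc.merge(task1)         // driver folds the copies back in
    driverAcc.merge(task2)
    assert(driverAcc.value == HashSet("java.lang.Integer", "java.lang.String"))
  }
}
```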
@@ -49,16 +49,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
   }

-  test("Normal accumulator should do boxing") {
-    // We need this test to make sure BoxingFinder works.
-    val l = sparkContext.accumulator(0L)
-    val f = () => { l += 1L }
-    val cl = BoxingFinder.getClassReader(f.getClass)
-    val boxingFinder = new BoxingFinder()
-    cl.accept(boxingFinder, 0)
-    assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
-  }
-
   /**
    * Call `df.collect()` and verify if the collected metrics are same as "expectedMetrics".
    *
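The deleted test existed only to prove `BoxingFinder` could spot boxing, and it relied on the old `Accumulator[Long]`, whose generic `+=` erases to `AnyRef` and therefore boxes each update. With that API gone there is nothing left to exercise: `LongAccumulator` stores primitives and exposes an `add(v: Long)` overload, so the equivalent closure allocates nothing. A brief sketch:

```scala
import org.apache.spark.util.LongAccumulator

// add(v: Long) is a primitive-specialized overload, so this closure
// increments the counter without creating a java.lang.Long.
val acc = new LongAccumulator
val f = () => acc.add(1L)
f()
assert(acc.value == 1L)
```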
@@ -111,6 +111,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }

   def createContainer(host: String): Container = {
+    // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
     val containerId = ContainerId.newInstance(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
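For reference, the non-deprecated factory the new comment alludes to is presumably `ContainerId.newContainerId(ApplicationAttemptId, long)`, added when YARN 2.6 widened container IDs to 64 bits. A sketch of the eventual swap — an assumption, not part of this PR's change:

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ContainerId}

// Usable once Hadoop 2.6 is the minimum supported version.
def createContainerId(appAttemptId: ApplicationAttemptId, num: Long): ContainerId =
  ContainerId.newContainerId(appAttemptId, num)
```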