
Commit d062001

Merge remote-tracking branch 'upstream/master' into SPARK-35410

2 parents: 01a8c02 + fb93163

39 files changed: 1497 additions & 1144 deletions

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -537,6 +537,7 @@ jobs:
         uses: actions/checkout@v2
         with:
           repository: databricks/tpcds-kit
+          ref: 2a5078a782192ddb6efbcead8de9973d6ab4f069
           path: ./tpcds-kit
     - name: Build tpcds-kit
       if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
```

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -128,7 +128,7 @@ private[spark] class TaskSchedulerImpl(

   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
-  private val starvationTimer = new Timer(true)
+  private val starvationTimer = new Timer("task-starvation-timer", true)

   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
@@ -152,7 +152,7 @@ private[spark] class TaskSchedulerImpl(

   protected val executorIdToHost = new HashMap[String, String]

-  private val abortTimer = new Timer(true)
+  private val abortTimer = new Timer("task-abort-timer", true)
   // Exposed for testing
   val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
```
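The change above is small but practical: `java.util.Timer` has an overload that names the timer's background thread, so thread dumps show `task-starvation-timer` / `task-abort-timer` instead of a generic `Timer-0`. A minimal standalone sketch of the difference (the demo object and printed output are illustrative, not part of the commit):

```scala
import java.util.{Timer, TimerTask}

object NamedTimerDemo extends App {
  // Unnamed daemon timer: its worker thread appears as "Timer-0" in a thread dump.
  val anonymous = new Timer(true)

  // Named daemon timer, as in this commit: the worker thread carries the label,
  // which makes jstack output and profiler traces easier to read.
  val named = new Timer("task-starvation-timer", true)

  named.schedule(new TimerTask {
    override def run(): Unit =
      println(s"running on thread: ${Thread.currentThread().getName}")
  }, 0L)

  Thread.sleep(100) // let the daemon thread fire before the JVM exits
  anonymous.cancel()
  named.cancel()
}
```

Both timers keep the `true` daemon flag, so the change is purely for observability.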
core/src/test/scala/org/apache/spark/SparkFunSuite.scala

Lines changed: 16 additions & 4 deletions
```diff
@@ -19,21 +19,22 @@ package org.apache.spark

 // scalastyle:off
 import java.io.File
+import java.nio.file.Path
 import java.util.{Locale, TimeZone}

-import org.apache.log4j.spi.LoggingEvent
-
 import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.commons.io.FileUtils
 import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, Failed, Outcome}
 import org.scalatest.funsuite.AnyFunSuite
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.{AccumulatorContext, Utils}

-import scala.collection.mutable.ArrayBuffer
-
 /**
  * Base abstract class for all unit tests in Spark for handling common functionality.
  *
@@ -119,6 +120,17 @@ abstract class SparkFunSuite
     file
   }

+  /**
+   * Get a Path relative to the root project. It is assumed that a spark home is set.
+   */
+  protected final def getWorkspaceFilePath(first: String, more: String*): Path = {
+    if (!(sys.props.contains("spark.test.home") || sys.env.contains("SPARK_HOME"))) {
+      fail("spark.test.home or SPARK_HOME is not set.")
+    }
+    val sparkHome = sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
+    java.nio.file.Paths.get(sparkHome, first +: more: _*)
+  }
+
   /**
    * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
    * set up and tear down resources.
```
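The new `getWorkspaceFilePath` helper lets any suite resolve files against the Spark checkout root instead of the module-local working directory, failing fast when neither `spark.test.home` nor `SPARK_HOME` is set. A hypothetical usage sketch (the suite name and target file are illustrative, not from the commit):

```scala
import org.apache.spark.SparkFunSuite

class WorkspacePathDemoSuite extends SparkFunSuite {
  test("resolve a path against the project root") {
    // Resolves to <spark-home>/docs/monitoring.md no matter which module's
    // test JVM is running; fails with a clear message if no spark home is set.
    val path = getWorkspaceFilePath("docs", "monitoring.md")
    assert(path.endsWith("monitoring.md"))
  }
}
```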

docs/monitoring.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -19,6 +19,9 @@ license: |
   limitations under the License.
 ---

+* This will become a table of contents (this text will be scraped).
+{:toc}
+
 There are several ways to monitor Spark applications: web UIs, metrics, and external instrumentation.

 # Web Interfaces
```

docs/sql-migration-guide.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -87,6 +87,8 @@ license: |

 - In Spark 3.2, Spark supports `DayTimeIntervalType` and `YearMonthIntervalType` as inputs and outputs of `TRANSFORM` clause in Hive `SERDE` mode, the behavior is different between Hive `SERDE` mode and `ROW FORMAT DELIMITED` mode when these two types are used as inputs. In Hive `SERDE` mode, `DayTimeIntervalType` column is converted to `HiveIntervalDayTime`, its string format is `[-]?d h:m:s.n`, but in `ROW FORMAT DELIMITED` mode the format is `INTERVAL '[-]?d h:m:s.n' DAY TO TIME`. In Hive `SERDE` mode, `YearMonthIntervalType` column is converted to `HiveIntervalYearMonth`, its string format is `[-]?y-m`, but in `ROW FORMAT DELIMITED` mode the format is `INTERVAL '[-]?y-m' YEAR TO MONTH`.

+- In Spark 3.2, `hash(0) == hash(-0)` for floating point types. Previously, different values were generated.
+
 ## Upgrading from Spark SQL 3.0 to 3.1

 - In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
```

docs/submitting-applications.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -181,7 +181,7 @@ The master URL passed to Spark can be in one of the following formats:
 The cluster location will be found based on the <code>HADOOP_CONF_DIR</code> or <code>YARN_CONF_DIR</code> variable.
 </td></tr>
 <tr><td> <code>k8s://HOST:PORT</code> </td><td> Connect to a <a href="running-on-kubernetes.html">Kubernetes</a> cluster in
-<code>cluster</code> mode. Client mode is currently unsupported and will be supported in future releases.
+<code>client</code> or <code>cluster</code> mode depending on the value of <code>--deploy-mode</code>.
 The <code>HOST</code> and <code>PORT</code> refer to the <a href="https://kubernetes.io/docs/reference/generated/kube-apiserver/">Kubernetes API Server</a>.
 It connects using TLS by default. In order to force it to use an unsecured connection, you can use
 <code>k8s://http://HOST:PORT</code>.
```

project/build.properties

Lines changed: 1 addition & 1 deletion
```diff
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-sbt.version=1.5.1
+sbt.version=1.5.2
```

python/pyspark/ml/tuning.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -602,7 +602,7 @@ class CrossValidator(Estimator, _CrossValidatorParams, HasParallelism, HasCollec
 >>> from pyspark.ml.classification import LogisticRegression
 >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
 >>> from pyspark.ml.linalg import Vectors
->>> from pyspark.ml.tuning import CrossValidatorModel
+>>> from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
 >>> import tempfile
 >>> dataset = spark.createDataFrame(
 ...     [(Vectors.dense([0.0]), 0.0),
@@ -1141,6 +1141,7 @@ class TrainValidationSplit(Estimator, _TrainValidationSplitParams, HasParallelis
 >>> from pyspark.ml.classification import LogisticRegression
 >>> from pyspark.ml.evaluation import BinaryClassificationEvaluator
 >>> from pyspark.ml.linalg import Vectors
+>>> from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
 >>> from pyspark.ml.tuning import TrainValidationSplitModel
 >>> import tempfile
 >>> dataset = spark.createDataFrame(
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala

Lines changed: 20 additions & 4 deletions
```diff
@@ -369,11 +369,25 @@ abstract class HashExpression[E] extends Expression {
   protected def genHashBoolean(input: String, result: String): String =
     genHashInt(s"$input ? 1 : 0", result)

-  protected def genHashFloat(input: String, result: String): String =
-    genHashInt(s"Float.floatToIntBits($input)", result)
+  protected def genHashFloat(input: String, result: String): String = {
+    s"""
+       |if($input == -0.0f) {
+       |  ${genHashInt("0", result)}
+       |} else {
+       |  ${genHashInt(s"Float.floatToIntBits($input)", result)}
+       |}
+     """.stripMargin
+  }

-  protected def genHashDouble(input: String, result: String): String =
-    genHashLong(s"Double.doubleToLongBits($input)", result)
+  protected def genHashDouble(input: String, result: String): String = {
+    s"""
+       |if($input == -0.0d) {
+       |  ${genHashLong("0L", result)}
+       |} else {
+       |  ${genHashLong(s"Double.doubleToLongBits($input)", result)}
+       |}
+     """.stripMargin
+  }

   protected def genHashDecimal(
       ctx: CodegenContext,
@@ -523,7 +537,9 @@ abstract class InterpretedHashFunction {
     case s: Short => hashInt(s, seed)
     case i: Int => hashInt(i, seed)
     case l: Long => hashLong(l, seed)
+    case f: Float if (f == -0.0f) => hashInt(0, seed)
     case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
+    case d: Double if (d == -0.0d) => hashLong(0L, seed)
     case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
     case d: Decimal =>
       val precision = dataType.asInstanceOf[DecimalType].precision
```
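The root cause of the `hash(0) != hash(-0)` inconsistency fixed here: IEEE 754 defines `-0.0 == 0.0`, but the two values have different bit patterns, so hashing the raw bits from `floatToIntBits`/`doubleToLongBits` sent equal values to different hashes. The guards above normalize negative zero to the bits of positive zero first. A small sketch of the underlying behavior (the demo object is illustrative):

```scala
object NegativeZeroBitsDemo extends App {
  // Equal under IEEE 754 comparison semantics...
  assert(-0.0f == 0.0f)
  assert(-0.0d == 0.0d)

  // ...but distinct at the bit level, which is what the old code hashed.
  println(java.lang.Float.floatToIntBits(0.0f).toHexString)     // 0
  println(java.lang.Float.floatToIntBits(-0.0f).toHexString)    // 80000000
  println(java.lang.Double.doubleToLongBits(0.0d).toHexString)  // 0
  println(java.lang.Double.doubleToLongBits(-0.0d).toHexString) // 8000000000000000
}
```

This is the same behavior change that the `sql-migration-guide.md` entry above calls out for Spark 3.2.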

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 10 additions & 14 deletions
```diff
@@ -52,6 +52,12 @@ trait InvokeLike extends Expression with NonSQLExpression {

   protected lazy val needNullCheck: Boolean = propagateNull && arguments.exists(_.nullable)
   protected lazy val evaluatedArgs: Array[Object] = new Array[Object](arguments.length)
+  private lazy val boxingFn: Any => Any =
+    ScalaReflection.typeBoxedJavaMapping
+      .get(dataType)
+      .map(cls => v => cls.cast(v))
+      .getOrElse(identity)
+

   /**
    * Prepares codes for arguments.
@@ -122,12 +128,7 @@ trait InvokeLike extends Expression with NonSQLExpression {
   * @param dataType the data type of the return object
   * @return the return object of a method call
   */
-  def invoke(
-      obj: Any,
-      method: Method,
-      arguments: Seq[Expression],
-      input: InternalRow,
-      dataType: DataType): Any = {
+  def invoke(obj: Any, method: Method, input: InternalRow): Any = {
     var i = 0
     val len = arguments.length
     while (i < len) {
@@ -145,12 +146,7 @@ trait InvokeLike extends Expression with NonSQLExpression {
       case e: java.lang.reflect.InvocationTargetException if e.getCause != null =>
         throw e.getCause
     }
-    val boxedClass = ScalaReflection.typeBoxedJavaMapping.get(dataType)
-    if (boxedClass.isDefined) {
-      boxedClass.get.cast(ret)
-    } else {
-      ret
-    }
+    boxingFn(ret)
   }
 }

@@ -256,7 +252,7 @@ case class StaticInvoke(
   @transient lazy val method = findMethod(cls, functionName, argClasses)

   override def eval(input: InternalRow): Any = {
-    invoke(null, method, arguments, input, dataType)
+    invoke(null, method, input)
   }

   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -358,7 +354,7 @@ case class Invoke(
     } else {
       obj.getClass.getMethod(functionName, argClasses: _*)
     }
-    invoke(obj, invokeMethod, arguments, input, dataType)
+    invoke(obj, invokeMethod, input)
   }
 }
```
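Two refactors in one here: `invoke` drops the `arguments`/`dataType` parameters it can already read from the enclosing trait, and the per-call `Map` lookup plus `Option` branch is hoisted into a `boxingFn` resolved once per expression instance, which matters on a per-row hot path. A standalone sketch of the caching pattern, with a stand-in mapping and types in place of Catalyst's `ScalaReflection.typeBoxedJavaMapping`:

```scala
object BoxingFnDemo extends App {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  // Stand-in for ScalaReflection.typeBoxedJavaMapping.
  val typeBoxedJavaMapping: Map[DataType, Class[_]] =
    Map(IntType -> classOf[java.lang.Integer])

  final class Invoker(dataType: DataType) {
    // Resolved once per Invoker instance rather than on every invoke() call,
    // mirroring the lazy val introduced in this commit.
    private lazy val boxingFn: Any => Any =
      typeBoxedJavaMapping
        .get(dataType)
        .map(cls => (v: Any) => cls.cast(v))
        .getOrElse(identity)

    def invoke(ret: Any): Any = boxingFn(ret)
  }

  println(new Invoker(IntType).invoke(42))       // cast through java.lang.Integer
  println(new Invoker(StringType).invoke("str")) // identity: no boxing entry
}
```

Because `dataType` is fixed for a given expression instance, the lookup result can never change, which is what makes caching it in a `lazy val` safe.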