Commit 70cab8b

Merge remote-tracking branch 'upstream/master' into SPARK-26085
2 parents: d784142 + 81550b3

60 files changed: +986, -370 lines

Some content is hidden: large commits have part of their diff hidden by default, so only a subset of the changed files appears below.

R/pkg/tests/fulltests/test_streaming.R

Lines changed: 1 addition & 0 deletions
@@ -127,6 +127,7 @@ test_that("Specify a schema by using a DDL-formatted string when reading", {
   expect_false(awaitTermination(q, 5 * 1000))
   callJMethod(q@ssq, "processAllAvailable")
   expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+  stopQuery(q)
 
   expect_error(read.stream(path = parquetPath, schema = "name stri"),
                "DataType stri is not supported.")

bin/load-spark-env.sh

Lines changed: 16 additions & 11 deletions
@@ -26,35 +26,40 @@ if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi
 
+SPARK_ENV_SH="spark-env.sh"
 if [ -z "$SPARK_ENV_LOADED" ]; then
   export SPARK_ENV_LOADED=1
 
   export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
 
-  if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+  SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
+  if [[ -f "${SPARK_ENV_SH}" ]]; then
     # Promote all variable declarations to environment (exported) variables
     set -a
-    . "${SPARK_CONF_DIR}/spark-env.sh"
+    . ${SPARK_ENV_SH}
     set +a
   fi
 fi
 
 # Setting SPARK_SCALA_VERSION if not already set.
 
 if [ -z "$SPARK_SCALA_VERSION" ]; then
+  SCALA_VERSION_1=2.12
+  SCALA_VERSION_2=2.11
 
-  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
-  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
-
-  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
-    echo -e "Presence of build for multiple Scala versions detected." 1>&2
-    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
+  ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
+  ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
+  ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
+  if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
+    echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
+    echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
+    echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
     exit 1
   fi
 
-  if [ -d "$ASSEMBLY_DIR2" ]; then
-    export SPARK_SCALA_VERSION="2.11"
+  if [[ -d "$ASSEMBLY_DIR_1" ]]; then
+    export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
   else
-    export SPARK_SCALA_VERSION="2.12"
+    export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
   fi
 fi

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 5 additions & 4 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
-import java.util.{Arrays, Date, Locale}
+import java.util.{Arrays, Comparator, Date, Locale}
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
@@ -269,10 +269,11 @@ private[spark] class SparkHadoopUtil extends Logging {
             name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
           }
         })
-      Arrays.sort(fileStatuses,
-        (o1: FileStatus, o2: FileStatus) => {
+      Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+        override def compare(o1: FileStatus, o2: FileStatus): Int = {
           Longs.compare(o1.getModificationTime, o2.getModificationTime)
-        })
+        }
+      })
       fileStatuses
     } catch {
       case NonFatal(e) =>
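
The Scala lambda passed to Arrays.sort is replaced with an explicit java.util.Comparator, presumably so this code compiles under both Scala 2.11 and 2.12 (2.11 does not apply SAM conversion to Java interfaces here by default). A minimal, self-contained sketch of the resulting pattern; it is not Spark's API surface, only an illustration (FileStatus is Hadoop's, Longs is Guava's, both already on Spark's compile classpath):

// Sketch of the explicit-Comparator sorting pattern in isolation.
import java.util.{Arrays, Comparator}

import com.google.common.primitives.Longs
import org.apache.hadoop.fs.FileStatus

def sortByModificationTime(fileStatuses: Array[FileStatus]): Array[FileStatus] = {
  // Anonymous Comparator instead of a Scala lambda: works on Scala 2.11 and 2.12 alike.
  Arrays.sort(fileStatuses, new Comparator[FileStatus] {
    override def compare(o1: FileStatus, o2: FileStatus): Int =
      Longs.compare(o1.getModificationTime, o2.getModificationTime)
  })
  fileStatuses
}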

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 0 deletions
@@ -570,6 +570,12 @@ package object config {
     .stringConf
     .createOptional
 
+  private[spark] val UI_REQUEST_HEADER_SIZE =
+    ConfigBuilder("spark.ui.requestHeaderSize")
+      .doc("Value for HTTP request header size in bytes.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8k")
+
   private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
     .doc("Class names of listeners to add to SparkContext during initialization.")
     .stringConf
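
For context, a hedged sketch of how this new entry resolves, assuming it is evaluated inside the org.apache.spark package where the private[spark] entry and SparkConf.get(entry) are visible. bytesConf(ByteUnit.BYTE) parses size strings, so the "8k" default comes back as 8192 bytes, and users can override it via spark.ui.requestHeaderSize:

// Minimal sketch, not part of the patch; assumed to live under org.apache.spark.
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.UI_REQUEST_HEADER_SIZE

val conf = new SparkConf()
// With nothing set, the "8k" default resolves to 8192 bytes.
assert(conf.get(UI_REQUEST_HEADER_SIZE) == 8 * 1024L)

// Users can raise the limit, e.g. for very long auth headers or cookies.
conf.set("spark.ui.requestHeaderSize", "16k")
assert(conf.get(UI_REQUEST_HEADER_SIZE) == 16 * 1024L)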

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 4 additions & 2 deletions
@@ -356,13 +356,15 @@ private[spark] object JettyUtils extends Logging {
 
       (connector, connector.getLocalPort())
     }
+    val httpConfig = new HttpConfiguration()
+    httpConfig.setRequestHeaderSize(conf.get(UI_REQUEST_HEADER_SIZE).toInt)
 
     // If SSL is configured, create the secure connector first.
     val securePort = sslOptions.createJettySslContextFactory().map { factory =>
       val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
       val secureServerName = if (serverName.nonEmpty) s"$serverName (HTTPS)" else serverName
       val connectionFactories = AbstractConnectionFactory.getFactories(factory,
-        new HttpConnectionFactory())
+        new HttpConnectionFactory(httpConfig))
 
       def sslConnect(currentPort: Int): (ServerConnector, Int) = {
         newConnector(connectionFactories, currentPort)
@@ -377,7 +379,7 @@ private[spark] object JettyUtils extends Logging {
 
     // Bind the HTTP port.
     def httpConnect(currentPort: Int): (ServerConnector, Int) = {
-      newConnector(Array(new HttpConnectionFactory()), currentPort)
+      newConnector(Array(new HttpConnectionFactory(httpConfig)), currentPort)
     }
 
     val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect,
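
The new config is wired in by building a single Jetty HttpConfiguration and handing it to every HttpConnectionFactory, for both the plain and the SSL connectors. A stripped-down sketch of the same Jetty 9 pattern outside Spark (the port 4040 is only illustrative; Spark additionally routes port selection through startServiceOnPort):

// Standalone sketch of the Jetty wiring, assuming Jetty 9.x on the classpath.
import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}

val server = new Server()

val httpConfig = new HttpConfiguration()
// Same role as spark.ui.requestHeaderSize: requests whose headers exceed this
// size are rejected by Jetty rather than served.
httpConfig.setRequestHeaderSize(8 * 1024)

val connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig))
connector.setPort(4040)
server.addConnector(connector)
server.start()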

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 4 additions & 2 deletions
@@ -843,7 +843,7 @@ private[ui] class TaskPagedTable(
           </div>
         </td>
         <td>{UIUtils.formatDate(task.launchTime)}</td>
-        <td>{formatDuration(task.duration)}</td>
+        <td>{formatDuration(task.taskMetrics.map(_.executorRunTime))}</td>
         <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
           {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
         </td>
@@ -996,7 +996,9 @@ private[ui] object ApiHelper {
     HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR,
     HEADER_HOST -> TaskIndexNames.HOST,
     HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME,
-    HEADER_DURATION -> TaskIndexNames.DURATION,
+    // SPARK-26109: Duration of task as executorRunTime to make it consistent with the
+    // aggregated tasks summary metrics table and the previous versions of Spark.
+    HEADER_DURATION -> TaskIndexNames.EXEC_RUN_TIME,
     HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,
     HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME,
     HEADER_GC_TIME -> TaskIndexNames.GC_TIME,

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 48 deletions
@@ -31,7 +31,6 @@ import java.security.SecureRandom
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
-import java.util.concurrent.atomic.AtomicBoolean
 import java.util.zip.GZIPInputStream
 
 import scala.annotation.tailrec
@@ -93,53 +92,6 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
-  /**
-   * The performance overhead of creating and logging strings for wide schemas can be large. To
-   * limit the impact, we bound the number of fields to include by default. This can be overridden
-   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
-   */
-  val DEFAULT_MAX_TO_STRING_FIELDS = 25
-
-  private[spark] def maxNumToStringFields = {
-    if (SparkEnv.get != null) {
-      SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
-    } else {
-      DEFAULT_MAX_TO_STRING_FIELDS
-    }
-  }
-
-  /** Whether we have warned about plan string truncation yet. */
-  private val truncationWarningPrinted = new AtomicBoolean(false)
-
-  /**
-   * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
-   * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
-   *
-   * @return the trimmed and formatted string.
-   */
-  def truncatedString[T](
-      seq: Seq[T],
-      start: String,
-      sep: String,
-      end: String,
-      maxNumFields: Int = maxNumToStringFields): String = {
-    if (seq.length > maxNumFields) {
-      if (truncationWarningPrinted.compareAndSet(false, true)) {
-        logWarning(
-          "Truncated the string representation of a plan since it was too large. This " +
-          "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
-      }
-      val numFields = math.max(0, maxNumFields - 1)
-      seq.take(numFields).mkString(
-        start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
-    } else {
-      seq.mkString(start, sep, end)
-    }
-  }
-
-  /** Shorthand for calling truncatedString() without start or end strings. */
-  def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 0 additions & 8 deletions
@@ -45,14 +45,6 @@ import org.apache.spark.scheduler.SparkListener
 
 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
-  test("truncatedString") {
-    assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
-    assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
-    assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
-  }
-
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)
