
Commit 588bd75

Merge pull request #165 from markhamstra/csd-1.6

Merged Apache bug fixes

2 parents: 36bcbeb + 5e8618b
35 files changed: 414 additions & 168 deletions


build/mvn
Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@ install_mvn() {
   local MVN_VERSION="3.3.3"
 
   install_app \
-    "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
+    "https://www.apache.org/dyn/closer.lua?action=download&filename=/maven/maven-3/${MVN_VERSION}/binaries" \
     "apache-maven-${MVN_VERSION}-bin.tar.gz" \
     "apache-maven-${MVN_VERSION}/bin/mvn"

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
Lines changed: 4 additions & 2 deletions

@@ -215,8 +215,6 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
       }
     }
 
-    inMemSorter.reset();
-
     if (!isLastFile) {  // i.e. this is a spill file
       // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
       // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter

@@ -255,6 +253,10 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
 
     writeSortedFile(false);
     final long spillSize = freeMemory();
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
     return spillSize;
   }

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
Lines changed: 7 additions & 0 deletions

@@ -49,9 +49,12 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
    */
   private int pos = 0;
 
+  private int initialSize;
+
   public ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
     this.consumer = consumer;
     assert (initialSize > 0);
+    this.initialSize = initialSize;
     this.array = consumer.allocateArray(initialSize);
     this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
   }

@@ -68,6 +71,10 @@ public int numRecords() {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
Lines changed: 5 additions & 2 deletions

@@ -192,14 +192,17 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
         spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
       }
       spillWriter.close();
-
-      inMemSorter.reset();
     }
 
     final long spillSize = freeMemory();
     // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
+    inMemSorter.reset();
+    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
+    // records. Otherwise, if the task is over allocated memory, then without freeing the memory pages,
+    // we might not be able to get memory for the pointer array.
+
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
 
     return spillSize;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
Lines changed: 7 additions & 0 deletions

@@ -80,6 +80,8 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
    */
   private int pos = 0;
 
+  private long initialSize;
+
   public UnsafeInMemorySorter(
       final MemoryConsumer consumer,
       final TaskMemoryManager memoryManager,

@@ -98,6 +100,7 @@ public UnsafeInMemorySorter(
       LongArray array) {
     this.consumer = consumer;
     this.memoryManager = memoryManager;
+    this.initialSize = array.size();
     this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
     this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
     this.array = array;

@@ -114,6 +117,10 @@ public void free() {
   }
 
   public void reset() {
+    if (consumer != null) {
+      consumer.freeArray(array);
+      this.array = consumer.allocateArray(initialSize);
+    }
     pos = 0;
   }
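Taken together, the four sorter diffs above are one coordinated fix: reset() now frees the pointer array and reacquires one at the initial size, and both spill() paths call it only after freeMemory() has released the record pages. A toy sketch of why that ordering matters (plain Scala; Pool and SpillOrderDemo are invented for illustration and model a task that is over its memory budget, so released memory first repays the overdraft):

    // Toy accounting pool, not a Spark API: a task over budget must repay
    // its overdraft before released words become available again.
    final class Pool(var free: Long, var overdraft: Long) {
      def allocate(words: Long): Array[Long] = {
        require(words <= free, s"OOM: need $words words, only $free free")
        free -= words
        new Array[Long](words.toInt)
      }
      def release(words: Long): Unit = {
        val repaid = math.min(words, overdraft)
        overdraft -= repaid
        free += words - repaid
      }
    }

    object SpillOrderDemo extends App {
      val initialSize = 4L
      val pageWords = 8L
      // The task already holds a 4-word pointer array plus 8 words of record
      // pages, and is 4 words over budget.
      val pool = new Pool(free = 0L, overdraft = 4L)

      // Old order (buggy): resetting first releases only the 4-word array,
      // which merely repays the overdraft, so re-allocating the array throws:
      //   pool.release(initialSize); pool.allocate(initialSize)  // OOM

      // New order (this commit): free the record pages first, then reset.
      pool.release(pageWords)                 // freeMemory(): repays 4, leaves 4 free
      pool.release(initialSize)               // reset(): drop the old pointer array
      val fresh = pool.allocate(initialSize)  // succeeds: 8 words were free
      println(s"reallocated ${fresh.length} words; ${pool.free} still free")
    }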

core/src/main/resources/org/apache/spark/ui/static/webui.css
Lines changed: 5 additions & 3 deletions

@@ -106,21 +106,22 @@ pre {
   line-height: 18px;
   padding: 6px;
   margin: 0;
+  word-break: break-word;
   border-radius: 3px;
 }
 
 .stage-details {
-  max-height: 100px;
   overflow-y: auto;
   margin: 0;
+  display: block;
   transition: max-height 0.25s ease-out, padding 0.25s ease-out;
 }
 
 .stage-details.collapsed {
-  max-height: 0;
   padding-top: 0;
   padding-bottom: 0;
   border: none;
+  display: none;
 }
 
 .description-input {

@@ -143,14 +144,15 @@ pre {
   max-height: 300px;
   overflow-y: auto;
   margin: 0;
+  display: block;
   transition: max-height 0.25s ease-out, padding 0.25s ease-out;
 }
 
 .stacktrace-details.collapsed {
-  max-height: 0;
   padding-top: 0;
   padding-bottom: 0;
   border: none;
+  display: none;
 }
 
 span.expand-additional-metrics, span.expand-dag-viz {

core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines changed: 1 addition & 1 deletion

@@ -287,7 +287,7 @@ private[spark] class Executor(
         logInfo(s"Executor killed $taskName (TID $taskId)")
         execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
 
-      case cDE: CommitDeniedException =>
+      case CausedBy(cDE: CommitDeniedException) =>
        val reason = cDE.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
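The fix here is that a CommitDeniedException can reach this handler wrapped inside another exception, so a plain type pattern misses it. CausedBy is an extractor that sees through the cause chain; a minimal sketch of such an extractor (Spark's actual org.apache.spark.util.CausedBy may differ in detail):

    // Extractor exposing the root cause of a Throwable so pattern matches
    // can see through wrapper exceptions. A sketch, not Spark's exact code.
    object CausedBy {
      def unapply(e: Throwable): Option[Throwable] =
        Option(e.getCause).flatMap(unapply).orElse(Some(e))
    }

    object CausedByDemo extends App {
      // A wrapped exception still matches on its root cause:
      new RuntimeException(new IllegalStateException("commit denied")) match {
        case CausedBy(ise: IllegalStateException) => println(s"root cause: ${ise.getMessage}")
        case _ => println("no match")
      }
    }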

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
Lines changed: 6 additions & 10 deletions

@@ -81,11 +81,8 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   *  - `spark.speculation`
-   *  - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
    */
   def commitTask(
       committer: MapReduceOutputCommitter,

@@ -112,11 +109,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }
 
       if (shouldCoordinateWithDriver) {
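Since the default flipped to true, output commit coordination is now always on unless a job explicitly opts out via the undocumented escape hatch named in the comment. A sketch of that opt-out (illustrative configuration, use with care):

    import org.apache.spark.SparkConf

    // Disable output commit coordination (the escape hatch above); leaving
    // the key unset now means coordination is ON by default.
    val conf = new SparkConf()
      .set("spark.hadoop.outputCommitCoordination.enabled", "false")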

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Lines changed: 2 additions & 6 deletions

@@ -1117,9 +1117,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close(hadoopContext)
-    }
+    }(finallyBlock = writer.close(hadoopContext))
     committer.commitTask(hadoopContext)
     bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
     outputMetrics.setRecordsWritten(recordsWritten)

@@ -1203,9 +1201,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
         recordsWritten += 1
       }
-    } {
-      writer.close()
-    }
+    }(finallyBlock = writer.close())
     writer.commit()
     bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
     outputMetrics.setRecordsWritten(recordsWritten)
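Both hunks rewrite the second argument list of a curried cleanup helper as a named argument, so the previously cryptic `} { ... }` reads as `}(finallyBlock = ...)` at the call site. A sketch of such a helper, modeled on Spark's Utils.tryWithSafeFinally (the real signature may differ):

    object SafeFinally {
      // Runs `block`, always runs `finallyBlock`; if both throw, the cleanup
      // failure is attached as a suppressed exception instead of masking the
      // original one.
      def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
        var original: Throwable = null
        try {
          block
        } catch {
          case t: Throwable =>
            original = t
            throw t
        } finally {
          try {
            finallyBlock
          } catch {
            case t: Throwable if original != null =>
              original.addSuppressed(t) // `original` keeps propagating
          }
        }
      }

      // Call shape as at the call sites above:
      //   tryWithSafeFinally {
      //     ... write records ...
      //   }(finallyBlock = writer.close())
    }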

core/src/main/scala/org/apache/spark/scheduler/Task.scala
Lines changed: 10 additions & 4 deletions

@@ -87,10 +87,16 @@ private[spark] abstract class Task[T](
     }
     try {
       (runTask(context), context.collectAccumulators())
-    } catch { case e: Throwable =>
-      // Catch all errors; run task failure callbacks, and rethrow the exception.
-      context.markTaskFailed(e)
-      throw e
+    } catch {
+      case e: Throwable =>
+        // Catch all errors; run task failure callbacks, and rethrow the exception.
+        try {
+          context.markTaskFailed(e)
+        } catch {
+          case t: Throwable =>
+            e.addSuppressed(t)
+        }
+        throw e
     } finally {
       // Call the task completion callbacks.
       context.markTaskCompleted()
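This hunk guards the failure callback itself: if markTaskFailed(e) throws, that secondary error is attached to the original exception via addSuppressed instead of replacing it. The pattern in isolation (plain Scala; runReportingFailure is an invented name, not Spark's):

    object MarkFailedDemo extends App {
      // If the failure callback itself fails, the original exception still
      // propagates and carries the callback's error as a suppressed one.
      def runReportingFailure[T](body: => T)(onFailure: Throwable => Unit): T =
        try body catch {
          case e: Throwable =>
            try onFailure(e) catch { case t: Throwable => e.addSuppressed(t) }
            throw e
        }

      try {
        runReportingFailure(throw new RuntimeException("task failed")) { _ =>
          throw new IllegalStateException("callback also failed")
        }
      } catch {
        case e: Throwable =>
          val suppressed = e.getSuppressed.map(_.getMessage).mkString(", ")
          println(s"caught: ${e.getMessage}; suppressed: $suppressed")
      }
    }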
