
Commit 0d1a4f0

Merge branch 'master' into decimal-parsing-locale
2 parents 5236336 + 6a064ba commit 0d1a4f0

148 files changed: 4738 additions & 2168 deletions


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -169,6 +169,7 @@ exportMethods("arrange",
               "toJSON",
               "transform",
               "union",
+              "unionAll",
               "unionByName",
               "unique",
               "unpersist",

R/pkg/R/DataFrame.R

Lines changed: 14 additions & 0 deletions
@@ -2732,6 +2732,20 @@ setMethod("union",
             dataFrame(unioned)
           })
 
+#' Return a new SparkDataFrame containing the union of rows
+#'
+#' This is an alias for `union`.
+#'
+#' @rdname union
+#' @name unionAll
+#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
+#' @note unionAll since 1.4.0
+setMethod("unionAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            union(x, y)
+          })
+
 #' Return a new SparkDataFrame containing the union of rows, matched by column names
 #'
 #' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
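
The new SparkR method is a thin alias: unionAll(x, y) simply calls union(x, y), which keeps duplicate rows and resolves columns by position. For context only, here is a hedged sketch of the same pair of operations in the JVM Dataset API (not the SparkR code this hunk touches); the local session, app name, and id ranges are made up for the example.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class UnionAliasExample {
  public static void main(String[] args) {
    // Throwaway local session, purely for illustration.
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("union-alias-example")
        .getOrCreate();

    Dataset<Long> a = spark.range(0, 3);  // ids 0, 1, 2
    Dataset<Long> b = spark.range(2, 5);  // ids 2, 3, 4

    // unionAll behaves like union: duplicates are kept and columns are matched by position.
    long viaUnion = a.union(b).count();        // 6
    long viaUnionAll = a.unionAll(b).count();  // 6

    // unionByName matches columns by name instead (trivially the same here, with one "id" column).
    long viaUnionByName = a.unionByName(b).count();  // 6

    System.out.println(viaUnion + " " + viaUnionAll + " " + viaUnionByName);
    spark.stop();
  }
}

In SparkR the call shape is the same idea: unionAll(df1, df2) returns exactly what union(df1, df2) returns.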

R/pkg/R/generics.R

Lines changed: 3 additions & 0 deletions
@@ -631,6 +631,9 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
 #' @rdname union
 setGeneric("union", function(x, y) { standardGeneric("union") })
 
+#' @rdname union
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
 #' @rdname unionByName
 setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })

R/pkg/R/stats.R

Lines changed: 2 additions & 2 deletions
@@ -109,7 +109,7 @@ setMethod("corr",
 #'
 #' Finding frequent items for columns, possibly with false positives.
 #' Using the frequent element count algorithm described in
-#' \url{http://dx.doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
+#' \url{https://doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
 #'
 #' @param x A SparkDataFrame.
 #' @param cols A vector column names to search frequent items in.
@@ -143,7 +143,7 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' *exact* rank of x is close to (p * N). More precisely,
 #'   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
 #' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
-#' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670
+#' optimizations). The algorithm was first present in [[https://doi.org/10.1145/375663.375670
 #' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
 #' Note that NA values will be ignored in numerical columns before calculation. For
 #' columns only containing NA values, an empty list is returned.

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 0 deletions
@@ -2458,6 +2458,7 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
   expect_equal(count(unioned), 6)
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(union(df, df2)), df$age)), 6)
+  expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
 
   df1 <- select(df2, "age", "name")
   unioned1 <- arrange(unionByName(df1, df), df1$age)

common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java

Lines changed: 12 additions & 8 deletions
@@ -371,26 +371,30 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
 
   private void assertErrorAndClosed(RpcResult result, String expectedError) {
     assertTrue("unexpected success: " + result.successMessages, result.successMessages.isEmpty());
-    // we expect 1 additional error, which should contain one of the follow messages:
-    //  - "closed"
-    //  - "Connection reset"
-    //  - "java.nio.channels.ClosedChannelException"
     Set<String> errors = result.errorMessages;
     assertEquals("Expected 2 errors, got " + errors.size() + "errors: " +
         errors, 2, errors.size());
 
+    // We expect 1 additional error due to closed connection and here are possible keywords in the
+    // error message.
+    Set<String> possibleClosedErrors = Sets.newHashSet(
+      "closed",
+      "Connection reset",
+      "java.nio.channels.ClosedChannelException",
+      "java.io.IOException: Broken pipe"
+    );
     Set<String> containsAndClosed = Sets.newHashSet(expectedError);
-    containsAndClosed.add("closed");
-    containsAndClosed.add("Connection reset");
-    containsAndClosed.add("java.nio.channels.ClosedChannelException");
+    containsAndClosed.addAll(possibleClosedErrors);
 
     Pair<Set<String>, Set<String>> r = checkErrorsContain(errors, containsAndClosed);
 
     assertTrue("Got a non-empty set " + r.getLeft(), r.getLeft().isEmpty());
 
     Set<String> errorsNotFound = r.getRight();
     assertEquals(
-      "The size of " + errorsNotFound.toString() + " was not 2", 2, errorsNotFound.size());
+      "The size of " + errorsNotFound + " was not " + (possibleClosedErrors.size() - 1),
+      possibleClosedErrors.size() - 1,
+      errorsNotFound.size());
     for (String err: errorsNotFound) {
       assertTrue("Found a wrong error " + err, containsAndClosed.contains(err));
     }

core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 5 additions & 5 deletions
@@ -83,10 +83,10 @@ public void spill() throws IOException {
   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
 
   /**
-   * Allocates a LongArray of `size`. Note that this method may throw `OutOfMemoryError` if Spark
-   * doesn't have enough memory for this allocation, or throw `TooLargePageException` if this
-   * `LongArray` is too large to fit in a single page. The caller side should take care of these
-   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
+   * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
+   * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
+   * if this `LongArray` is too large to fit in a single page. The caller side should take care of
+   * these two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
    *
    * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
@@ -111,7 +111,7 @@ public void freeArray(LongArray array) {
   /**
    * Allocate a memory block with at least `required` bytes.
    *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    */
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
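
The javadoc now names the exceptions the allocator actually throws (SparkOutOfMemoryError rather than plain OutOfMemoryError). As a hedged sketch of what "the caller side should take care of these two exceptions" looks like in practice, and assuming a made-up MemoryConsumer subclass (ExamplePointerArrayConsumer is not a Spark class), a caller would typically respond to either exception by spilling:

import java.io.IOException;

import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.unsafe.array.LongArray;

// Illustrative consumer that grows a pointer array and spills when the allocation fails.
final class ExamplePointerArrayConsumer extends MemoryConsumer {

  private LongArray pointers;

  ExamplePointerArrayConsumer(TaskMemoryManager manager) {
    super(manager, manager.pageSizeBytes(), MemoryMode.ON_HEAP);
    this.pointers = allocateArray(1024);  // small initial array
  }

  /** Try to double the array; on memory pressure, spill instead of failing outright. */
  void growPointerArray() throws IOException {
    try {
      // May throw SparkOutOfMemoryError or TooLargePageException, as the updated javadoc says.
      LongArray bigger = allocateArray(pointers.size() * 2);
      // ... copy the old entries into `bigger` ...
      freeArray(pointers);
      pointers = bigger;
    } catch (TooLargePageException e) {
      // The doubled array no longer fits in a single page: free memory by spilling.
      spill();
    } catch (SparkOutOfMemoryError e) {
      // Spark could not grant the allocation: spill and let the caller retry later.
      spill();
    }
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    // A real consumer would write its in-memory data to disk here and return the bytes freed.
    return 0L;
  }
}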

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 5 additions & 6 deletions
@@ -37,12 +37,11 @@
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
-import org.apache.spark.TaskContext;
-import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.*;
@@ -79,7 +78,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final int numPartitions;
   private final BlockManager blockManager;
   private final Partitioner partitioner;
-  private final ShuffleWriteMetrics writeMetrics;
+  private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
   private final int mapId;
   private final Serializer serializer;
@@ -103,8 +102,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
-      TaskContext taskContext,
-      SparkConf conf) {
+      SparkConf conf,
+      ShuffleWriteMetricsReporter writeMetrics) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
@@ -114,7 +113,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.shuffleId = dep.shuffleId();
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
-    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
+    this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
     this.shuffleBlockResolver = shuffleBlockResolver;
   }

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 12 additions & 6 deletions
@@ -38,6 +38,7 @@
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.FileSegment;
@@ -75,7 +76,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskMemoryManager taskMemoryManager;
   private final BlockManager blockManager;
   private final TaskContext taskContext;
-  private final ShuffleWriteMetrics writeMetrics;
+  private final ShuffleWriteMetricsReporter writeMetrics;
 
   /**
    * Force this sorter to spill when there are this many elements in memory.
@@ -113,7 +114,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       int initialSize,
       int numPartitions,
       SparkConf conf,
-      ShuffleWriteMetrics writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics) {
     super(memoryManager,
       (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
       memoryManager.getTungstenMemoryMode());
@@ -144,7 +145,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
    */
   private void writeSortedFile(boolean isLastFile) {
 
-    final ShuffleWriteMetrics writeMetricsToUse;
+    final ShuffleWriteMetricsReporter writeMetricsToUse;
 
     if (isLastFile) {
       // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
@@ -241,9 +242,14 @@ private void writeSortedFile(boolean isLastFile) {
       //
       // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
       // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
-      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
-      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
-      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
+      // SPARK-3577 tracks the spill time separately.
+
+      // This is guaranteed to be a ShuffleWriteMetrics based on the if check in the beginning
+      // of this method.
+      writeMetrics.incRecordsWritten(
+        ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(
+        ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
     }
   }
 
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 5 additions & 4 deletions
@@ -37,7 +37,6 @@
 
 import org.apache.spark.*;
 import org.apache.spark.annotation.Private;
-import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
 import org.apache.spark.io.NioBufferedFileInputStream;
@@ -47,6 +46,7 @@
 import org.apache.spark.network.util.LimitedInputStream;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
@@ -73,7 +73,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final TaskMemoryManager memoryManager;
   private final SerializerInstance serializer;
   private final Partitioner partitioner;
-  private final ShuffleWriteMetrics writeMetrics;
+  private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
   private final int mapId;
   private final TaskContext taskContext;
@@ -122,7 +122,8 @@ public UnsafeShuffleWriter(
       SerializedShuffleHandle<K, V> handle,
       int mapId,
       TaskContext taskContext,
-      SparkConf sparkConf) throws IOException {
+      SparkConf sparkConf,
+      ShuffleWriteMetricsReporter writeMetrics) throws IOException {
     final int numPartitions = handle.dependency().partitioner().numPartitions();
     if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {
       throw new IllegalArgumentException(
@@ -138,7 +139,7 @@ public UnsafeShuffleWriter(
     this.shuffleId = dep.shuffleId();
     this.serializer = dep.serializer().newInstance();
     this.partitioner = dep.partitioner();
-    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
+    this.writeMetrics = writeMetrics;
     this.taskContext = taskContext;
     this.sparkConf = sparkConf;
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
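
Across BypassMergeSortShuffleWriter, ShuffleExternalSorter, and UnsafeShuffleWriter the pattern is the same: instead of reaching into taskContext.taskMetrics().shuffleWriteMetrics(), each writer now receives a ShuffleWriteMetricsReporter from its caller. A minimal sketch of that dependency-injection shape follows; the types below (WriteMetricsReporter, ExampleWriter, RecordingReporter) are hypothetical stand-ins, not the actual Spark classes, and only illustrate why the indirection helps, e.g. letting a test or an alternate backend hand the writer a recording stub.

// Trimmed-down stand-in for the reporter abstraction; the real interface is
// org.apache.spark.shuffle.ShuffleWriteMetricsReporter.
interface WriteMetricsReporter {
  void incBytesWritten(long v);
  void incRecordsWritten(long v);
}

// A writer that is handed the reporter by its caller instead of pulling it from TaskContext.
final class ExampleWriter {
  private final WriteMetricsReporter writeMetrics;

  ExampleWriter(WriteMetricsReporter writeMetrics) {
    this.writeMetrics = writeMetrics;
  }

  void writeRecord(byte[] record) {
    // ... append the record to the shuffle output file ...
    writeMetrics.incBytesWritten(record.length);
    writeMetrics.incRecordsWritten(1);
  }
}

// In a test, a stub can stand in for the task-level metrics object.
final class RecordingReporter implements WriteMetricsReporter {
  long bytes;
  long records;
  @Override public void incBytesWritten(long v) { bytes += v; }
  @Override public void incRecordsWritten(long v) { records += v; }
}

public class MetricsInjectionExample {
  public static void main(String[] args) {
    RecordingReporter reporter = new RecordingReporter();
    ExampleWriter writer = new ExampleWriter(reporter);
    writer.writeRecord(new byte[]{1, 2, 3});
    System.out.println(reporter.records + " record(s), " + reporter.bytes + " byte(s)");
  }
}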
