
Commit 5c7e871

Merge pull request #3 from apache/master
merge
2 parents: 33a8b0e + 4e7a4cd

364 files changed: +21878 −3471 lines


R/README.md

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ export R_HOME=/home/username/R
 
 Build Spark with [Maven](https://spark.apache.org/docs/latest/building-spark.html#buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
 
 ```bash
-build/mvn -DskipTests -Psparkr package
+./build/mvn -DskipTests -Psparkr package
 ```
 
 #### Running sparkR

R/pkg/R/functions.R

Lines changed: 2 additions & 2 deletions
@@ -2741,7 +2741,7 @@ setMethod("format_string", signature(format = "character", x = "Column"),
 #' head(tmp)}
 #' @note from_unixtime since 1.5.0
 setMethod("from_unixtime", signature(x = "Column"),
-          function(x, format = "yyyy-MM-dd HH:mm:ss") {
+          function(x, format = "uuuu-MM-dd HH:mm:ss") {
             jc <- callJStatic("org.apache.spark.sql.functions",
                               "from_unixtime",
                               x@jc, format)
@@ -3029,7 +3029,7 @@ setMethod("unix_timestamp", signature(x = "Column", format = "missing"),
 #' @aliases unix_timestamp,Column,character-method
 #' @note unix_timestamp(Column, character) since 1.5.0
 setMethod("unix_timestamp", signature(x = "Column", format = "character"),
-          function(x, format = "yyyy-MM-dd HH:mm:ss") {
+          function(x, format = "uuuu-MM-dd HH:mm:ss") {
             jc <- callJStatic("org.apache.spark.sql.functions", "unix_timestamp", x@jc, format)
             column(jc)
 })
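Context for the "yyyy" → "uuuu" swap above: the default formats are passed straight through to org.apache.spark.sql.functions, and the new pattern letters presumably follow java.time.format.DateTimeFormatter semantics, where u is the signed proleptic year and y is the year-of-era; the two agree for all common-era dates and diverge only before year 1. A minimal standalone Java sketch of the difference (the class name is illustrative):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class YearPatternDemo {
  public static void main(String[] args) {
    // Proleptic year -44 is 45 BCE; for such dates 'u' and 'y' differ.
    LocalDate idesOfMarch = LocalDate.of(-44, 3, 15);

    // 'u' prints the signed proleptic year.
    System.out.println(DateTimeFormatter.ofPattern("uuuu-MM-dd").format(idesOfMarch)); // -0044-03-15

    // 'y' prints the (unsigned) year-of-era.
    System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd").format(idesOfMarch)); // 0045-03-15

    // For common-era dates the two patterns produce identical output.
    LocalDate modern = LocalDate.of(2019, 7, 1);
    System.out.println(DateTimeFormatter.ofPattern("uuuu-MM-dd").format(modern)); // 2019-07-01
    System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd").format(modern)); // 2019-07-01
  }
}
```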

README.md

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ This README file only contains basic setup instructions.
 Spark is built using [Apache Maven](https://maven.apache.org/).
 To build Spark and its example programs, run:
 
-    build/mvn -DskipTests clean package
+    ./build/mvn -DskipTests clean package
 
 (You do not need to do this if you downloaded a pre-built package.)

bin/spark-class

Lines changed: 15 additions & 3 deletions
@@ -68,15 +68,27 @@ fi
 # The exit code of the launcher is appended to the output, so the parent shell removes it from the
 # command array and checks the value to see if the launcher succeeded.
 build_command() {
-  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
+  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
   printf "%d\0" $?
 }
 
 # Turn off posix mode since it does not allow process substitution
 set +o posix
 CMD=()
-while IFS= read -d '' -r ARG; do
-  CMD+=("$ARG")
+DELIM=$'\n'
+CMD_START_FLAG="false"
+while IFS= read -d "$DELIM" -r ARG; do
+  if [ "$CMD_START_FLAG" == "true" ]; then
+    CMD+=("$ARG")
+  else
+    if [ "$ARG" == $'\0' ]; then
+      # After NULL character is consumed, change the delimiter and consume command string.
+      DELIM=''
+      CMD_START_FLAG="true"
+    elif [ "$ARG" != "" ]; then
+      echo "$ARG"
+    fi
+  fi
 done < <(build_command "$@")
 
 COUNT=${#CMD[@]}

conf/spark-env.sh.template

Lines changed: 3 additions & 0 deletions
@@ -56,6 +56,9 @@
 # - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
 
+# Options for launcher
+# - SPARK_LAUNCHER_OPTS, to set config properties and Java options for the launcher (e.g. "-Dx=y")
+
 # Generic options for the daemons used in the standalone deploy mode
 # - SPARK_CONF_DIR      Alternate conf dir. (Default: ${SPARK_HOME}/conf)
 # - SPARK_LOG_DIR       Where log files are stored. (Default: ${SPARK_HOME}/logs)

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -172,6 +172,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
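This excerpt doesn't show where core consumes the new commons-text dependency, so the snippet below is only a hedged illustration of two utilities the library provides, not a claim about this commit's call sites; the class name CommonsTextDemo is invented.

```java
import java.util.Map;

import org.apache.commons.text.StringSubstitutor;
import org.apache.commons.text.similarity.LevenshteinDistance;

public class CommonsTextDemo {
  public static void main(String[] args) {
    // Named-variable interpolation.
    String greeting = StringSubstitutor.replace("Hello ${name}", Map.of("name", "Spark"));
    System.out.println(greeting); // Hello Spark

    // Edit distance between two strings.
    int distance = LevenshteinDistance.getDefaultInstance().apply("shuffle", "shuffel");
    System.out.println(distance); // 2
  }
}
```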
ShuffleDataIO.java (new file)

Lines changed: 49 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for plugging in modules for storing and reading temporary shuffle data.
 * <p>
 * This is the root of a plugin system for storing shuffle bytes to arbitrary storage
 * backends in the sort-based shuffle algorithm implemented by the
 * {@link org.apache.spark.shuffle.sort.SortShuffleManager}. If another shuffle algorithm is
 * needed instead of sort-based shuffle, one should implement
 * {@link org.apache.spark.shuffle.ShuffleManager} instead.
 * <p>
 * A single instance of this module is loaded per process in the Spark application.
 * The default implementation reads and writes shuffle data from the local disks of
 * the executor, and is the implementation of shuffle file storage that has remained
 * consistent throughout most of Spark's history.
 * <p>
 * Alternative implementations of shuffle data storage can be loaded via setting
 * <code>spark.shuffle.sort.io.plugin.class</code>.
 * @since 3.0.0
 */
@Private
public interface ShuffleDataIO {

  /**
   * Called once on executor processes to bootstrap the shuffle data storage modules that
   * are only invoked on the executors.
   */
  ShuffleExecutorComponents executor();
}
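To make the plugin contract concrete, here is a minimal hypothetical entry point. Everything named My* and the package com.example.shuffle are invented for illustration; per the Javadoc above, Spark would load such a class when spark.shuffle.sort.io.plugin.class points at it. The MyShuffleExecutorComponents it returns is sketched after the next interface.

```java
package com.example.shuffle;

import org.apache.spark.shuffle.api.ShuffleDataIO;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;

// Hypothetical root of a custom shuffle storage plugin; a single instance
// is loaded per process when spark.shuffle.sort.io.plugin.class names it.
public final class MyShuffleDataIO implements ShuffleDataIO {

  @Override
  public ShuffleExecutorComponents executor() {
    // Hand back the executor-side components (sketched below).
    return new MyShuffleExecutorComponents();
  }
}
```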
ShuffleExecutorComponents.java (new file)

Lines changed: 55 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for building shuffle support for Executors.
 *
 * @since 3.0.0
 */
@Private
public interface ShuffleExecutorComponents {

  /**
   * Called once per executor to bootstrap this module with state that is specific to
   * that executor, specifically the application ID and executor ID.
   */
  void initializeExecutor(String appId, String execId);

  /**
   * Called once per map task to create a writer that will be responsible for persisting all the
   * partitioned bytes written by that map task.
   * @param shuffleId Unique identifier for the shuffle the map task is a part of
   * @param mapId Within the shuffle, the identifier of the map task
   * @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
   *                         with the same (shuffleId, mapId) pair can be distinguished by the
   *                         different values of mapTaskAttemptId.
   * @param numPartitions The number of partitions that will be written by the map task. Some of
   *                      these partitions may be empty.
   */
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions) throws IOException;
}
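A matching hypothetical implementation of this interface, continuing the sketch above: it records the executor identity and hands each map task its own writer. MyShuffleMapOutputWriter is sketched after the next interface.

```java
package com.example.shuffle;

import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;

// Hypothetical executor-side components for the sketch plugin.
public final class MyShuffleExecutorComponents implements ShuffleExecutorComponents {

  private String appId;
  private String execId;

  @Override
  public void initializeExecutor(String appId, String execId) {
    // Capture per-executor identity; a real backend might also open
    // connections to its storage service here.
    this.appId = appId;
    this.execId = execId;
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions) throws IOException {
    // One writer per map task attempt (sketched after the next interface).
    return new MyShuffleMapOutputWriter(
        appId, execId, shuffleId, mapId, mapTaskAttemptId, numPartitions);
  }
}
```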
ShuffleMapOutputWriter.java (new file)

Lines changed: 71 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * A top-level writer that returns child writers for persisting the output of a map task,
 * and then commits all of the writes as one atomic operation.
 *
 * @since 3.0.0
 */
@Private
public interface ShuffleMapOutputWriter {

  /**
   * Creates a writer that can open an output stream to persist bytes targeted for a given reduce
   * partition id.
   * <p>
   * The chunk corresponds to bytes in the given reduce partition. This will not be called twice
   * for the same partition within any given map task. The partition identifier will be in the
   * range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
   * provided upon the creation of this map output writer via
   * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
   * <p>
   * Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
   * call to this method will be called with a reducePartitionId that is strictly greater than
   * the reducePartitionIds given to any previous call to this method. This method is not
   * guaranteed to be called for every partition id in the above described range. In particular,
   * no guarantees are made as to whether or not this method will be called for empty partitions.
   */
  ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;

  /**
   * Commits the writes done by all partition writers returned by all calls to this object's
   * {@link #getPartitionWriter(int)}.
   * <p>
   * This should ensure that the writes conducted by this module's partition writers are
   * available to downstream reduce tasks. If this method throws any exception, this module's
   * {@link #abort(Throwable)} method will be invoked before propagating the exception.
   * <p>
   * This can also close any resources and clean up temporary state if necessary.
   */
  void commitAllPartitions() throws IOException;

  /**
   * Abort all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
   * <p>
   * This should invalidate the results of writing bytes. This can also close any resources and
   * clean up temporary state if necessary.
   */
  void abort(Throwable error) throws IOException;
}
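Continuing the hypothetical sketch: a map output writer that stores each reduce partition in its own local file, flushes everything on commit, and deletes everything on abort. All names remain invented, and a real implementation must honor the ordering and atomicity notes in the Javadoc above; MyShufflePartitionWriter is sketched after the next interface.

```java
package com.example.shuffle;

import java.io.File;
import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;

// Hypothetical per-map-task writer backed by one local file per partition.
public final class MyShuffleMapOutputWriter implements ShuffleMapOutputWriter {

  private final File dir;
  private final MyShufflePartitionWriter[] partitionWriters;

  MyShuffleMapOutputWriter(
      String appId,
      String execId,
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions) throws IOException {
    this.dir = new File(System.getProperty("java.io.tmpdir"),
        appId + "-" + execId + "-" + shuffleId + "-" + mapId + "-" + mapTaskAttemptId);
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IOException("Could not create shuffle output dir " + dir);
    }
    this.partitionWriters = new MyShufflePartitionWriter[numPartitions];
  }

  @Override
  public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException {
    // Called at most once per partition, with strictly increasing ids.
    MyShufflePartitionWriter writer =
        new MyShufflePartitionWriter(new File(dir, "part-" + reducePartitionId));
    partitionWriters[reducePartitionId] = writer;
    return writer;
  }

  @Override
  public void commitAllPartitions() throws IOException {
    // The map task normally closes each partition stream itself; closing
    // again here is a defensive flush before the files become visible.
    for (MyShufflePartitionWriter writer : partitionWriters) {
      if (writer != null) {
        writer.closeUnderlyingStream();
      }
    }
  }

  @Override
  public void abort(Throwable error) throws IOException {
    // Best-effort invalidation: remove everything written so far.
    File[] files = dir.listFiles();
    if (files != null) {
      for (File file : files) {
        file.delete();
      }
    }
    dir.delete();
  }
}
```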
ShufflePartitionWriter.java (new file)

Lines changed: 98 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;
import java.util.Optional;
import java.io.OutputStream;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for opening streams to persist partition bytes to a backing data store.
 * <p>
 * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
 * block.
 *
 * @since 3.0.0
 */
@Private
public interface ShufflePartitionWriter {

  /**
   * Open and return an {@link OutputStream} that can write bytes to the underlying
   * data store.
   * <p>
   * This method will only be called once on this partition writer in the map task, to write the
   * bytes to the partition. The output stream will only be used to write the bytes for this
   * partition. The map task closes this output stream upon writing all the bytes for this
   * block, or if the write fails for any reason.
   * <p>
   * Implementations that intend on combining the bytes for all the partitions written by this
   * map task should reuse the same OutputStream instance across all the partition writers provided
   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
   * {@link OutputStream#close()} does not close the resource, since it will be reused across
   * partition writes. The underlying resources should be cleaned up in
   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   */
  OutputStream openStream() throws IOException;

  /**
   * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
   * input byte channels to the underlying shuffle data store.
   * <p>
   * This method will only be called once on this partition writer in the map task, to write the
   * bytes to the partition. The channel will only be used to write the bytes for this
   * partition. The map task closes this channel upon writing all the bytes for this
   * block, or if the write fails for any reason.
   * <p>
   * Implementations that intend on combining the bytes for all the partitions written by this
   * map task should reuse the same channel instance across all the partition writers provided
   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
   * {@link WritableByteChannelWrapper#close()} does not close the resource, since the channel
   * will be reused across partition writes. The underlying resources should be cleaned up in
   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   * <p>
   * This method is primarily for advanced optimizations where bytes can be copied from the input
   * spill files to the output channel without copying data into memory. If such optimizations are
   * not supported, the implementation should return {@link Optional#empty()}. By default, the
   * implementation returns {@link Optional#empty()}.
   * <p>
   * Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
   * underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure
   * that the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
   * {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   */
  default Optional<WritableByteChannelWrapper> openChannelWrapper() throws IOException {
    return Optional.empty();
  }

  /**
   * Returns the number of bytes written either by this writer's output stream opened by
   * {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
   * <p>
   * This can be different from the number of bytes given by the caller. For example, the
   * stream might compress or encrypt the bytes before persisting the data to the backing
   * data store.
   */
  long getNumBytesWritten();
}
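The last piece of the hypothetical sketch: a partition writer that counts bytes as they pass through to the file, so getNumBytesWritten() reports what actually reached the store. It keeps the default openChannelWrapper(), i.e. Optional.empty(), so only the stream path is supported.

```java
package com.example.shuffle;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.shuffle.api.ShufflePartitionWriter;

// Hypothetical partition writer for the sketch plugin.
final class MyShufflePartitionWriter implements ShufflePartitionWriter {

  private final File file;
  private CountingOutputStream stream;

  MyShufflePartitionWriter(File file) {
    this.file = file;
  }

  @Override
  public OutputStream openStream() throws IOException {
    // Called at most once per map task (see the Javadoc above).
    stream = new CountingOutputStream(
        new BufferedOutputStream(new FileOutputStream(file)));
    return stream;
  }

  @Override
  public long getNumBytesWritten() {
    return stream == null ? 0L : stream.count;
  }

  void closeUnderlyingStream() throws IOException {
    // Defensive close used by MyShuffleMapOutputWriter.commitAllPartitions();
    // closing the stream a second time is harmless here.
    if (stream != null) {
      stream.close();
    }
  }

  // Minimal byte-counting decorator around the real file stream.
  private static final class CountingOutputStream extends FilterOutputStream {
    long count;

    CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(int b) throws IOException {
      out.write(b);
      count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
      count += len;
    }
  }
}
```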
