Commit abef84a

mccheah authored and Marcelo Vanzin committed
[SPARK-28209][CORE][SHUFFLE] Proposed new shuffle writer API
## What changes were proposed in this pull request?

As part of the shuffle storage API proposed in SPARK-25299, this introduces an API for persisting shuffle data in arbitrary storage systems. This patch introduces several concepts:

* `ShuffleDataIO`: the root of the entire plugin tree that will be proposed over the course of the shuffle API project.
* `ShuffleExecutorComponents`: the subset of plugins for managing shuffle-related components on each executor. This will in turn instantiate shuffle readers and writers.
* `ShuffleMapOutputWriter`: an interface instantiated once per map task. It provides child `ShufflePartitionWriter` instances for persisting the bytes of each partition written by the map task.

The default implementations of these plugins exactly mirror what the existing shuffle-writing code does, namely writing the data to local disk and writing an index file. We leverage the APIs in `BypassMergeSortShuffleWriter` only; follow-up PRs will use the APIs in `SortShuffleWriter` and `UnsafeShuffleWriter`, but that is left as future work to minimize the review surface area.

## How was this patch tested?

New unit tests were added. Micro-benchmarks indicate there is no slowdown in the affected code paths.

Closes #25007 from mccheah/spark-shuffle-writer-refactor.

Lead-authored-by: mcheah <[email protected]>
Co-authored-by: mccheah <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 121f933, commit abef84a

15 files changed: +1087 -147 lines
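The plugin tree described in the summary composes as a chain: executor components hand out one map output writer per map task, which hands out one partition writer per reduce partition, followed by a single commit. A minimal, self-contained in-memory sketch of that flow, using simplified stand-ins for the proposed interfaces (not the real `org.apache.spark.shuffle.api` types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces, showing how a map task
// drives the plugin tree. The in-memory store is purely illustrative.
public class ShuffleWriterSketch {

  interface PartitionWriter {
    OutputStream openStream() throws IOException;
    long getNumBytesWritten();
  }

  interface MapOutputWriter {
    PartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;
    void commitAllPartitions() throws IOException;
  }

  // An in-memory implementation: each partition's bytes land in its own buffer.
  static class InMemoryMapOutputWriter implements MapOutputWriter {
    final List<ByteArrayOutputStream> partitions = new ArrayList<>();

    public PartitionWriter getPartitionWriter(int reducePartitionId) {
      final ByteArrayOutputStream buf = new ByteArrayOutputStream();
      partitions.add(buf);
      return new PartitionWriter() {
        public OutputStream openStream() { return buf; }
        public long getNumBytesWritten() { return buf.size(); }
      };
    }

    public void commitAllPartitions() { /* would publish output to reducers */ }
  }

  static long writeThreePartitions() throws IOException {
    MapOutputWriter writer = new InMemoryMapOutputWriter();
    long totalBytes = 0;
    // Partition ids are requested in strictly increasing order, per the contract.
    for (int p = 0; p < 3; p++) {
      PartitionWriter pw = writer.getPartitionWriter(p);
      try (OutputStream out = pw.openStream()) {
        out.write(("partition-" + p).getBytes());
      }
      totalBytes += pw.getNumBytesWritten();
    }
    writer.commitAllPartitions();
    return totalBytes;
  }

  public static void main(String[] args) throws IOException {
    // Each payload is 11 bytes ("partition-0" etc.), so 3 partitions write 33.
    System.out.println("total bytes written = " + writeThreePartitions());
  }
}
```

The real default implementation writes to local disk and produces an index file, but the call sequence is the same shape.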
ShuffleDataIO.java
Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for plugging in modules for storing and reading temporary shuffle data.
 * <p>
 * This is the root of a plugin system for storing shuffle bytes to arbitrary storage
 * backends in the sort-based shuffle algorithm implemented by the
 * {@link org.apache.spark.shuffle.sort.SortShuffleManager}. If another shuffle algorithm is
 * needed instead of sort-based shuffle, one should implement
 * {@link org.apache.spark.shuffle.ShuffleManager} instead.
 * <p>
 * A single instance of this module is loaded per process in the Spark application.
 * The default implementation reads and writes shuffle data from the local disks of
 * the executor, and is the implementation of shuffle file storage that has remained
 * consistent throughout most of Spark's history.
 * <p>
 * Alternative implementations of shuffle data storage can be loaded via setting
 * <code>spark.shuffle.sort.io.plugin.class</code>.
 *
 * @since 3.0.0
 */
@Private
public interface ShuffleDataIO {

  /**
   * Called once on executor processes to bootstrap the shuffle data storage modules that
   * are only invoked on the executors.
   */
  ShuffleExecutorComponents executor();
}
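As the javadoc above notes, an alternative `ShuffleDataIO` implementation is selected via `spark.shuffle.sort.io.plugin.class`; for example, in `spark-defaults.conf` (the class name below is a hypothetical placeholder):

```
spark.shuffle.sort.io.plugin.class   com.example.shuffle.MyShuffleDataIO
```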
ShuffleExecutorComponents.java
Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for building shuffle support for Executors.
 *
 * @since 3.0.0
 */
@Private
public interface ShuffleExecutorComponents {

  /**
   * Called once per executor to bootstrap this module with state that is specific to
   * that executor, specifically the application ID and executor ID.
   */
  void initializeExecutor(String appId, String execId);

  /**
   * Called once per map task to create a writer that will be responsible for persisting all the
   * partitioned bytes written by that map task.
   *
   * @param shuffleId Unique identifier for the shuffle the map task is a part of
   * @param mapId Within the shuffle, the identifier of the map task
   * @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
   *                         with the same (shuffleId, mapId) pair can be distinguished by the
   *                         different values of mapTaskAttemptId.
   * @param numPartitions The number of partitions that will be written by the map task. Some of
   *                      these partitions may be empty.
   */
  ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions) throws IOException;
}
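The lifecycle above can be sketched with simplified local stand-in types: `initializeExecutor` runs once per executor, and each map task attempt then gets its own map output writer. The ids used below are made up for illustration.

```java
// Simplified stand-ins for the executor-side plugin lifecycle; not the real
// Spark types, and the application/executor/shuffle ids are hypothetical.
public class ExecutorComponentsSketch {

  interface ShuffleMapOutputWriter { /* elided */ }

  interface ShuffleExecutorComponents {
    void initializeExecutor(String appId, String execId);
    ShuffleMapOutputWriter createMapOutputWriter(
        int shuffleId, int mapId, long mapTaskAttemptId, int numPartitions);
  }

  static class StubComponents implements ShuffleExecutorComponents {
    String appId;
    String execId;

    public void initializeExecutor(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    public ShuffleMapOutputWriter createMapOutputWriter(
        int shuffleId, int mapId, long mapTaskAttemptId, int numPartitions) {
      // A real implementation would open storage for numPartitions partitions.
      return new ShuffleMapOutputWriter() { };
    }
  }

  static String demo() {
    StubComponents components = new StubComponents();
    components.initializeExecutor("app-123", "exec-1");  // once per executor
    // Two attempts of the same (shuffleId, mapId) differ only by mapTaskAttemptId.
    components.createMapOutputWriter(0, 7, 100L, 8);
    components.createMapOutputWriter(0, 7, 101L, 8);
    return components.appId;
  }

  public static void main(String[] args) {
    System.out.println("initialized for " + demo());
  }
}
```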
ShuffleMapOutputWriter.java
Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * A top-level writer that returns child writers for persisting the output of a map task,
 * and then commits all of the writes as one atomic operation.
 *
 * @since 3.0.0
 */
@Private
public interface ShuffleMapOutputWriter {

  /**
   * Creates a writer that can open an output stream to persist bytes targeted for a given reduce
   * partition id.
   * <p>
   * The chunk corresponds to bytes in the given reduce partition. This will not be called twice
   * for the same partition within any given map task. The partition identifier will be in the
   * range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
   * provided upon the creation of this map output writer via
   * {@link ShuffleExecutorComponents#createMapOutputWriter(int, int, long, int)}.
   * <p>
   * Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
   * call to this method will be called with a reducePartitionId that is strictly greater than
   * the reducePartitionIds given to any previous call to this method. This method is not
   * guaranteed to be called for every partition id in the above described range. In particular,
   * no guarantees are made as to whether or not this method will be called for empty partitions.
   */
  ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;

  /**
   * Commits the writes done by all partition writers returned by all calls to this object's
   * {@link #getPartitionWriter(int)}.
   * <p>
   * This should ensure that the writes conducted by this module's partition writers are
   * available to downstream reduce tasks. If this method throws any exception, this module's
   * {@link #abort(Throwable)} method will be invoked before propagating the exception.
   * <p>
   * This can also close any resources and clean up temporary state if necessary.
   */
  void commitAllPartitions() throws IOException;

  /**
   * Abort all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
   * <p>
   * This should invalidate the results of writing bytes. This can also close any resources and
   * clean up temporary state if necessary.
   */
  void abort(Throwable error) throws IOException;
}
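The commit/abort contract above implies a caller-side discipline: commit only after every partition write succeeds, and abort (then rethrow) on any failure. A sketch with simplified stand-in types, not the real Spark interfaces:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Simplified stand-ins for the writer interfaces, illustrating the
// commit-on-success / abort-on-failure protocol a caller would follow.
public class CommitAbortSketch {

  interface ShufflePartitionWriter {
    OutputStream openStream() throws IOException;
  }

  interface ShuffleMapOutputWriter {
    ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;
    void commitAllPartitions() throws IOException;
    void abort(Throwable error) throws IOException;
  }

  static void writeMapOutput(ShuffleMapOutputWriter writer, byte[][] partitions)
      throws IOException {
    try {
      for (int p = 0; p < partitions.length; p++) {
        try (OutputStream out = writer.getPartitionWriter(p).openStream()) {
          out.write(partitions[p]);
        }
      }
      writer.commitAllPartitions();  // make output visible to reduce tasks
    } catch (IOException e) {
      writer.abort(e);               // invalidate any partial output
      throw e;
    }
  }

  static boolean demo() throws IOException {
    final boolean[] committed = {false};
    ShuffleMapOutputWriter writer = new ShuffleMapOutputWriter() {
      public ShufflePartitionWriter getPartitionWriter(int p) {
        return () -> new ByteArrayOutputStream();
      }
      public void commitAllPartitions() { committed[0] = true; }
      public void abort(Throwable error) { }
    };
    writeMapOutput(writer, new byte[][] {{1, 2}, {3}});
    return committed[0];
  }

  public static void main(String[] args) throws IOException {
    System.out.println("committed = " + demo());
  }
}
```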
ShufflePartitionWriter.java
Lines changed: 98 additions & 0 deletions

@@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * An interface for opening streams to persist partition bytes to a backing data store.
 * <p>
 * This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
 * block.
 *
 * @since 3.0.0
 */
@Private
public interface ShufflePartitionWriter {

  /**
   * Open and return an {@link OutputStream} that can write bytes to the underlying
   * data store.
   * <p>
   * This method will only be called once on this partition writer in the map task, to write the
   * bytes to the partition. The output stream will only be used to write the bytes for this
   * partition. The map task closes this output stream upon writing all the bytes for this
   * block, or if the write fails for any reason.
   * <p>
   * Implementations that intend on combining the bytes for all the partitions written by this
   * map task should reuse the same OutputStream instance across all the partition writers provided
   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
   * {@link OutputStream#close()} does not close the resource, since it will be reused across
   * partition writes. The underlying resources should be cleaned up in
   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   */
  OutputStream openStream() throws IOException;

  /**
   * Opens and returns a {@link WritableByteChannelWrapper} for transferring bytes from
   * input byte channels to the underlying shuffle data store.
   * <p>
   * This method will only be called once on this partition writer in the map task, to write the
   * bytes to the partition. The channel will only be used to write the bytes for this
   * partition. The map task closes this channel upon writing all the bytes for this
   * block, or if the write fails for any reason.
   * <p>
   * Implementations that intend on combining the bytes for all the partitions written by this
   * map task should reuse the same channel instance across all the partition writers provided
   * by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
   * {@link WritableByteChannelWrapper#close()} does not close the resource, since the channel
   * will be reused across partition writes. The underlying resources should be cleaned up in
   * {@link ShuffleMapOutputWriter#commitAllPartitions()} and
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   * <p>
   * This method is primarily for advanced optimizations where bytes can be copied from the input
   * spill files to the output channel without copying data into memory. If such optimizations are
   * not supported, the implementation should return {@link Optional#empty()}. By default, the
   * implementation returns {@link Optional#empty()}.
   * <p>
   * Note that the returned {@link WritableByteChannelWrapper} itself is closed, but not the
   * underlying channel that is returned by {@link WritableByteChannelWrapper#channel()}. Ensure
   * that the underlying channel is cleaned up in {@link WritableByteChannelWrapper#close()},
   * {@link ShuffleMapOutputWriter#commitAllPartitions()}, or
   * {@link ShuffleMapOutputWriter#abort(Throwable)}.
   */
  default Optional<WritableByteChannelWrapper> openChannelWrapper() throws IOException {
    return Optional.empty();
  }

  /**
   * Returns the number of bytes written either by this writer's output stream opened by
   * {@link #openStream()} or the byte channel opened by {@link #openChannelWrapper()}.
   * <p>
   * This can be different from the number of bytes given by the caller. For example, the
   * stream might compress or encrypt the bytes before persisting the data to the backing
   * data store.
   */
  long getNumBytesWritten();
}
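The javadoc above asks implementations that funnel every partition into one shared resource to make the per-partition stream's `close()` leave that resource open. One hypothetical way to do that is a close-shielding stream:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// A hypothetical helper, not part of the patch: close() flushes but
// deliberately does NOT close the shared stream; the parent map output
// writer would close it at commit/abort time instead.
public class CloseShieldSketch {

  static class CloseShieldOutputStream extends FilterOutputStream {
    CloseShieldOutputStream(OutputStream shared) { super(shared); }
    @Override public void close() throws IOException { flush(); }
  }

  static String demo() throws IOException {
    ByteArrayOutputStream shared = new ByteArrayOutputStream();
    // Two "partition writers" reuse the same underlying stream; closing each
    // per-partition stream must not invalidate the next one.
    for (int p = 0; p < 2; p++) {
      try (OutputStream out = new CloseShieldOutputStream(shared)) {
        out.write('a' + p);
      }
    }
    return shared.toString();  // both writes survive the intermediate close()
  }

  public static void main(String[] args) throws IOException {
    System.out.println(demo());
  }
}
```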
WritableByteChannelWrapper.java
Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.api;

import java.io.Closeable;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 * A thin wrapper around a {@link WritableByteChannel}.
 * <p>
 * This is primarily provided for the local disk shuffle implementation to provide a
 * {@link java.nio.channels.FileChannel} that keeps the channel open across partition writes.
 *
 * @since 3.0.0
 */
@Private
public interface WritableByteChannelWrapper extends Closeable {

  /**
   * The underlying channel to write bytes into.
   */
  WritableByteChannel channel();
}
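The wrapper idea above can be sketched with a local stand-in type: closing the wrapper releases per-partition state only, so one shared channel (a `FileChannel` in the local-disk implementation) survives across partition writes and is closed when the whole map output is committed or aborted.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// A local stand-in for the Spark interface; the reusing() factory is a
// hypothetical illustration, not part of the patch.
public class ChannelWrapperSketch {

  interface WritableByteChannelWrapper extends Closeable {
    WritableByteChannel channel();
  }

  static WritableByteChannelWrapper reusing(WritableByteChannel shared) {
    return new WritableByteChannelWrapper() {
      public WritableByteChannel channel() { return shared; }
      public void close() { /* wrapper done; shared channel stays open */ }
    };
  }

  static String demo() throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    WritableByteChannel shared = Channels.newChannel(sink);
    // Hand out one wrapper per partition; each close() leaves the channel open.
    for (int p = 0; p < 2; p++) {
      try (WritableByteChannelWrapper w = reusing(shared)) {
        w.channel().write(ByteBuffer.wrap(new byte[] {(byte) ('x' + p)}));
      }
    }
    shared.close();  // closed once, after all partition writes
    return sink.toString();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(demo());
  }
}
```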
