Commits (changes shown from 26 of 33 commits)
1957e82  [SPARK-25299] Introduce the new shuffle writer API (#5) (#520)  (mccheah, Mar 20, 2019)
857552a  [SPARK-25299] Local shuffle implementation of the shuffle writer API …  (mccheah, Apr 3, 2019)
d13037f  [SPARK-25299] Make UnsafeShuffleWriter use the new API (#536)  (mccheah, Apr 17, 2019)
8f5fb60  [SPARK-25299] Use the shuffle writer plugin for the SortShuffleWriter…  (mccheah, Apr 15, 2019)
e17c7ea  [SPARK-25299] Shuffle locations api (#517)  (mccheah, Apr 19, 2019)
3f0c131  [SPARK-25299] Move shuffle writers back to being given specific parti…  (mccheah, Apr 19, 2019)
f982df7  [SPARK-25299] Don't set map status twice in bypass merge sort shuffle…  (mccheah, Apr 19, 2019)
6891197  [SPARK-25299] Propose a new NIO transfer API for partition writing. (…  (mccheah, May 24, 2019)
7b44ed2  Remove shuffle location support.  (mccheah, Jun 27, 2019)
df75f1f  Remove changes to UnsafeShuffleWriter  (mccheah, Jun 27, 2019)
a8558af  Revert changes for SortShuffleWriter  (mccheah, Jun 27, 2019)
806d7bb  Revert a bunch of other stuff  (mccheah, Jun 27, 2019)
3167030  More reverts  (mccheah, Jun 27, 2019)
70f59db  Set task contexts in failing test  (mccheah, Jun 28, 2019)
3083d86  Fix style  (mccheah, Jun 28, 2019)
4c3d692  Check for null on the block manager as well.  (mccheah, Jun 28, 2019)
2421c92  Add task attempt id in the APIs  (mccheah, Jul 1, 2019)
982f207  Address comments  (mccheah, Jul 8, 2019)
594d1e2  Fix style  (mccheah, Jul 8, 2019)
66aae91  Address comments.  (mccheah, Jul 12, 2019)
8b432f9  Merge remote-tracking branch 'origin/master' into spark-shuffle-write…  (mccheah, Jul 17, 2019)
9f597dd  Address comments.  (mccheah, Jul 18, 2019)
86c1829  Restructure test  (mccheah, Jul 18, 2019)
a7885ae  Add ShuffleWriteMetricsReporter to the createMapOutputWriter API.  (mccheah, Jul 19, 2019)
9893c6c  Add more documentation  (mccheah, Jul 19, 2019)
cd897e7  REfactor reading records from file in test  (mccheah, Jul 19, 2019)
9f17b9b  Address comments  (mccheah, Jul 24, 2019)
e53a001  Code tags  (mccheah, Jul 24, 2019)
56fa450  Add some docs  (mccheah, Jul 24, 2019)
b8b7b8d  Change mockito format in BypassMergeSortShuffleWriterSuite  (mccheah, Jul 25, 2019)
2d29404  Remove metrics from the API.  (mccheah, Jul 29, 2019)
06ea01a  Address more comments.  (mccheah, Jul 29, 2019)
7dceec9  Args per line  (mccheah, Jul 30, 2019)
52 changes: 52 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;
Contributor:
I saw it was recommended that this package be used instead of o.a.s.api. The problem is that org.apache.spark.shuffle is explicitly removed from the documentation, and we want this to (eventually) be documented. So we either need to go back to the old package, or tweak SparkBuild.scala to not filter this sub-package...

Contributor Author:
I'm concerned about conflicts with the other kinds of APIs in the org.apache.spark.api.* namespace, particularly because these are all related to other language bindings, e.g. org.apache.spark.api.java.function.Function, org.apache.spark.api.r.RRDD. Let's modify SparkBuild.scala instead - I'll look into what that would require. Can that be done in a follow-up PR?

Contributor:
Sure. Better to be proactive and file a bug to make these interfaces non-Private, and at the same time make sure they're showing up properly in the documentation.

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An interface for plugging in modules for storing and reading temporary shuffle data.
* <p>
* This is the root of a plugin system for storing shuffle bytes to arbitrary storage
* backends in the sort-based shuffle algorithm implemented by the
* {@link org.apache.spark.shuffle.sort.SortShuffleManager}. If another shuffle algorithm is
Contributor:
It would be good to check how these links render in the final documentation, since as I mentioned that package is removed from public docs.

Member:
Seems like the link isn't made.
[Screenshot of the rendered Javadoc (2019-08-01) showing the unlinked class reference]

* needed instead of sort-based shuffle, one should implement
* {@link org.apache.spark.shuffle.ShuffleManager} instead.
* <p>
* A single instance of this module is loaded per process in the Spark application.
* The default implementation reads and writes shuffle data from the local disks of
* the executor, and is the implementation of shuffle file storage that has remained
* consistent throughout most of Spark's history.
* <p>
* Alternative implementations of shuffle data storage can be loaded via setting
* spark.shuffle.io.plugin.class.
Contributor:
Nit: wrap this in <code></code>.

* @since 3.0.0
*/
@Private
Member:
Question from SPARK-28568. Is it an API or not? Looks so given the PR description. @Private is:

> This should be used only when the standard Scala / Java means of protecting classes are insufficient. In particular, Java has no equivalent of private[spark], so we use this annotation in its place.

So @Private doesn't look like it's meant for APIs. Shall we change it to @Unstable (maybe with an explicit warning)?

Contributor:
@HyukjinKwon it'll all eventually be @Experimental, but we decided to start by making it @Private just in case spark 3.0 gets released in the middle. (discussed here: #25007 (comment))

Looks like we forgot to file a follow-up JIRA about that; I just filed https://issues.apache.org/jira/browse/SPARK-28592

Member:
Ah, okie. That's good.
My impression was that @Unstable guarantees less than @Experimental. Maybe we can consider this point as well later.

public interface ShuffleDataIO {

Contributor Author:
I'll add some JavaDoc explaining the difference between the ShuffleManager plugin and this plugin system.

Comment:
A question that may be naive: why do we choose Java over Scala? I see that Spark classes, except the ones dealing with underlying memory, are written in Scala...

Contributor:
As a public interface, it is better to use Java, so that other users can implement it in Java, Scala, or other JVM languages.

If we defined the APIs using Scala, users could mostly only implement them in Scala, unless the APIs were carefully designed to avoid Scala-specific features so that they could also be implemented from Java.


/**
* Called once on executor processes to bootstrap the shuffle data storage modules that
* are only invoked on the executors.
* <p>
* At this point, this module is responsible for reading and writing shuffle data bytes
Contributor:
Remove this paragraph.

* from the backing store.
*/
ShuffleExecutorComponents executor();
}
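
To sketch how a plugin would hook in end to end (purely illustrative: the package, class names, and wiring below are made up, and only the interfaces added in this PR are real):

package com.example.shuffle;  // hypothetical package

import org.apache.spark.shuffle.api.ShuffleDataIO;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;

/**
 * Hypothetical plugin entry point. Spark would instantiate this class on each
 * executor process when the class name is configured as the shuffle IO plugin.
 */
public class MyShuffleDataIO implements ShuffleDataIO {

  @Override
  public ShuffleExecutorComponents executor() {
    // Return the executor-side components; MyShuffleExecutorComponents is a
    // made-up class sketched alongside ShuffleExecutorComponents below.
    return new MyShuffleExecutorComponents();
  }
}

Assuming the configuration key lands as documented above, such a plugin would be enabled by setting spark.shuffle.io.plugin.class=com.example.shuffle.MyShuffleDataIO.
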
45 changes: 45 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An interface for building shuffle support for Executors.
*
* @since 3.0.0
*/
@Private
public interface ShuffleExecutorComponents {

/**
* Called once per executor to bootstrap this module with state that is specific to
* that executor, specifically the application ID and executor ID.
*/
void initializeExecutor(String appId, String execId);

/**
* Returns the modules that are responsible for persisting shuffle data to the backing
* store.
* <p>
* This may be called multiple times on each executor. Implementations should not make
* any assumptions about the lifetime of the returned module.
*/
ShuffleWriteSupport writes();
Contributor:
Is it better to name this writes or write?

Contributor Author:
Don't have a strong preference either way.

}
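
Continuing the hypothetical sketch from ShuffleDataIO above, an executor-components implementation might simply record the IDs it is initialized with and hand out a write-support instance. MyShuffleWriteSupport is another made-up class, sketched alongside ShuffleWriteSupport further below.

package com.example.shuffle;  // hypothetical package

import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleWriteSupport;

public class MyShuffleExecutorComponents implements ShuffleExecutorComponents {

  // Captured once per executor; a real plugin might use these for logging or
  // to build storage paths.
  private String appId;
  private String execId;

  @Override
  public void initializeExecutor(String appId, String execId) {
    this.appId = appId;
    this.execId = execId;
  }

  @Override
  public ShuffleWriteSupport writes() {
    // May be called multiple times, and no assumptions are made about how long
    // the returned module lives, so hand out a cheap instance each time.
    return new MyShuffleWriteSupport(appId);
  }
}
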
73 changes: 73 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;

/**
* :: Private ::
* A top-level writer that returns child writers for persisting the output of a map task,
* and then commits all of the writes as one atomic operation.
*
* @since 3.0.0
*/
@Private
public interface ShuffleMapOutputWriter {

/**
* Creates a writer that can open an output stream to persist bytes targeted for a given reduce
* partition id.
* <p>

Contributor Author:
I think we want these to have line breaks in the generated HTML. But I'm not sure what the stance is across the rest of the codebase - we can remove these if pretty-formatting with line breaks isn't necessary.

Contributor:
Yeah, I think it is needed for Javadoc, though it's not needed for Scaladoc. IMO it's worth keeping them.

https://www.oracle.com/technetwork/java/javase/documentation/index-137868.html#format

* The chunk corresponds to bytes in the given reduce partition. This will not be called twice
* for the same partition within any given map task. The partition identifier will be in the
* range of precisely 0 (inclusive) to numPartitions (exclusive), where numPartitions was
Comment:
Should we mention that these are called in order?

Contributor Author:
I made the docs more thorough, indicating ordering and also indicating how there's no guarantee that this will be called for an empty partition.

* provided upon the creation of this map output writer via
* {@link ShuffleWriteSupport#createMapOutputWriter(
* int, int, long, int, ShuffleWriteMetricsReporter)}.
* <p>
* Calls to this method will be invoked with monotonically increasing reducePartitionIds; each
Contributor:
How useful is this? I think we can make Spark shuffle more flexible if we don't guarantee this. Do you have a concrete example of how an implementation can leverage this guarantee?

Contributor:
Spark's existing implementation makes this assumption: the index and data files assume the partitions are written in sequential order.

Though it would be really easy to change the index format to allow the order to be random (just include a start and an end offset, rather than having the end be implicit).

* call to this method will be called with a reducePartitionId that is strictly greater than
* the reducePartitionIds given to any previous call to this method. This method is not
* guaranteed to be called for every partition id in the above described range. In particular,
* no guarantees are made as to whether or not this method will be called for empty partitions.
*/
ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws IOException;

Comment:
Why "calls to this method will be invoked with monotonically increasing reducePartitionIds"? This may cause potential issues in future and cause burden on implementation. for example, if people want to implement multiple partition writers and write shuffle data in parallel. It cannot guarantee monotonically increasing reducePartitionIds.

Contributor Author:
People using this will be using it with SortShuffleManager which has a specific algorithm that won't open streams in parallel. If these invariants are broken, it implies the algorithm has changed, in which case we'd need to reconsider these APIs.
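
As a concrete illustration of the assumption discussed above: the local-disk index file is essentially a list of cumulative byte offsets into the data file, so each partition's end offset is implied by the next partition's start, which only works if partitions are written in increasing order. A rough sketch of that layout (not the actual Spark code; the reader below is an assumption based on the description in this thread):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

final class IndexFileSketch {

  // Reads numPartitions + 1 cumulative offsets; offsets[0] is expected to be 0.
  static long[] readOffsets(String indexFile, int numPartitions) throws IOException {
    long[] offsets = new long[numPartitions + 1];
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      for (int i = 0; i <= numPartitions; i++) {
        offsets[i] = in.readLong();
      }
    }
    return offsets;
  }

  // The end of partition i is implicit in the start of partition i + 1.
  static long partitionLength(long[] offsets, int reduceId) {
    return offsets[reduceId + 1] - offsets[reduceId];
  }
}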


/**
* Commits the writes done by all partition writers returned by all calls to this object's
* {@link #getPartitionWriter(int)}.
* <p>
* This should ensure that the writes conducted by this module's partition writers are
* available to downstream reduce tasks. If this method throws any exception, this module's
* {@link #abort(Throwable)} method will be invoked before propagating the exception.
* <p>
* This can also close any resources and clean up temporary state if necessary.
*/
void commitAllPartitions() throws IOException;
Comment:
Shouldn't this return Optional<MapShuffleLocations>?

Contributor:
@gczsjdy any reason to return Optional<MapShuffleLocations>?

Contributor Author:
We ended up adjusting the API for shuffle locations. This will come later.

Contributor Author:
I believe the SPIP has the latest API.

Comment:
@jerryshao, @mccheah has explained it well: Optional<MapShuffleLocations> lets implementers customize the locations recorded in the driver. @mccheah, will this be in the driver lifecycle sub-issue?

Contributor Author:
Something to that effect yeah - it also has implications on the reader API, but these are concerns to be addressed in subsequent patches.

Comment:
Got it. : )


/**
* Abort all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
* <p>
* This should invalidate the results of writing bytes. This can also close any resources and
* clean up temporary state if necessary.
*/
void abort(Throwable error) throws IOException;
}
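
To make the calling contract concrete, here is a rough sketch of how a map task could drive a ShuffleMapOutputWriter. This is illustrative only - the real call sites are the shuffle writers changed elsewhere in this PR - and the class, method, and parameter names below are stand-ins.

import java.io.OutputStream;

import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.ShuffleWriteSupport;

final class MapOutputWriterUsageSketch {

  static void writeMapOutput(
      ShuffleWriteSupport writeSupport,
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      byte[][] partitionedData,
      ShuffleWriteMetricsReporter metrics) throws Exception {
    int numPartitions = partitionedData.length;
    ShuffleMapOutputWriter mapWriter = writeSupport.createMapOutputWriter(
        shuffleId, mapId, mapTaskAttemptId, numPartitions, metrics);
    try {
      // Partition ids are requested in strictly increasing order, and empty
      // partitions may be skipped entirely, per the Javadoc above.
      for (int reduceId = 0; reduceId < numPartitions; reduceId++) {
        if (partitionedData[reduceId].length == 0) {
          continue;
        }
        ShufflePartitionWriter partWriter = mapWriter.getPartitionWriter(reduceId);
        // The caller, not the plugin, closes the per-partition stream.
        try (OutputStream out = partWriter.openStream()) {
          out.write(partitionedData[reduceId]);
        }
      }
      mapWriter.commitAllPartitions();
    } catch (Exception e) {
      mapWriter.abort(e);
      throw e;
    }
  }
}
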
95 changes: 95 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionWriter.java
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;
import java.io.OutputStream;

import java.nio.channels.Channels;
import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.sort.DefaultTransferrableWritableByteChannel;

/**
* :: Private ::
* An interface for opening streams to persist partition bytes to a backing data store.
Contributor:
I'd add that this stores bytes for one (mapper, reducer) pair, which corresponds to one ShuffleBlock.

* <p>
* This writer stores bytes for one (mapper, reducer) pair, corresponding to one shuffle
* block.
*
* @since 3.0.0
*/
@Private
public interface ShufflePartitionWriter {

/**
* Open and return an {@link OutputStream} that can write bytes to the underlying
* data store.
* <p>
* This method will only be called once on this partition writer in the map task, to write the
* bytes to the partition. The output stream will only be used to write the bytes for this
* partition. The map task closes this output stream upon writing all the bytes for this
* block, or if the write fails for any reason.
* <p>
* Implementations that intend on combining the bytes for all the partitions written by this
* map task should reuse the same OutputStream instance across all the partition writers provided
* by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
* {@link OutputStream#close()} does not close the resource, since it will be reused across
* partition writes. The underlying resources should be cleaned up in
* {@link ShuffleMapOutputWriter#commitAllPartitions()} and
* {@link ShuffleMapOutputWriter#abort(Throwable)}.
*/
OutputStream openStream() throws IOException;
Contributor:
I think we need to say more here about the lifecycle of this OutputStream. In particular: (a) the framework will only keep one of these OutputStreams open at a time per map task; (b) the framework ensures that the OutputStreams are closed, even if there are any exceptions; and (c) if an individual implementation wants to keep all the output for one map task together (like the index / data file organization of local shuffle output), then they may want to reuse the real underlying OutputStream across all ShufflePartitionWriters of one ShuffleMapOutputWriter.

Contributor Author:
I added more docs.


/**
* Opens and returns a {@link TransferrableWritableByteChannel} for transferring bytes from
* input byte channels to the underlying shuffle data store.
* <p>
* This method will only be called once on this partition writer in the map task, to write the
* bytes to the partition. The channel will only be used to write the bytes for this
* partition. The map task closes this channel upon writing all the bytes for this
* block, or if the write fails for any reason.
* <p>
* Implementations that intend on combining the bytes for all the partitions written by this
* map task should reuse the same channel instance across all the partition writers provided
* by the parent {@link ShuffleMapOutputWriter}. If one does so, ensure that
* {@link TransferrableWritableByteChannel#close()} does not close the resource, since it
* will be reused across partition writes. The underlying resources should be cleaned up in
* {@link ShuffleMapOutputWriter#commitAllPartitions()} and
* {@link ShuffleMapOutputWriter#abort(Throwable)}.
* <p>
* This method is primarily for advanced optimizations where bytes can be copied from the input
* spill files to the output channel without copying data into memory.
* <p>
* The default implementation should be sufficient for most situations. Only override this
Contributor:
Actually I'm not sure this is true, if the goal is to actually provide an optimization. In that case, the default implementation is only sufficient if your stream is a FileInputStream (just checked what Channels.newChannel() does).

Otherwise, the wrapper created will copy data into user memory, basically negating the optimization.

(Which maybe is an argument for returning null here and falling back to the normal IO path when that happens.)

Contributor Author:
What we're saying is that this kind of low-level optimization isn't the first place to look to improve performance most of the time, so to speak. So if one has to do the optimization, they should provide the proper override, but the specific optimization isn't a critical factor to consider outside of the local disk implementation.

Contributor:
So why not follow my suggestion and return null here by default? It makes it much clearer that this implementation is not needed, and that by default the non-NIO path is used.

Contributor Author:
It's primarily to avoid returning null from the API - in that case I'd rather return Optional, with the default returning Optional.empty.

Contributor:
Sure. The main thing is returning something that indicates that this feature is not supported, instead of by default wrapping things in a way that might actually hurt performance.

* method if there is a very specific optimization that needs to be built.
*/
default TransferrableWritableByteChannel openTransferrableChannel() throws IOException {
  return new DefaultTransferrableWritableByteChannel(
      Channels.newChannel(openStream()));
}

/**
* Returns the number of bytes written either by this writer's output stream opened by
* {@link #openStream()} or the byte channel opened by {@link #openTransferrableChannel()}.
* <p>
* This can be different from the number of bytes given by the caller. For example, the
* stream might compress or encrypt the bytes before persisting the data to the backing
* data store.
*/
long getNumBytesWritten();
@hiboyang (Jul 29, 2019):
This class delegates writing to the OutputStream returned by openStream(). Will getNumBytesWritten() in this class access internal state inside that OutputStream? How about letting the OutputStream track the number of bytes written, so this class does not need to access the OutputStream? One possible solution is to add a subclass of OutputStream that tracks the number of bytes written, something like the existing TimeTrackingOutputStream class in Spark, which extends OutputStream.

Contributor Author:
The idea is that if the implementation also supports creating a custom WritableByteChannel, then the number of bytes written would be from that of the channel, not the output stream. One could see us having both a custom output stream and an added method on WritableByteChannelWrapper.

Contributor Author:
Ah I also remember why we didn't attach it to the output stream - it's particularly because of the lifecycle. If we have an output stream for the partition that pads bytes upon closing the stream, it's unclear that one will continue to call methods on the output stream object after it has been closed. That's why we have the contract:

  1. Open stream for writing bytes.
  2. Write bytes
  3. Close stream
  4. Get written bytes for that partition, accounting for the fact that the above step closed the stream.

Comment:
In this case, the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter. Could we merge them together into one class, e.g.

ShufflePartitionWriterStream extends OutputStream {
  open();
  getNumBytesWritten();
}

Contributor Author:
An OutputStream instance is considered opened as soon as the object exists, which is why OutputStream extends Closeable. As soon as I have a reference to the OutputStream object I can call write on it to push bytes to the sink. So having a separate open method doesn't make sense.

The open method belongs in the ShufflePartitionWriter API, which is effectively what we have with openStream and openChannel.

Comment:
Oh, I mean that the OutputStream returned by openStream() is tightly coupled with ShufflePartitionWriter, so I suggest merging them together. For example, rename ShufflePartitionWriter to ShufflePartitionWriterStream, which extends OutputStream:

ShufflePartitionWriterStream extends OutputStream {
void open();
long getNumBytesWritten();
}

In this case, the user does not need to create a ShufflePartitionWriter and then call its openStream() method to get an OutputStream. Instead, the user creates a ShufflePartitionWriterStream, which is already an OutputStream.

Contributor Author:
But again, do we call getNumBytesWritten before or after calling close on this object? If before, does it include the bytes that might be padded when closing the stream? If after, are we going to be invoking methods on a closed resource, and is that reasonable?

}
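
Tying the thread above together, here is one hypothetical way a plugin that concatenates all partitions into a single shared stream could satisfy the documented contract: close() on the per-partition stream does not close the shared resource, and the byte count remains readable after the stream is closed. All class names here are made up for illustration.

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class ConcatenatingPartitionWriter implements ShufflePartitionWriter {

  // Owned and eventually closed by the parent ShuffleMapOutputWriter in
  // commitAllPartitions() or abort(), not by this partition writer.
  private final OutputStream sharedStream;
  private long bytesWritten;

  ConcatenatingPartitionWriter(OutputStream sharedStream) {
    this.sharedStream = sharedStream;
  }

  @Override
  public OutputStream openStream() throws IOException {
    // Count bytes and swallow close() so the shared stream stays open for the
    // next partition; openTransferrableChannel() keeps its default behavior.
    return new FilterOutputStream(sharedStream) {
      @Override
      public void write(int b) throws IOException {
        out.write(b);
        bytesWritten++;
      }

      @Override
      public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        bytesWritten += len;
      }

      @Override
      public void close() throws IOException {
        flush();  // deliberately do not close the shared stream
      }
    };
  }

  @Override
  public long getNumBytesWritten() {
    // Safe to call after the per-partition stream returned above was closed.
    return bytesWritten;
  }
}
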
57 changes: 57 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.IOException;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;

/**
* :: Private ::
* A module that returns shuffle writers to persist data that is written by shuffle map tasks.
*
* @since 3.0.0
*/
@Private
public interface ShuffleWriteSupport {
Comment:
This layer has already been removed. : )


/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
*
* @param shuffleId Unique identifier for the shuffle stage of the map task
Contributor:
Actually an identifier of the shuffle itself, not the stage, right? If you reuse a shuffle, you get the same shuffle id, but different stage id.

* @param mapId Within the shuffle stage, the identifier of the map task
* @param mapTaskAttemptId Identifier of the task attempt. Multiple attempts of the same map task
* with the same (shuffleId, mapId) pair can be distinguished by the
* different values of mapTaskAttemptId.
* @param numPartitions The number of partitions that will be written by the map task. Some of
* these partitions may be empty.
* @param mapTaskWriteMetrics The map task's write metrics, which can be updated by the returned
* writer. The updates that are posted to this reporter are listed in
* the Spark UI. Note that the caller will update the total write time
* at the end of the map task, so implementations should not call
* {@link ShuffleWriteMetricsReporter#incWriteTime(long)}.
*/
ShuffleMapOutputWriter createMapOutputWriter(
    int shuffleId,
    int mapId,
    long mapTaskAttemptId,
    int numPartitions,
    ShuffleWriteMetricsReporter mapTaskWriteMetrics) throws IOException;
Contributor:
There are two problems here:

  • ShuffleWriteMetricsReporter is private[spark]
  • as I mentioned, the package it's on is not publicly documented, so is not considered a public API.

If this will be exposed, it needs to be moved inside the api package.

But on a separate note, is this needed? I remember some discussion about this but don't remember what metrics the plugin is expected to update... it seems to me all metrics are already updated by the shuffle code itself (e.g. BypassMergeSortShuffleWriter, ShuffleExternalSorter, etc).

Contributor Author:
This is in response to this thread: #25007 (comment). I'd much rather not have this be part of the API and would rather have implementations call TaskContext.get() to get the metrics reporter for the task.

Contributor Author:
@squito for further thoughts.

Contributor:
Imran, IIRC, will only be back next week, so unless you're OK with waiting, we probably should remove this and re-add it later, after we figure out exactly what's needed.

Contributor Author:
I was also out Thursday and Friday of last week, so now we can coordinate on this together.

ping @squito for thoughts on this matter.

Contributor:
Sorry for the delays from me. After a closer look, I actually am pretty sure we should remove this from the API, and also any use of it from LocalDiskShuffleMapOutputWriter. That also means the test change I was originally commenting on, which sets the TaskContext, could be removed as well.

I think the current code in this patch is wrong; it's double counting the write time for the final merged file. The original code did not create a TimeTrackingOutputStream for the merged file -- it just counted the time for the total creation of that file.

https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L190-L213

And it seems like we'd have to do that, as we might be using a channel there, and then we wouldn't have an equivalent way of doing it for the channel.

The current code in this PR does something similar here in BypassMergeSortShuffleWriter.writePartitionedData(): https://github.com/apache/spark/pull/25007/files#diff-8b6b7a5dadc0d8e97307d0f8e8378d8fR247

But it's also passing that to the LocalDiskShuffleMapOutputWriter in a TimeTrackingOutputStream: https://github.com/apache/spark/pull/25007/files#diff-17636cf695d4c63ea3e15c3d71d63707R133

Sorry, I should have looked at the use of those metrics more closely in the first place. But I think this means we can remove that metrics object from the API entirely.

}
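
Continuing the hypothetical plugin sketch from the earlier files, a write-support implementation mostly just decides where a given map task attempt's output should live. MyMapOutputWriter is a made-up class standing in for a full ShuffleMapOutputWriter implementation.

import java.io.IOException;

import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShuffleWriteSupport;

final class MyShuffleWriteSupport implements ShuffleWriteSupport {

  private final String appId;

  MyShuffleWriteSupport(String appId) {
    this.appId = appId;
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      long mapTaskAttemptId,
      int numPartitions,
      ShuffleWriteMetricsReporter metrics) throws IOException {
    // (shuffleId, mapId, mapTaskAttemptId) uniquely identifies this attempt's
    // output in the backing store.
    String outputKey = appId + "/" + shuffleId + "/" + mapId + "/" + mapTaskAttemptId;
    return new MyMapOutputWriter(outputKey, numPartitions);
  }
}
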
59 changes: 59 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/api/TransferrableWritableByteChannel.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api;

import java.io.Closeable;
import java.io.IOException;

import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.spark.annotation.Private;
Contributor:
nit: wrong import grouping.


/**
* :: Private ::
* Represents an output byte channel that can copy bytes from input file channels to some
* arbitrary storage system.
* <p>
* This API is provided for advanced users who can transfer bytes from a file channel to
* some output sink without copying data into memory. Most users should not need to use
* this functionality; this is primarily provided for the built-in shuffle storage backends
* that persist shuffle files on local disk.
* <p>
* For a simpler alternative, see {@link ShufflePartitionWriter}.
*
* @since 3.0.0
*/
@Private
public interface TransferrableWritableByteChannel extends Closeable {

/**
* Copy all bytes from the source readable byte channel into this byte channel.
* <p>
* This method should block until all of the bytes from the source (that is, up until
* numBytesToTransfer) are available in the output storage layer.
*
* @param source File to transfer bytes from. Do not call anything on this channel other than
Contributor:
This description is a bit weird. Why the restriction on what method can be called on the input?

That sounds to me like the abstraction is wrong, and this interface maybe shouldn't exist, and instead plugins should be returning a WritableByteChannel.

That way the input is never exposed to the plugin and whatever problem that restriction is expected to solve would not exist.

Contributor Author:
We discussed why this can't return a WritableByteChannel in this thread: palantir#535 (comment)

Contributor Author (@mccheah, Jul 24, 2019):
Let me try to summarize the discussion from that thread here so that we have some record of the decision in the mainline repository.

Basically this comes from a need to block the close method from closing the underlying channel, but still keeping the optimization of FileChannel#transferTo(otherFileChannel).

We can't return any custom implementation of WritableByteChannel here that shields closing, because FileChannel#transferTo(writableByteChannel) needs to specifically receive a WritableByteChannel that is an instance of FileChannel. But if the WritableByteChannel is indeed an instance of FileChannel, then FileChannel#close() will force the underlying channel to close, which is not what we want to do particularly for the local disk storage implementation given here.

I agree that the abstraction here is convoluted. I proposed having an abstraction on top of the input FileChannel to prevent the plugin system from calling arbitrary methods on the input, but we decided that wasn't necessary in palantir#535 (comment).

Contributor:
> Basically this comes from a need to block the close method from closing the underlying channel

I see. Still this is a really convoluted interface, which makes me more strongly prefer that this be hidden as an optimization in the internal, local file implementation, instead of being exposed in the public API.

If we're really exposing this in the public API, I think it would be better to have an interface that makes it clear what the intent is here: provide a FileChannel (and not any other kind of channel) that, depending on the plugin, should not be closed. So something like:

interface FileChannelWrapper {

  FileChannel openChannel();

  boolean shouldClose();

}

Plugins by default would just return a null wrapper, and then the code calling this could do:

FileChannelWrapper wrapper = transferToEnabled ? writer.fileChannelWrapper() : null;
if (wrapper != null) {
  FileChannel channel = wrapper.openChannel();
  Utils.copyBlahBlahBlah();
  if (wrapper.shouldClose()) channel.close();
} else {
  // non-nio code path.
}

It's not great, but it's (i) less code than you have here and (ii) much clearer about the actual intent of this API, without the restrictions about what methods should and should not be called.

Contributor:
Another option is to just return the FileChannel from ShufflePartitionWriter, and have the shuffle code never close the returned channel. Instead, the ShuffleMapOutputWriter would do it when it's safe to do so.

That's a little awkward though since it would basically codify into the interface what is just a suggestion in openStream() (where it says you should override close() if you don't want the underlying stream to be closed).

But that kinda gives me an idea for tweaking the above interface a bit; have this in ShufflePartitionWriter instead:

FileChannel openChannel();

default void closeChannel(FileChannel channel) { channel.close(); }

If you do the same for the openStream() method (i.e. have a closeStream()), then the API is symmetrical, and you avoid the use of stream wrappers to block the call to close().

Contributor Author:
I went with a different sort of abstraction that captures the spirit of this idea in the latest patch. Please take a look and let me know what you think.

Contributor Author:
Ah, I didn't see your follow-up comment. I'm not as fond of passing the resource back to the plugin to ask the plugin to close it - that isn't a very common programming idiom with respect to streams and resources; usually the close method is attached to the closeable resource itself.

I liked the channel wrapper abstraction. We used to have the close method attached to the ShufflePartitionWriter and only closed the partition writer itself and not the channel, but that seemed prone to error on the plugin writer's side - since the partition writer returns a closeable resource itself, should the plugin expect the closeable resource it returns to be closed, or should it expect ShufflePartitionWriter#close to close the resource? Making openStream and openChannel return closeable resources is clearer with respect to the lifecycle of the returned resource.

Contributor:
Commenting here to keep context, even though this code doesn't exist anymore.

I see you implemented basically my first suggestion. I think that if you instead make ShufflePartitionWriter implement Closeable, you can achieve the same goal with fewer things for people to implement. (openChannel() would just return a WritableByteChannel, and close() would close whatever was opened by either open call.)

It could even simplify some code in BypassMergeSortShuffleWriter since now the close() call would be the same regardless of whether it's a stream or a channel.

But that's minor now, and I'm just trying to reduce the amount of code needed for this optimization to work. Current version seems fine otherwise.

Contributor Author:
I prefer this approach over making ShufflePartitionWriter implement Closeable, because ShufflePartitionWriter would return a Closeable resource (OutputStream or WritableByteChannel) that itself is not closed by the caller. That's why we originally went with TransferrableWritableByteChannel which itself is Closeable and doesn't return any Closeable objects. I think this is an in-between version, but it's unfortunate that WritableByteChannelWrapper itself returns a WritableByteChannel that itself isn't closed.

* {@link FileChannel#transferTo(long, long, WritableByteChannel)}.
* @param transferStartPosition Start position of the input file to transfer from.
* @param numBytesToTransfer Number of bytes to transfer from the given source.
*/
void transferFrom(
    FileChannel source,
    long transferStartPosition,
    long numBytesToTransfer) throws IOException;
}
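
For reference, a transferFrom implementation that copies into an arbitrary WritableByteChannel might look roughly like the loop below. This is a sketch of what a default implementation could do, not necessarily the code in DefaultTransferrableWritableByteChannel.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.shuffle.api.TransferrableWritableByteChannel;

final class DelegatingTransferrableChannel implements TransferrableWritableByteChannel {

  private final WritableByteChannel delegate;

  DelegatingTransferrableChannel(WritableByteChannel delegate) {
    this.delegate = delegate;
  }

  @Override
  public void transferFrom(
      FileChannel source,
      long transferStartPosition,
      long numBytesToTransfer) throws IOException {
    // FileChannel#transferTo may move fewer bytes than requested, so loop
    // until the full range has been handed to the delegate channel.
    long transferred = 0;
    while (transferred < numBytesToTransfer) {
      long count = source.transferTo(
          transferStartPosition + transferred,
          numBytesToTransfer - transferred,
          delegate);
      if (count <= 0) {
        break;
      }
      transferred += count;
    }
    if (transferred < numBytesToTransfer) {
      throw new IOException("Expected to transfer " + numBytesToTransfer
          + " bytes but only transferred " + transferred);
    }
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}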