Merged
Changes from 40 commits
Commits
58 commits
864d1cd
initial API
yifeih Mar 20, 2019
c88751c
wip
yifeih Mar 21, 2019
9af216f
wip
yifeih Mar 22, 2019
a35b826
initial implementation of reader
yifeih Mar 26, 2019
14c47ae
fix based on comments
yifeih Mar 26, 2019
5bb4c32
fix java lang import and delete unneeded class
yifeih Mar 26, 2019
584e6c8
address initial comments
yifeih Mar 27, 2019
0292fe2
fix unit tests
yifeih Mar 27, 2019
71c2cc7
java checkstyle
yifeih Mar 27, 2019
43c377c
fix tests
yifeih Mar 27, 2019
9fc6a60
address some comments
yifeih Mar 28, 2019
45172a5
blah
yifeih Mar 28, 2019
4e5652b
address more comments
yifeih Mar 28, 2019
a35d8fe
Use decorators to customize how the read metrics reporter is instanti…
mccheah Apr 1, 2019
1a09ebe
blah
yifeih Apr 2, 2019
c149d24
initial tests
yifeih Apr 2, 2019
672d473
Revert "initial tests"
yifeih Apr 3, 2019
e0a3289
initial impl
yifeih Apr 3, 2019
1e89b3f
get shuffle reader tests to pass
yifeih Apr 3, 2019
495c7bd
update
yifeih Apr 3, 2019
88a03cb
tests
yifeih Apr 3, 2019
741deed
style
yifeih Apr 3, 2019
76c0381
Merge branch 'spark-25299' into yh/reader-api
yifeih Apr 3, 2019
c7c52b0
hook up executor components
yifeih Apr 4, 2019
897c0bf
fix compile
yifeih Apr 4, 2019
34eaaf6
remove unnecessary fields
yifeih Apr 4, 2019
0548800
remove unused
yifeih Apr 4, 2019
0637e70
refactor retrying iterator
yifeih Apr 4, 2019
f069dc1
remove unused import
yifeih Apr 4, 2019
0bba677
fix some comments
yifeih Apr 5, 2019
a82a725
null check
yifeih Apr 5, 2019
ac392a1
refactor interface
yifeih Apr 5, 2019
53dd94b
refactor API
yifeih Apr 5, 2019
4c0c791
shuffle iterator style
yifeih Apr 5, 2019
84f7931
add some javadocs for interfaces
yifeih Apr 5, 2019
b59efb5
attach apache headers
yifeih Apr 5, 2019
aba8a94
remove unused imports
yifeih Apr 5, 2019
5ef59b6
remove another import
yifeih Apr 5, 2019
49a1901
fix reader
yifeih Apr 5, 2019
8c6c09c
fix imports
yifeih Apr 5, 2019
6370b41
add exception comment for retry API
yifeih Apr 10, 2019
c442b63
address some comments
yifeih Apr 10, 2019
2c1272a
address comments
yifeih Apr 10, 2019
2758a5c
Merge branch 'spark-25299' into yh/reader-api
yifeih Apr 19, 2019
bd349ca
resolve conflicts
yifeih Apr 19, 2019
653f67c
style
yifeih Apr 19, 2019
9f53839
address some comments
yifeih Apr 19, 2019
94275fd
style
yifeih Apr 20, 2019
26e97c1
refactor API
yifeih Apr 20, 2019
91db776
cleanup
yifeih Apr 20, 2019
f0fa7b8
fix tests and style
yifeih Apr 22, 2019
50c8fc3
style
yifeih Apr 22, 2019
4aa4b6e
reorder result for test?
yifeih Apr 22, 2019
7d23f47
wip
yifeih Apr 26, 2019
363d4ab
address comments
yifeih Apr 29, 2019
bb7fa4c
style
yifeih Apr 29, 2019
711109b
cleanup tests
yifeih Apr 29, 2019
04a135c
Remove unused class
mccheah Apr 30, 2019
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import java.util.Objects;

public class ShuffleBlockInfo {
  private final int shuffleId;
  private final int mapId;
  private final int reduceId;
  private final long length;

  public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length) {
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.reduceId = reduceId;
    this.length = length;
  }

  public int getShuffleId() {
    return shuffleId;
  }

  public int getMapId() {
    return mapId;
  }

  public int getReduceId() {
    return reduceId;
  }

  public long getLength() {
    return length;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof ShuffleBlockInfo
        && shuffleId == ((ShuffleBlockInfo) other).shuffleId
        && mapId == ((ShuffleBlockInfo) other).mapId
        && reduceId == ((ShuffleBlockInfo) other).reduceId
        && length == ((ShuffleBlockInfo) other).length;
Review comment:
Should we check equality against the location?

  }

  @Override
  public int hashCode() {
    return Objects.hash(shuffleId, mapId, reduceId, length);
Review comment:
Should we hash the location?

  }
}
@@ -30,4 +30,6 @@ public interface ShuffleExecutorComponents {
  void initializeExecutor(String appId, String execId);

  ShuffleWriteSupport writes();

  ShuffleReadSupport reads();
}
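For context, a minimal sketch (in Scala; MyShuffleReadSupport and MyShuffleWriteSupport are hypothetical plugin classes) of how an external shuffle plugin would wire the new reads() hook in next to the existing writes():

```scala
import org.apache.spark.api.shuffle.{ShuffleExecutorComponents, ShuffleReadSupport, ShuffleWriteSupport}

class MyShuffleExecutorComponents extends ShuffleExecutorComponents {
  private var readSupport: ShuffleReadSupport = _
  private var writeSupport: ShuffleWriteSupport = _

  override def initializeExecutor(appId: String, execId: String): Unit = {
    // Create both support objects once the executor identity is known.
    readSupport = new MyShuffleReadSupport(appId, execId)    // hypothetical plugin reader
    writeSupport = new MyShuffleWriteSupport(appId, execId)  // hypothetical plugin writer
  }

  override def writes(): ShuffleWriteSupport = writeSupport

  override def reads(): ShuffleReadSupport = readSupport
}
```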
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.IOException;

/**
* :: Experimental ::
* An interface for reading shuffle records.
* @since 3.0.0
*/
@Experimental
public interface ShuffleReadSupport {
  /**
   * Returns an underlying {@link ShuffleReaderIterable} that will iterate through shuffle data,
   * given an iterable for the shuffle blocks to fetch.
   */
  ShuffleReaderIterable getPartitionReaders(Iterable<ShuffleBlockInfo> blockMetadata)
      throws IOException;
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.InputStream;

/**
* :: Experimental ::
* An interface for reading shuffle records.
Review comment:
This is not an interface?

* @since 3.0.0
*/
@Experimental
public class ShuffleReaderInputStream {

  private final ShuffleBlockInfo shuffleBlockInfo;
Review comment:
I thought about this a bit more, and I don't think we want to be putting the shuffle block info into this part of the API. The reader should just return plain InputStream objects without this wrapper.

We originally included the block info here because in the default implementation, we need to know the length of the block to know if we should be checking for corruption. But this is again only applicable for the default implementation. Putting the block info here would result in it being unused by other shuffle storage plugins.

What I think we want instead is for BlockStoreShuffleReader to check that each returned InputStream is an instance of some class, then have that input stream class have a method that returns which block it's reading and what its length is. Something like that, anyways.

We might want to use pattern matching instead of isInstanceOf and asInstanceOf in a lot of cases.

Can we see if we can get the reader API to return Iterable<InputStream>?
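A rough sketch of that suggestion (this is not what the PR currently implements; the wrapper class and the checkCorruption helper are hypothetical): the plugin API would hand back plain InputStreams, and the default implementation would tag its own streams with block metadata via a subclass the reader can pattern match on:

```scala
import java.io.{FilterInputStream, InputStream}

import org.apache.spark.api.shuffle.ShuffleBlockInfo

// Hypothetical wrapper used only by the default implementation.
class BlockInfoInputStream(val blockInfo: ShuffleBlockInfo, delegate: InputStream)
  extends FilterInputStream(delegate)

def maybeCheckCorruption(stream: InputStream): InputStream = stream match {
  case s: BlockInfoInputStream =>
    // Default implementation: the block length is available for corruption checks.
    checkCorruption(s.blockInfo.getLength, s)  // hypothetical helper
  case other =>
    // External storage plugins: no extra metadata, pass the stream through unchanged.
    other
}
```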

Author reply:
Sorry for the slow turnaround. I'm not sure that returning just Iterable<InputStream> would work, because the decryption/decompression function, serializerManager.wrapStream, takes a BlockId as one of its arguments. Every implementation should have some way of giving that information back to the BlockStoreShuffleReader so that decryption/decompression can feasibly be done outside the plugin.

Review comment:
Ah ok, that makes sense; let's keep it this way until we can think of something better. I don't think we necessarily need to use wrapStream directly - we can call wrapForCompression and wrapForEncryption individually, both of which only accept streams and don't require block ids. I think block ids are only needed to check the type of input stream it is, but you could do that check a layer above? Regardless, this way might be fine as is, but look around and see what we can do.

Author reply:
Ah ok, yea, I guess I could call those two functions directly. I think I can also move the corruption detection logic back inside the ShuffleBlockFetcherIterator in that case too.

Author reply:
Yea it looks like other parts of the codebase call those two functions directly, so it seems reasonable

Review comment:
Well now the class is no longer used, we should delete it yeah? =)

Review comment:
Never mind it's gone =D

  private final InputStream inputStream;

  public ShuffleReaderInputStream(ShuffleBlockInfo shuffleBlockInfo, InputStream inputStream) {
    this.shuffleBlockInfo = shuffleBlockInfo;
    this.inputStream = inputStream;
  }

  public ShuffleBlockInfo getShuffleBlockInfo() {
    return shuffleBlockInfo;
  }

  public InputStream getInputStream() {
    return inputStream;
  }
}
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.util.Iterator;

/**
* :: Experimental ::
* An interface for iterating through shuffle blocks to read.
* @since 3.0.0
*/
@Experimental
public interface ShuffleReaderIterable extends Iterable<ShuffleReaderInputStream> {

  interface ShuffleReaderIterator extends Iterator<ShuffleReaderInputStream> {
    /**
     * Instructs the shuffle iterator to fetch the last block again. This is useful
     * if the block is determined to be corrupt after decryption or decompression.
     */
    default void retryLastBlock(Throwable t) {
      throw new UnsupportedOperationException("Cannot retry fetching bad blocks", t);
    }
  }

  @Override
  ShuffleReaderIterator iterator();
}
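To illustrate the retry contract above, here is a minimal sketch (in Scala; fetchBlock is a hypothetical stand-in for however an implementation actually fetches block bytes) of an iterator that honors retryLastBlock by serving the same block again on the next call to next():

```scala
import java.io.InputStream

import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReaderInputStream}
import org.apache.spark.api.shuffle.ShuffleReaderIterable.ShuffleReaderIterator

class RetryingReaderIterator(
    blocks: Seq[ShuffleBlockInfo],
    fetchBlock: ShuffleBlockInfo => InputStream)  // hypothetical fetch function
  extends ShuffleReaderIterator {

  private val remaining = blocks.iterator
  private var lastServed: Option[ShuffleBlockInfo] = None
  private var pendingRetry: Option[ShuffleBlockInfo] = None

  override def hasNext: Boolean = pendingRetry.isDefined || remaining.hasNext

  override def next(): ShuffleReaderInputStream = {
    // Serve the retried block first if one was reported as corrupt, otherwise advance.
    val block = pendingRetry.getOrElse(remaining.next())
    pendingRetry = None
    lastServed = Some(block)
    new ShuffleReaderInputStream(block, fetchBlock(block))
  }

  override def retryLastBlock(t: Throwable): Unit = {
    // Re-serve the last block on the next call; a real implementation would also
    // cap the number of retries and eventually rethrow.
    pendingRetry = lastServed
  }
}
```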
@@ -17,18 +17,22 @@

package org.apache.spark.shuffle.sort.io;

import org.apache.spark.MapOutputTracker;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleReadSupport;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.io.DefaultShuffleReadSupport;
import org.apache.spark.storage.BlockManager;

public class DefaultShuffleExecutorComponents implements ShuffleExecutorComponents {

private final SparkConf sparkConf;
private BlockManager blockManager;
private IndexShuffleBlockResolver blockResolver;
private MapOutputTracker mapOutputTracker;

public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
this.sparkConf = sparkConf;
@@ -37,15 +41,29 @@ public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
@Override
public void initializeExecutor(String appId, String execId) {
blockManager = SparkEnv.get().blockManager();
mapOutputTracker = SparkEnv.get().mapOutputTracker();
blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
}

@Override
public ShuffleWriteSupport writes() {
checkInitialized();
return new DefaultShuffleWriteSupport(sparkConf, blockResolver);
}

@Override
public ShuffleReadSupport reads() {
checkInitialized();
return new DefaultShuffleReadSupport(
blockManager,
mapOutputTracker,
sparkConf);
}

private void checkInitialized() {
if (blockResolver == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
"Executor components must be initialized before getting writers.");
}
return new DefaultShuffleWriteSupport(sparkConf, blockResolver);
}
}
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -56,6 +56,8 @@ class TaskMetrics private[spark] () extends Serializable {
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
private var _decorFunc: TempShuffleReadMetrics => TempShuffleReadMetrics =
Predef.identity[TempShuffleReadMetrics]

/**
* Time taken on the executor to deserialize this task.
@@ -187,11 +189,17 @@ class TaskMetrics private[spark] () extends Serializable {
* be lost.
*/
private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized {
val readMetrics = new TempShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
val tempShuffleMetrics = new TempShuffleReadMetrics
val readMetrics = _decorFunc(tempShuffleMetrics)
tempShuffleReadMetrics += tempShuffleMetrics
readMetrics
}

private[spark] def decorateTempShuffleReadMetrics(
decorFunc: TempShuffleReadMetrics => TempShuffleReadMetrics): Unit = synchronized {
_decorFunc = decorFunc
}

/**
* Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`.
* This is expected to be called on executor heartbeat and at the end of a task.
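The decorator hook above lets a shuffle plugin substitute or wrap the TempShuffleReadMetrics handed to the reader. A minimal usage sketch, assuming a hypothetical plugin-side pluginReporter and that TempShuffleReadMetrics exposes incRecordsRead (the other inc* methods would be forwarded the same way):

```scala
// Inside task setup code, before shuffle reads begin (context is the TaskContext).
context.taskMetrics().decorateTempShuffleReadMetrics { tempMetrics =>
  new TempShuffleReadMetrics {
    override def incRecordsRead(v: Long): Unit = {
      tempMetrics.incRecordsRead(v)  // keep Spark's own accounting
      pluginReporter.recordsRead(v)  // hypothetical plugin-side reporter
    }
    // ... other inc*/set* methods would be forwarded the same way.
  }
}
```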
@@ -17,12 +17,19 @@

package org.apache.spark.shuffle

import java.io.{InputStream, IOException}
import java.nio.ByteBuffer

import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport}
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.{CompletionIterator, Utils}
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.io.ChunkedByteBufferOutputStream

/**
* Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
@@ -34,33 +41,64 @@ private[spark] class BlockStoreShuffleReader[K, C](
endPartition: Int,
context: TaskContext,
readMetrics: ShuffleReadMetricsReporter,
serializerManager: SerializerManager = SparkEnv.get.serializerManager,
blockManager: BlockManager = SparkEnv.get.blockManager,
serializerManager: SerializerManager,
shuffleReadSupport: ShuffleReadSupport,
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
extends ShuffleReader[K, C] with Logging {

private val dep = handle.dependency

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
readMetrics).toCompletionIterator
val wrappedStreams =
Review comment:
nit: the name is misleading now since these streams are not really wrapped (you moved the wrapStream call to later).

shuffleReadSupport.getPartitionReaders(new Iterable[ShuffleBlockInfo] {
override def iterator: Iterator[ShuffleBlockInfo] = {
/** Read the combined key-values for this reduce task */
Review comment:
this comment looks really out of place here -- at this point you're just getting shuffle block ids, not the combined key-values. You don't have combined key-values until you create aggregatedIter down below.

mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
.flatMap(blockManagerIdInfo => {
Review comment:
.flatMap { blockManagerIdInfo =>

blockManagerIdInfo._2.map(
Review comment:
.map { blockInfo =>

blockInfo => {
val block = blockInfo._1.asInstanceOf[ShuffleBlockId]
new ShuffleBlockInfo(block.shuffleId, block.mapId, block.reduceId, blockInfo._2)
}
)
})
}
}.asJava).iterator()

val serializerInstance = dep.serializer.newInstance()
val retryingWrappedStreams = new Iterator[InputStream] {
override def hasNext: Boolean = wrappedStreams.hasNext

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
override def next(): InputStream = {
var returnStream: InputStream = null
while (wrappedStreams.hasNext && returnStream == null) {
val nextStream = wrappedStreams.next()
val blockInfo = nextStream.getShuffleBlockInfo
val blockId = ShuffleBlockId(
blockInfo.getShuffleId,
blockInfo.getMapId,
blockInfo.getReduceId)
try {
val in = serializerManager.wrapStream(blockId, nextStream.getInputStream)
val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
// Decompress the whole block at once to detect any corruption, which could increase
// the memory usage tne potential increase the chance of OOM.
Review comment:
tne?

// TODO: manage the memory used here, and spill it into disk in case of OOM.
Review comment:
this code has since changed upstream somewhat; I think you are basing your change on a version before this? apache@688b0c0

Author reply:
Ah ok haven't been following the PRs upstream. I think we will merge from upstream to the base spark-25299 branch and address this in a separate PR to maintain consistency of the perf tests and benchmarks.

Utils.copyStream(in, out, closeStreams = true)
Review comment:
This seems to be replacing code in ShuffleBlockFetcherIterator, but the old code has extra checks (e.g. it only does this for smaller blocks). Are those not needed here?

At least it seems to be missing the flag that enables corruption detection in the first place.
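For reference, a hedged sketch of the gating the removed ShuffleBlockFetcherIterator wiring implied (the size threshold and the exact config handling are assumptions, not part of this PR):

```scala
// Only run the eager decompress-to-detect-corruption step when the feature is
// enabled and the block is small enough to buffer in memory.
val detectCorrupt = SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT)
// maxBytesInFlight as in the removed wiring above (REDUCER_MAX_SIZE_IN_FLIGHT in bytes).
val in = serializerManager.wrapStream(blockId, nextStream.getInputStream)
val checkedStream =
  if (detectCorrupt && blockInfo.getLength < maxBytesInFlight / 3) {
    val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
    Utils.copyStream(in, out, closeStreams = true)
    out.toChunkedByteBuffer.toInputStream(dispose = true)
  } else {
    in
  }
```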

Author reply:
Ah, I moved them to retryLast(), but I realize that doesn't quite work the same way as it did before, because it might move all the data into memory when you don't want to. I'll refactor this part.

returnStream = out.toChunkedByteBuffer.toInputStream(dispose = true)
} catch {
case e: IOException =>
Review comment:
Should this catch Exception?

Author reply (yifeih, Apr 4, 2019):
So the original code catches IOException because you want to catch only those, and not FetchFailedException, because that's the indicator from the ShuffleBlockFetcherIterator that it stopped retrying.

Review comment (mccheah, Apr 5, 2019):
Ping here? Think the last push doesn't address this.

(Edit: This is in the wrong place, it's for https://github.com/palantir/spark/pull/523/files/f069dc1e818f1403f03127807f8e1528df478616#r272765724)

Review comment:
Let's try to add a comment so the implementor has some awareness that this catches IOException specifically, and maybe explain the intuition behind this logic. For example, why not catch all exceptions EXCEPT FetchFailedException?

Author reply:
Actually, I don't think I can add FetchFailedException because it's not part of the Spark API... This is interesting because it also relates to @squito's concern about detecting failures that are used in the scheduler logic to mark all shuffle files on an executor as lost. Since other plugin implementations should be storing shuffle files remotely, it shouldn't mark one executor's shuffle files as lost, so we can perhaps make this a generic Exception and the plugins can throw whatever exceptions they want.

Review comment:
please see my rant below :)

wrappedStreams.retryLastBlock(e)
}
}
if (returnStream == null) {
throw new IllegalStateException("Expected shuffle reader iterator to return a stream")
}
returnStream
Review comment:
Defensive programming: if wrappedStreams didn't have a next value and returnStream is null, it means we ran out of elements despite the underlying iterator claiming that there were indeed more elements - which indicates that retrying didn't work properly. Let's check that returnStream is not null here and report an error message accordingly.

Author reply:
Hmm, so retrying not working properly would result in either wrappedStreams.next() or wrappedStreams.retryLastBlock() throwing a FetchFailedException, but you're right, we aren't guaranteed that other implementations will do this correctly. Will add the check.

Review comment:
Ping here? Think the last push doesn't address this.

Review comment:
Never mind I was looking at a stale diff

}
}

val serializerInstance = dep.serializer.newInstance()
val recordIter = retryingWrappedStreams.flatMap { wrappedStream =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.