Closed
75 commits
985d53c
[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable
caneGuy Jan 11, 2019
51a6ba0
[SPARK-26503][CORE] Get rid of spark.sql.legacy.timeParser.enabled
srowen Jan 11, 2019
d9e4cf6
[SPARK-26482][CORE] Use ConfigEntry for hardcoded configs for ui cate…
HeartSaVioR Jan 11, 2019
50ebf3a
[SPARK-26551][SQL] Fix schema pruning error when selecting one comple…
viirya Jan 11, 2019
ae382c9
[SPARK-26586][SS] Fix race condition that causes streams to run with …
mukulmurthy Jan 11, 2019
19e17ac
[SPARK-25692][TEST] Increase timeout in fetchBothChunks test
dongjoon-hyun Jan 12, 2019
e00ebd5
[SPARK-26482][K8S][TEST][FOLLOWUP] Fix compile failure
dongjoon-hyun Jan 12, 2019
3587a9a
[SPARK-26607][SQL][TEST] Remove Spark 2.2.x testing from HiveExternal…
dongjoon-hyun Jan 12, 2019
5b37092
[SPARK-26538][SQL] Set default precision and scale for elements of po…
a-shkarupin Jan 12, 2019
3bd77aa
[SPARK-26564] Fix wrong assertions and error messages for parameter c…
sekikn Jan 12, 2019
4ff2b94
[SPARK-26503][CORE][DOC][FOLLOWUP] Get rid of spark.sql.legacy.timePa…
MaxGekk Jan 13, 2019
c01152d
[SPARK-23182][CORE] Allow enabling TCP keep alive on the RPC connections
peshopetrov Jan 13, 2019
09b0548
[SPARK-26450][SQL] Avoid rebuilding map of schema for every column in…
bersprockets Jan 13, 2019
985f966
[SPARK-26065][FOLLOW-UP][SQL] Revert hint behavior in join reordering
maryannxue Jan 13, 2019
3f80071
[SPARK-26576][SQL] Broadcast hint not applied to partitioned table
jzhuge Jan 13, 2019
115fecf
[SPARK-26456][SQL] Cast date/timestamp to string by Date/TimestampFor…
MaxGekk Jan 14, 2019
27759b7
Refine comment
caneGuy Jan 14, 2019
9669569
Update
caneGuy Jan 14, 2019
ac2ec82
Update
caneGuy Jan 14, 2019
bafc7ac
[SPARK-26350][SS] Allow to override group id of the Kafka consumer
zsxwing Jan 14, 2019
abc937b
[MINOR][BUILD] Remove binary license/notice files in a source release…
maropu Jan 15, 2019
33b5039
[SPARK-25935][SQL] Allow null rows for bad records from JSON/CSV parsers
MaxGekk Jan 15, 2019
a77505d
[CORE][MINOR] Fix some typos about MemoryMode
SongYadong Jan 15, 2019
b45ff02
[SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expres…
aokolnychyi Jan 15, 2019
5ca45e8
[SPARK-26592][SS] Throw exception when kafka delegation token tried t…
gaborgsomogyi Jan 15, 2019
7296999
[SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for executi…
pralabhkumar Jan 15, 2019
8a54492
[SPARK-25857][CORE] Add developer documentation regarding delegation …
Jan 15, 2019
1b75f3b
[SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos clust…
Jan 15, 2019
954ef96
[SPARK-25530][SQL] data source v2 API refactor (batch write)
cloud-fan Jan 15, 2019
2ebb79b
[SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced …
HeartSaVioR Jan 15, 2019
cf133e6
[SPARK-26604][CORE] Clean up channel registration for StreamManager
viirya Jan 16, 2019
819e5ea
[SPARK-26615][CORE] Fixing transport server/client resource leaks in …
attilapiros Jan 16, 2019
e92088d
[MINOR][PYTHON] Fix SQLContext to SparkSession in Python API main page
HyukjinKwon Jan 16, 2019
670bc55
[SPARK-25992][PYTHON] Document SparkContext cannot be shared for mult…
HyukjinKwon Jan 16, 2019
06d5b17
[SPARK-26629][SS] Fixed error with multiple file stream in a query + …
tdas Jan 16, 2019
190814e
[SPARK-26550][SQL] New built-in datasource - noop
MaxGekk Jan 16, 2019
8f17078
[SPARK-26619][SQL] Prune the unused serializers from SerializeFromObject
viirya Jan 16, 2019
01301d0
[SPARK-26625] Add oauthToken to spark.redaction.regex
Jan 16, 2019
dc3b35c
[SPARK-26633][REPL] Add ExecutorClassLoader.getResourceAsStream
rednaxelafx Jan 16, 2019
272428d
[SPARK-26600] Update spark-submit usage message
LucaCanali Jan 17, 2019
38f0307
[SPARK-26466][CORE] Use ConfigEntry for hardcoded configs for submit …
HeartSaVioR Jan 17, 2019
4915cb3
[MINOR][BUILD] ensure call to translate_component has correct number …
Jan 17, 2019
06af625
Refine comment
caneGuy Jan 17, 2019
47fbe49
fix code style
caneGuy Jan 17, 2019
b08805e
Refine comment
caneGuy Jan 17, 2019
650b879
[SPARK-26457] Show hadoop configurations in HistoryServer environment…
Jan 17, 2019
d89aa38
Update
caneGuy Jan 17, 2019
c0632ce
[SPARK-23817][SQL] Create file source V2 framework and migrate ORC re…
gengliangwang Jan 17, 2019
6f8c0e5
[SPARK-26593][SQL] Use Proleptic Gregorian calendar in casting UTF8St…
MaxGekk Jan 17, 2019
1b575ef
[SPARK-26621][CORE] Use ConfigEntry for hardcoded configs for shuffle…
10110346 Jan 17, 2019
ede35c8
[SPARK-26622][SQL] Revise SQL Metrics labels
juliuszsompolski Jan 17, 2019
0b3abef
[SPARK-26638][PYSPARK][ML] Pyspark vector classes always return error…
srowen Jan 17, 2019
c2d0d70
[SPARK-26640][CORE][ML][SQL][STREAMING][PYSPARK] Code cleanup from lg…
srowen Jan 18, 2019
e341864
[SPARK-26659][SQL] Fix duplicate cmd.nodeName in the explain output o…
rednaxelafx Jan 18, 2019
30d94ff
fix session error
caneGuy Jan 18, 2019
34db5f5
[SPARK-26618][SQL] Make typed Timestamp/Date literals consistent to c…
MaxGekk Jan 18, 2019
8503aa3
[SPARK-26646][TEST][PYSPARK] Fix flaky test: pyspark.mllib.tests.test…
viirya Jan 18, 2019
64cc9e5
[SPARK-26477][CORE] Use ConfigEntry for hardcoded configs for unsafe …
kiszk Jan 19, 2019
ace2364
[MINOR][TEST] Correct some unit test mistakes
10110346 Jan 19, 2019
6d9c54b
[SPARK-26645][PYTHON] Support decimals with negative scale when parsi…
mgaido91 Jan 20, 2019
6c18d8d
[SPARK-26642][K8S] Add --num-executors option to spark-submit for Spa…
LucaCanali Jan 20, 2019
9a30e23
[SPARK-26351][MLLIB] Update doc and minor correction in the mllib eva…
shahidki31 Jan 21, 2019
421227a
Fix unit test failure
caneGuy Jan 21, 2019
00d144f
[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable
caneGuy Jan 11, 2019
0b6e954
Refine comment
caneGuy Jan 14, 2019
df5c075
Update
caneGuy Jan 14, 2019
dcaaebf
Update
caneGuy Jan 14, 2019
06b857c
Refine comment
caneGuy Jan 17, 2019
057c46e
fix code style
caneGuy Jan 17, 2019
b0c16d2
Refine comment
caneGuy Jan 17, 2019
121def8
Update
caneGuy Jan 17, 2019
869cd14
fix session error
caneGuy Jan 18, 2019
708b248
Fix unit test failure
caneGuy Jan 21, 2019
bbeffc1
Refine comment
caneGuy Jan 28, 2019
ad4f649
refine
caneGuy Jan 28, 2019
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)

# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
@@ -90,7 +90,6 @@ protected void channelRead0(
ManagedBuffer buf;
try {
streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
streamManager.registerChannel(channel, msg.streamChunkId.streamId);
buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
} catch (Exception e) {
logger.error(String.format("Error opening block %s for request from %s",
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -49,7 +50,7 @@ private static class StreamState {
final Iterator<ManagedBuffer> buffers;

// The channel associated to the stream
Channel associatedChannel = null;
final Channel associatedChannel;

// Used to keep track of the index of the buffer that the user has retrieved, just to ensure
// that the caller only requests each chunk one at a time, in order.
@@ -58,9 +59,10 @@ private static class StreamState {
// Used to keep track of the number of chunks being transferred and not finished yet.
volatile long chunksBeingTransferred = 0L;

StreamState(String appId, Iterator<ManagedBuffer> buffers) {
StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
this.appId = appId;
this.buffers = Preconditions.checkNotNull(buffers);
this.associatedChannel = channel;
}
}

@@ -71,13 +73,6 @@ public OneForOneStreamManager() {
streams = new ConcurrentHashMap<>();
}

@Override
public void registerChannel(Channel channel, long streamId) {
if (streams.containsKey(streamId)) {
streams.get(streamId).associatedChannel = channel;
}
}

@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
StreamState state = streams.get(streamId);
@@ -195,11 +190,19 @@ public long chunksBeingTransferred() {
*
* If an app ID is provided, only callers who've authenticated with the given app ID will be
* allowed to fetch from this stream.
*
* This method also associates the stream with a single client connection, which is guaranteed
* to be the only reader of the stream. Once the connection is closed, the stream will never
* be used again, enabling cleanup by `connectionTerminated`.
*/
public long registerStream(String appId, Iterator<ManagedBuffer> buffers) {
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
long myStreamId = nextStreamId.getAndIncrement();
streams.put(myStreamId, new StreamState(appId, buffers));
streams.put(myStreamId, new StreamState(appId, buffers, channel));
return myStreamId;
}

@VisibleForTesting
public int numStreamStates() {
return streams.size();
}
}
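For context, a minimal sketch of how a caller uses the reworked OneForOneStreamManager API above: the channel is supplied at registration time (the old registerChannel step is gone), and connectionTerminated releases every stream bound to that channel. The "demo-app" id, the helper class, and its method names are illustrative only, not part of this PR.

import java.util.List;

import io.netty.channel.Channel;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.OneForOneStreamManager;

class OneForOneStreamManagerUsageSketch {

  static long registerDemoStream(OneForOneStreamManager manager,
                                 List<ManagedBuffer> buffers,
                                 Channel channel) {
    // The owning channel is now part of the StreamState from the beginning,
    // replacing the former registerChannel(channel, streamId) call.
    return manager.registerStream("demo-app", buffers.iterator(), channel);
  }

  static void onConnectionClosed(OneForOneStreamManager manager, Channel channel) {
    // Releases the ManagedBuffers of every stream registered with this channel.
    manager.connectionTerminated(channel);
    // numStreamStates() is the @VisibleForTesting counter added in this diff;
    // it drops to 0 here if this channel owned the only registered stream.
    assert manager.numStreamStates() == 0;
  }
}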
@@ -60,16 +60,6 @@ public ManagedBuffer openStream(String streamId) {
throw new UnsupportedOperationException();
}

/**
* Associates a stream with a single client connection, which is guaranteed to be the only reader
* of the stream. The getChunk() method will be called serially on this connection and once the
* connection is closed, the stream will never be used again, enabling cleanup.
*
* This must be called before the first getChunk() on the stream, but it may be invoked multiple
* times with the same channel and stream id.
*/
public void registerChannel(Channel channel, long streamId) { }

/**
* Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
* to read from the associated streams again, so any state can be cleaned up.
@@ -126,6 +126,10 @@ private void init(String hostToBind, int portToBind) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}

if (conf.enableTcpKeepAlive()) {
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
}

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
@@ -42,6 +42,7 @@ public class TransportConf {
private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
private final String SPARK_NETWORK_IO_LAZYFD_KEY;
private final String SPARK_NETWORK_VERBOSE_METRICS;
private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;

private final ConfigProvider conf;

@@ -64,6 +65,7 @@ public TransportConf(String module, ConfigProvider conf) {
SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
}

public int getInt(String name, int defaultValue) {
@@ -173,6 +175,14 @@ public boolean verboseMetrics() {
return conf.getBoolean(SPARK_NETWORK_VERBOSE_METRICS, false);
}

/**
* Whether to enable TCP keep-alive. If true, the TCP keep-alives are enabled, which removes
* connections that are idle for too long.
*/
public boolean enableTcpKeepAlive() {
return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
}

/**
* Maximum number of retries when binding to a port before giving up.
*/
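The TransportConf hunk above adds an off-by-default flag; below is a minimal sketch of turning it on, assuming the "shuffle" module and the MapConfigProvider helper purely for illustration. The key name follows getConfKey's "spark.<module>.io.enableTcpKeepAlive" pattern from the constructor hunk.

import java.util.Collections;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

class TcpKeepAliveConfSketch {
  public static void main(String[] args) {
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(
        Collections.singletonMap("spark.shuffle.io.enableTcpKeepAlive", "true")));
    // TransportServer.init() sets ChannelOption.SO_KEEPALIVE on accepted
    // child channels when this returns true; the default is false.
    System.out.println(conf.enableTcpKeepAlive());  // true
  }
}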
@@ -172,7 +172,7 @@ public void onFailure(int chunkIndex, Throwable e) {
for (int chunkIndex : chunkIndices) {
client.fetchChunk(STREAM_ID, chunkIndex, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
if (!sem.tryAcquire(chunkIndices.size(), 60, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");
}
}
@@ -64,8 +64,7 @@ public void handleChunkFetchRequest() throws Exception {
managedBuffers.add(new TestManagedBuffer(20));
managedBuffers.add(new TestManagedBuffer(30));
managedBuffers.add(new TestManagedBuffer(40));
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator());
streamManager.registerChannel(channel, streamId);
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
TransportClient reverseClient = mock(TransportClient.class);
ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
rpcHandler.getStreamManager(), 2L);
@@ -58,8 +58,10 @@ public void handleStreamRequest() throws Exception {
managedBuffers.add(new TestManagedBuffer(20));
managedBuffers.add(new TestManagedBuffer(30));
managedBuffers.add(new TestManagedBuffer(40));
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator());
streamManager.registerChannel(channel, streamId);
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);

assert streamManager.numStreamStates() == 1;

TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
rpcHandler, 2L);
@@ -94,5 +96,8 @@ public void handleStreamRequest() throws Exception {
requestHandler.handle(request3);
verify(channel, times(1)).close();
assert responseAndPromisePairs.size() == 3;

streamManager.connectionTerminated(channel);
assert streamManager.numStreamStates() == 0;
}
}
@@ -37,14 +37,15 @@ public void managedBuffersAreFeedWhenConnectionIsClosed() throws Exception {
TestManagedBuffer buffer2 = Mockito.spy(new TestManagedBuffer(20));
buffers.add(buffer1);
buffers.add(buffer2);
long streamId = manager.registerStream("appId", buffers.iterator());

Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
manager.registerChannel(dummyChannel, streamId);
manager.registerStream("appId", buffers.iterator(), dummyChannel);
assert manager.numStreamStates() == 1;

manager.connectionTerminated(dummyChannel);

Mockito.verify(buffer1, Mockito.times(1)).release();
Mockito.verify(buffer2, Mockito.times(1)).release();
assert manager.numStreamStates() == 0;
}
}
@@ -92,7 +92,7 @@ protected void handleMessage(
OpenBlocks msg = (OpenBlocks) msgObj;
checkAuth(client, msg.appId);
long streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds));
new ManagedBufferIterator(msg.appId, msg.execId, msg.blockIds), client.getChannel());
if (logger.isTraceEnabled()) {
logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
streamId,
@@ -103,7 +103,8 @@ public void testOpenShuffleBlocks() {
@SuppressWarnings("unchecked")
ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
verify(streamManager, times(1)).registerStream(anyString(), stream.capture());
verify(streamManager, times(1)).registerStream(anyString(), stream.capture(),
any());
Iterator<ManagedBuffer> buffers = stream.getValue();
assertEquals(block0Marker, buffers.next());
assertEquals(block1Marker, buffers.next());
@@ -23,7 +23,7 @@
/**
* An array of long values. Compared with native JVM arrays, this:
* <ul>
* <li>supports using both in-heap and off-heap memory</li>
* <li>supports using both on-heap and off-heap memory</li>
* <li>has no bound checking, and thus can crash the JVM process when assert is turned off</li>
* </ul>
*/
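Since the wording fix above concerns the on-heap versus off-heap distinction, here is a small sketch of the two allocation paths, assuming the MemoryAllocator.HEAP and MemoryAllocator.UNSAFE allocators from the same unsafe module; the sizes and class name are arbitrary.

import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;

class LongArrayAllocationSketch {
  public static void main(String[] args) {
    // On-heap: the block wraps a long[] base object plus an offset into it.
    MemoryBlock onHeap = MemoryAllocator.HEAP.allocate(16 * 8L);
    LongArray onHeapArray = new LongArray(onHeap);

    // Off-heap: the block is a raw address with no base object.
    MemoryBlock offHeap = MemoryAllocator.UNSAFE.allocate(16 * 8L);
    LongArray offHeapArray = new LongArray(offHeap);

    System.out.println(onHeapArray.size() + " " + offHeapArray.size());  // 16 16

    MemoryAllocator.HEAP.free(onHeap);
    MemoryAllocator.UNSAFE.free(offHeap);
  }
}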
@@ -21,7 +21,7 @@

/**
* A memory location. Tracked either by a memory address (with off-heap allocation),
* or by an offset from a JVM object (in-heap allocation).
* or by an offset from a JVM object (on-heap allocation).
*/
public class MemoryLocation {

@@ -85,9 +85,9 @@ public class TaskMemoryManager {
/**
* Similar to an operating system's page table, this array maps page numbers into base object
* pointers, allowing us to translate between the hashtable's internal 64-bit address
* representation and the baseObject+offset representation which we use to support both in- and
* representation and the baseObject+offset representation which we use to support both on- and
* off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
* When using an in-heap allocator, the entries in this map will point to pages' base objects.
* When using an on-heap allocator, the entries in this map will point to pages' base objects.
* Entries are added to this map as new data pages are allocated.
*/
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
@@ -102,7 +102,7 @@ public class TaskMemoryManager {
private final long taskAttemptId;

/**
* Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
* Tracks whether we're on-heap or off-heap. For off-heap, we short-circuit most of these methods
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
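A hedged sketch of the address translation the pageTable comment above describes, assuming the 13-bit page number / 51-bit in-page offset split used by TaskMemoryManager (PAGE_TABLE_SIZE is 1 << 13); the helper names here are illustrative, not the class's actual methods.

class PageAddressSketch {
  static final int OFFSET_BITS = 51;                      // low 51 bits: offset within the page
  static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

  // Pack a pageTable index and an in-page offset into one 64-bit "address".
  static long encode(int pageNumber, long offsetInPage) {
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
  }

  // The high 13 bits recover the pageTable index; the looked-up entry is the
  // page's base object for on-heap pages and null for off-heap pages.
  static int decodePageNumber(long address) {
    return (int) (address >>> OFFSET_BITS);
  }

  static long decodeOffsetInPage(long address) {
    return address & OFFSET_MASK;
  }
}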
@@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.internal.config.package$;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -104,7 +105,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.blockManager = blockManager;
final ShuffleDependency<K, V, V> dep = handle.dependency();
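The one-line change above is an instance of the hardcoded-key-to-ConfigEntry migration that runs through several files in this diff. A minimal sketch of the read pattern from Java, assuming a size-typed entry such as SHUFFLE_FILE_BUFFER_SIZE whose value is expressed in KiB: conf.get(entry) hands back a boxed value, which is why the code casts to long and then to int before scaling to bytes.

import org.apache.spark.SparkConf;
import org.apache.spark.internal.config.package$;

class ConfigEntryReadSketch {
  // Equivalent of the old conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024,
  // but typed, documented, and with the default owned by the ConfigEntry definition.
  static int fileBufferSizeBytes(SparkConf conf) {
    long kib = (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE());
    return (int) kib * 1024;
  }
}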
@@ -129,7 +129,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
this, initialSize, (boolean) conf.get(package$.MODULE$.SHUFFLE_SORT_USE_RADIXSORT()));
this.peakMemoryUsedBytes = getMemoryUsage();
this.diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
@@ -37,6 +37,7 @@

import org.apache.spark.*;
import org.apache.spark.annotation.Private;
import org.apache.spark.internal.config.package$;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.NioBufferedFileInputStream;
@@ -55,7 +56,6 @@
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
import org.apache.spark.internal.config.package$;

@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -143,8 +143,8 @@ public UnsafeShuffleWriter(
this.taskContext = taskContext;
this.sparkConf = sparkConf;
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
DEFAULT_INITIAL_SORT_BUFFER_SIZE);
this.initialSortBufferSize =
(int) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
this.inputBufferSizeInBytes =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.outputBufferSizeInBytes =
@@ -282,10 +282,10 @@ void forceSorterToSpill() throws IOException {
* @return the partition lengths in the merged file.
*/
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final boolean compressionEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS());
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
(boolean) sparkConf.get(package$.MODULE$.SHUFFLE_UNDAFE_FAST_MERGE_ENABLE());
final boolean fastMergeIsSupported = !compressionEnabled ||
CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
@@ -854,7 +854,7 @@ public long getPeakMemoryUsedBytes() {
/**
* Returns the average number of probes per key lookup.
*/
public double getAverageProbesPerLookup() {
public double getAvgHashProbeBucketListIterations() {
return (1.0 * numProbes) / numKeyLookups;
}

@@ -21,13 +21,13 @@
import com.google.common.io.Closeables;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.internal.config.ConfigEntry;
import org.apache.spark.io.NioBufferedFileInputStream;
import org.apache.spark.io.ReadAheadInputStream;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;

@@ -36,9 +36,7 @@
* of the file format).
*/
public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
public static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb

private InputStream in;
private DataInputStream din;
@@ -59,28 +57,23 @@ public UnsafeSorterSpillReader(
File file,
BlockId blockId) throws IOException {
assert (file.length() > 0);
long bufferSizeBytes =
SparkEnv.get() == null ?
DEFAULT_BUFFER_SIZE_BYTES:
SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
DEFAULT_BUFFER_SIZE_BYTES);
if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
// fall back to a sane default value
logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
"allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}
final ConfigEntry<Object> bufferSizeConfigEntry =
package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
// This value must be less than or equal to MAX_BUFFER_SIZE_BYTES. Cast to int is always safe.
final int DEFAULT_BUFFER_SIZE_BYTES =
((Long) bufferSizeConfigEntry.defaultValue().get()).intValue();
int bufferSizeBytes = SparkEnv.get() == null ? DEFAULT_BUFFER_SIZE_BYTES :
((Long) SparkEnv.get().conf().get(bufferSizeConfigEntry)).intValue();

final boolean readAheadEnabled = SparkEnv.get() != null &&
SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
final boolean readAheadEnabled = SparkEnv.get() != null && (boolean)SparkEnv.get().conf().get(
package$.MODULE$.UNSAFE_SORTER_SPILL_READ_AHEAD_ENABLED());

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
new NioBufferedFileInputStream(file, bufferSizeBytes);
try {
if (readAheadEnabled) {
this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
(int) bufferSizeBytes);
bufferSizeBytes);
} else {
this.in = serializerManager.wrapStream(blockId, bs);
}