Commit 034acb4

fix
1 parent 6d059f2 commit 034acb4

File tree

4 files changed, +21 -7 lines changed


common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ public void onComplete(String streamId) throws IOException {
           callback.onSuccess(ByteBuffer.allocate(0));
         } catch (Exception ex) {
           IOException ioExc = new IOException("Failure post-processing complete stream;" +
-            " failing this rpc and leaving channel active");
+            " failing this rpc and leaving channel active", ex);
           callback.onFailure(ioExc);
           streamHandler.onFailure(streamId, ioExc);
         }
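
The fix here is a one-argument change: the caught exception is now chained as the cause of the IOException, so its stack trace survives when the failure is logged or rethrown downstream. A minimal Scala sketch of the pattern (illustrative only, not Spark code; postProcess is a hypothetical stand-in for the stream post-processing step):

import java.io.IOException

def completeStream(postProcess: () => Unit): Unit = {
  try {
    postProcess()
  } catch {
    case ex: Exception =>
      // Before the fix the cause was dropped: new IOException(msg)
      // After the fix it is chained, so ex's stack trace is preserved:
      throw new IOException("Failure post-processing complete stream;" +
        " failing this rpc and leaving channel active", ex)
  }
}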

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlockStream.java

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A request to Upload a block, which the destintation should receive as a stream.
+ * A request to Upload a block, which the destination should receive as a stream.
  *
  * The actual block data is not contained here. It will be passed to the StreamCallbackWithID
  * that is returned from RpcHandler.receiveStream()

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 2 additions & 2 deletions
@@ -152,12 +152,12 @@ private[spark] class NettyBlockTransferService(
     val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
     val callback = new RpcResponseCallback {
       override def onSuccess(response: ByteBuffer): Unit = {
-        logTrace(s"Successfully uploaded block $blockId${if (asStream) "as stream" else ""}")
+        logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
         result.success((): Unit)
       }
 
       override def onFailure(e: Throwable): Unit = {
-        logError(s"Error while uploading $blockId${if (asStream) "as stream" else ""}", e)
+        logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
         result.failure(e)
       }
     }
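
Both hunks fix the same one-character bug: without a leading space inside the interpolated branch, the block id and the suffix ran together in the log message. A quick sketch of the before/after output (illustrative block id):

val blockId = "rdd_0_0"
val asStream = true
println(s"Successfully uploaded block $blockId${if (asStream) "as stream" else ""}")
// prints: Successfully uploaded block rdd_0_0as stream
println(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
// prints: Successfully uploaded block rdd_0_0 as stream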

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 17 additions & 3 deletions
@@ -413,8 +413,10 @@ private[spark] class BlockManager(
     // to the final location, but that would require a deeper refactor of this code. So instead
     // we just write to a temp file, and call putBytes on the data in that file.
     val tmpFile = diskBlockManager.createTempLocalBlock()._2
+    val channel = new CountingWritableChannel(
+      Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
+    logTrace(s"Streaming block $blockId to tmp file $tmpFile")
     new StreamCallbackWithID {
-      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
 
       override def getID: String = blockId.name
 
@@ -425,15 +427,27 @@
       }
 
       override def onComplete(streamId: String): Unit = {
+        logTrace(s"Done receiving block $blockId, now putting into local blockManager")
         // Read the contents of the downloaded file as a buffer to put into the blockManager.
         // Note this is all happening inside the netty thread as soon as it reads the end of the
         // stream.
         channel.close()
         // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
         // using a lot of memory here. We won't get a jvm OOM, but might get killed by the
         // OS / cluster manager. We could at least read the tmp file as a stream.
-        val buffer = ChunkedByteBuffer.map(tmpFile,
-          conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+        val buffer = securityManager.getIOEncryptionKey() match {
+          case Some(key) =>
+            // we need to pass in the size of the unencrypted block
+            val blockSize = channel.getCount
+            val allocator = level.memoryMode match {
+              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+            }
+            new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
+
+          case None =>
+            ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+        }
         putBytes(blockId, buffer, level)(classTag)
         tmpFile.delete()
       }
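
This is the substantive change in the commit: the temp-file channel is now created before the callback, wrapped with serializerManager.wrapForEncryption so the spilled bytes are encrypted on disk when IO encryption is enabled, and wrapped again in a CountingWritableChannel. The count matters because onComplete must hand EncryptedBlockData the plaintext block size, which cannot be read back from the encrypted file's length. A sketch of what such a counting wrapper looks like (an illustration of the idea, not Spark's actual CountingWritableChannel):

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Counts bytes as they pass through to the delegate. Because this wrapper
// sits outside the encrypting stream, getCount reports plaintext bytes.
class CountingChannelSketch(delegate: WritableByteChannel) extends WritableByteChannel {
  private var count = 0L

  def getCount: Long = count

  override def write(src: ByteBuffer): Int = {
    val written = delegate.write(src)
    count += written
    written
  }

  override def isOpen: Boolean = delegate.isOpen

  override def close(): Unit = delegate.close()
}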
