Skip to content

Commit 4b174ca

Browse files
committed
Shivaram's code review comment.
1 parent 4a3dfe7 commit 4b174ca

3 files changed

Lines changed: 55 additions & 11 deletions

File tree

core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,19 @@ class BlockHeaderEncoder extends MessageToByteEncoder[BlockHeader] {
2929
override def encode(ctx: ChannelHandlerContext, msg: BlockHeader, out: ByteBuf): Unit = {
3030
// message = message length (4 bytes) + block id length (4 bytes) + block id + block data
3131
// message length = block id length (4 bytes) + size of block id + size of block data
32-
val blockId = msg.blockId.getBytes
32+
val blockIdBytes = msg.blockId.getBytes
3333
msg.error match {
3434
case Some(errorMsg) =>
3535
val errorBytes = errorMsg.getBytes
36-
out.writeInt(4 + blockId.length + errorBytes.size)
37-
out.writeInt(-blockId.length)
38-
out.writeBytes(blockId)
39-
out.writeBytes(errorBytes)
36+
out.writeInt(4 + blockIdBytes.length + errorBytes.size)
37+
out.writeInt(-blockIdBytes.length) // use negative block id length to represent errors
38+
out.writeBytes(blockIdBytes) // next is blockId itself
39+
out.writeBytes(errorBytes) // error message
4040
case None =>
41-
val blockId = msg.blockId.getBytes
42-
out.writeInt(4 + blockId.length + msg.blockSize)
43-
out.writeInt(blockId.length)
44-
out.writeBytes(blockId)
41+
out.writeInt(4 + blockIdBytes.length + msg.blockSize)
42+
out.writeInt(blockIdBytes.length) // First 4 bytes is blockId length
43+
out.writeBytes(blockIdBytes) // next is blockId itself
44+
// msg of size blockSize will be written by ServerHandler
4545
}
4646
}
4747
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.netty.server
19+
20+
import io.netty.channel.ChannelInitializer
21+
import io.netty.channel.socket.SocketChannel
22+
import io.netty.handler.codec.LineBasedFrameDecoder
23+
import io.netty.handler.codec.string.StringDecoder
24+
import io.netty.util.CharsetUtil
25+
import org.apache.spark.storage.BlockDataProvider
26+
27+
28+
/** Channel initializer that sets up the pipeline for the BlockServer. */
29+
private[netty]
30+
class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
31+
extends ChannelInitializer[SocketChannel] {
32+
33+
override def initChannel(ch: SocketChannel): Unit = {
34+
ch.pipeline
35+
.addLast("frameDecoder", new LineBasedFrameDecoder(1024)) // max block id length 1024
36+
.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
37+
.addLast("blockHeaderEncoder", new BlockHeaderEncoder)
38+
.addLast("handler", new BlockServerHandler(dataProvider))
39+
}
40+
}

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ object BlockFetcherIterator {
277277

278278
bytesInFlight += req.size
279279
val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
280+
281+
// This could throw a TimeoutException. In that case we will just retry the task.
280282
val client = blockManager.nettyBlockClientFactory.createClient(
281283
cmId.host, req.address.nettyPort)
282284
val blocks = req.blocks.map(_._1.toString)
@@ -304,8 +306,10 @@ object BlockFetcherIterator {
304306
}
305307
}))
306308

307-
readMetrics.remoteBytesRead += blockSize
308-
readMetrics.remoteBlocksFetched += 1
309+
readMetrics.synchronized {
310+
readMetrics.remoteBytesRead += blockSize
311+
readMetrics.remoteBlocksFetched += 1
312+
}
309313
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
310314
},
311315
(blockId: String, errorMsg: String) => {

0 commit comments

Comments
 (0)