[SPARK-2468] Netty based block server / client module

This is a rewrite of the original Netty module that was added about 1.5 years 
ago. The old code was turned off by default and didn't really work because it 
lacked a frame decoder (only worked with very very small blocks).

For this pull request, I tried to make the changes non-instrusive to the rest 
of Spark. I only added an init and shutdown to BlockManager/DiskBlockManager, 
and a bunch of comments to help me understand the existing code base.

Compared with the old Netty module, this one features:
- It appears to work :)
- SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By 
default nio is used. (Not using Epoll yet because I have found some bugs with 
its implementation)
- SPARK-2943: options to specify send buf and receive buf for users who want to 
do hyper tuning
- SPARK-2942: io errors are reported from server to client (the protocol uses 
negative length to indicate error)
- SPARK-2940: fetching multiple blocks in a single request to reduce syscalls
- SPARK-2959: clients share a single thread pool
- SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty 
managed pool of buffers with jmalloc)
- SPARK-2625: added fetchWaitTime metric and fixed thread-safety issue in 
metrics update.
- SPARK-2367: bump Netty version to 4.0.21.Final to address an Epoll bug 
(https://groups.google.com/forum/#!topic/netty/O7m-HxCJpCA)

Compared with the existing communication manager, this one features:
- IMO it is substantially easier to understand
- zero-copy send for the server for on-disk blocks
- one-copy receive (due to a frame decoder)
- don't quote me on this, but I think a lot less sys calls
- SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty 
managed pool of buffers with jmalloc)
- SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By 
default nio is used. (Not using Epoll yet because I have found some bugs with 
its implementation)
- SPARK-2943: options to specify send buf and receive buf for users who want to 
do hyper tuning

TODOs before it can fully replace the existing ConnectionManager, if that ever 
happens (most of them should probably be done in separate PRs since this needs 
to be turned on explicitly)
- [x] Basic test cases
- [ ] More unit/integration tests for failures
- [ ] Performance analysis
- [ ] Support client connection reuse so we don't need to keep opening new 
connections (not sure how useful this would be)
- [ ] Support putting blocks in addition to fetching blocks (i.e. two way 
transfer)
- [x] Support serving non-disk blocks
- [ ] Support SASL authentication

For a more comprehensive list, see 
https://issues.apache.org/jira/browse/SPARK-2468

Thanks to @coderplay for peer coding with me on a Sunday.

Author: Reynold Xin <[email protected]>

Closes #1907 from rxin/netty and squashes the following commits:

f921421 [Reynold Xin] Upgrade Netty to 4.0.22.Final to fix another Epoll bug.
4b174ca [Reynold Xin] Shivaram's code review comment.
4a3dfe7 [Reynold Xin] Switched to nio for default (instead of epoll on Linux).
56bfb9d [Reynold Xin] Bump Netty version to 4.0.21.Final for some bug fixes.
b443a4b [Reynold Xin] Added debug message to help debug Jenkins failures.
57fc4d7 [Reynold Xin] Added test cases for BlockHeaderEncoder and 
BlockFetchingClientHandlerSuite.
22623e9 [Reynold Xin] Added exception handling and test case for 
BlockServerHandler and BlockFetchingClientHandler.
6550dd7 [Reynold Xin] Fixed block mgr init bug.
60c2edf [Reynold Xin] Beefed up server/client integration tests.
38d88d5 [Reynold Xin] Added missing test files.
6ce3f3c [Reynold Xin] Added some basic test cases.
47f7ce0 [Reynold Xin] Created server and client packages and moved files there.
b16f412 [Reynold Xin] Added commit count.
f13022d [Reynold Xin] Remove unused clone() in BlockFetcherIterator.
c57d68c [Reynold Xin] Added back missing files.
842dfa7 [Reynold Xin] Made everything work with proper reference counting.
3fae001 [Reynold Xin] Connected the new netty network module with rest of Spark.
1a8f6d4 [Reynold Xin] Completed protocol documentation.
2951478 [Reynold Xin] New Netty implementation.
cc7843d [Reynold Xin] Basic skeleton.

(cherry picked from commit 3a8b68b7353fea50245686903b308fa9eb52cb51)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f23d2a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f23d2a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f23d2a3

Branch: refs/heads/branch-1.1
Commit: 3f23d2a38c3b6559902bc2ab6975ff6b0bec875e
Parents: d3cce58
Author: Reynold Xin <[email protected]>
Authored: Thu Aug 14 19:01:33 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Thu Aug 14 19:10:58 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.scala |   85 --
 .../netty/FileClientChannelInitializer.scala    |   31 -
 .../spark/network/netty/FileClientHandler.scala |   50 -
 .../apache/spark/network/netty/FileHeader.scala |   71 -
 .../apache/spark/network/netty/FileServer.scala |   91 --
 .../netty/FileServerChannelInitializer.scala    |   34 -
 .../spark/network/netty/FileServerHandler.scala |   68 -
 .../spark/network/netty/NettyConfig.scala       |   59 +
 .../spark/network/netty/ShuffleCopier.scala     |  118 --
 .../spark/network/netty/ShuffleSender.scala     |   71 -
 .../netty/client/BlockFetchingClient.scala      |  135 ++
 .../client/BlockFetchingClientFactory.scala     |   99 ++
 .../client/BlockFetchingClientHandler.scala     |   63 +
 .../network/netty/client/LazyInitIterator.scala |   44 +
 .../netty/client/ReferenceCountedBuffer.scala   |   47 +
 .../network/netty/server/BlockHeader.scala      |   32 +
 .../netty/server/BlockHeaderEncoder.scala       |   47 +
 .../network/netty/server/BlockServer.scala      |  162 ++
 .../server/BlockServerChannelInitializer.scala  |   40 +
 .../netty/server/BlockServerHandler.scala       |  140 ++
 .../spark/storage/BlockDataProvider.scala       |   32 +
 .../spark/storage/BlockFetcherIterator.scala    |  138 +-
 .../org/apache/spark/storage/BlockManager.scala |   49 +-
 .../spark/storage/BlockNotFoundException.scala  |   21 +
 .../apache/spark/storage/DiskBlockManager.scala |   13 +-
 core/src/test/resources/netty-test-file.txt     | 1379 ++++++++++++++++++
 .../netty/ServerClientIntegrationSuite.scala    |  158 ++
 .../BlockFetchingClientHandlerSuite.scala       |   87 ++
 .../netty/server/BlockHeaderEncoderSuite.scala  |   64 +
 .../netty/server/BlockServerHandlerSuite.scala  |  101 ++
 pom.xml                                         |    2 +-
 31 files changed, 2817 insertions(+), 714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
deleted file mode 100644
index c6d35f7..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.util.concurrent.TimeUnit
-
-import io.netty.bootstrap.Bootstrap
-import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.oio.OioSocketChannel
-
-import org.apache.spark.Logging
-
-class FileClient(handler: FileClientHandler, connectTimeout: Int) extends 
Logging {
-
-  private var channel: Channel = _
-  private var bootstrap: Bootstrap = _
-  private var group: EventLoopGroup = _
-  private val sendTimeout = 60
-
-  def init(): Unit = {
-    group = new OioEventLoopGroup
-    bootstrap = new Bootstrap
-    bootstrap.group(group)
-      .channel(classOf[OioSocketChannel])
-      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
-      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
Integer.valueOf(connectTimeout))
-      .handler(new FileClientChannelInitializer(handler))
-  }
-
-  def connect(host: String, port: Int) {
-    try {
-      channel = bootstrap.connect(host, port).sync().channel()
-    } catch {
-      case e: InterruptedException =>
-        logWarning("FileClient interrupted while trying to connect", e)
-        close()
-    }
-  }
-
-  def waitForClose(): Unit = {
-    try {
-      channel.closeFuture.sync()
-    } catch {
-      case e: InterruptedException =>
-        logWarning("FileClient interrupted", e)
-    }
-  }
-
-  def sendRequest(file: String): Unit = {
-    try {
-      val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, 
TimeUnit.SECONDS)
-      if (!bSent) {
-        throw new RuntimeException("Failed to send")
-      }
-    } catch {
-      case e: InterruptedException =>
-        logError("Error", e)
-    }
-  }
-
-  def close(): Unit = {
-    if (group != null) {
-      group.shutdownGracefully()
-      group = null
-      bootstrap = null
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
deleted file mode 100644
index f4261c1..0000000
--- 
a/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import io.netty.channel.ChannelInitializer
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.string.StringEncoder
-
-
-class FileClientChannelInitializer(handler: FileClientHandler)
-  extends ChannelInitializer[SocketChannel] {
-
-  def initChannel(channel: SocketChannel) {
-    channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", 
handler)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
deleted file mode 100644
index 017302e..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import io.netty.buffer.ByteBuf
-import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
-
-import org.apache.spark.storage.BlockId
-
-
-abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {
-
-  private var currentHeader: FileHeader = null
-
-  @volatile
-  private var handlerCalled: Boolean = false
-
-  def isComplete: Boolean = handlerCalled
-
-  def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader)
-
-  def handleError(blockId: BlockId)
-
-  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
-    if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
-      currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE))
-    }
-    if (in.readableBytes >= currentHeader.fileLen) {
-      handle(ctx, in, currentHeader)
-      handlerCalled = true
-      currentHeader = null
-      ctx.close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
deleted file mode 100644
index 607e560..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import io.netty.buffer._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, TestBlockId}
-
-private[spark] class FileHeader (
-  val fileLen: Int,
-  val blockId: BlockId) extends Logging {
-
-  lazy val buffer: ByteBuf = {
-    val buf = Unpooled.buffer()
-    buf.capacity(FileHeader.HEADER_SIZE)
-    buf.writeInt(fileLen)
-    buf.writeInt(blockId.name.length)
-    blockId.name.foreach((x: Char) => buf.writeByte(x))
-    // padding the rest of header
-    if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
-      buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
-    } else {
-      throw new Exception("too long header " + buf.readableBytes)
-      logInfo("too long header")
-    }
-    buf
-  }
-
-}
-
-private[spark] object FileHeader {
-
-  val HEADER_SIZE = 40
-
-  def getFileLenOffset = 0
-  def getFileLenSize = Integer.SIZE/8
-
-  def create(buf: ByteBuf): FileHeader = {
-    val length = buf.readInt
-    val idLength = buf.readInt
-    val idBuilder = new StringBuilder(idLength)
-    for (i <- 1 to idLength) {
-      idBuilder += buf.readByte().asInstanceOf[Char]
-    }
-    val blockId = BlockId(idBuilder.toString())
-    new FileHeader(length, blockId)
-  }
-
-  def main(args:Array[String]) {
-    val header = new FileHeader(25, TestBlockId("my_block"))
-    val buf = header.buffer
-    val newHeader = FileHeader.create(buf)
-    System.out.println("id=" + newHeader.blockId + ",size=" + 
newHeader.fileLen)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
deleted file mode 100644
index dff7795..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.net.InetSocketAddress
-
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
-import io.netty.channel.oio.OioEventLoopGroup
-import io.netty.channel.socket.oio.OioServerSocketChannel
-
-import org.apache.spark.Logging
-
-/**
- * Server that accept the path of a file an echo back its content.
- */
-class FileServer(pResolver: PathResolver, private var port: Int) extends 
Logging {
-
-  private val addr: InetSocketAddress = new InetSocketAddress(port)
-  private var bossGroup: EventLoopGroup = new OioEventLoopGroup
-  private var workerGroup: EventLoopGroup = new OioEventLoopGroup
-
-  private var channelFuture: ChannelFuture = {
-    val bootstrap = new ServerBootstrap
-    bootstrap.group(bossGroup, workerGroup)
-      .channel(classOf[OioServerSocketChannel])
-      .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
-      .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
-      .childHandler(new FileServerChannelInitializer(pResolver))
-    bootstrap.bind(addr)
-  }
-
-  try {
-    val boundAddress = 
channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
-    port = boundAddress.getPort
-  } catch {
-    case ie: InterruptedException =>
-      port = 0
-  }
-
-  /** Start the file server asynchronously in a new thread. */
-  def start(): Unit = {
-    val blockingThread: Thread = new Thread {
-      override def run(): Unit = {
-        try {
-          channelFuture.channel.closeFuture.sync
-          logInfo("FileServer exiting")
-        } catch {
-          case e: InterruptedException =>
-            logError("File server start got interrupted", e)
-        }
-        // NOTE: bootstrap is shutdown in stop()
-      }
-    }
-    blockingThread.setDaemon(true)
-    blockingThread.start()
-  }
-
-  def getPort: Int = port
-
-  def stop(): Unit = {
-    if (channelFuture != null) {
-      channelFuture.channel().close().awaitUninterruptibly()
-      channelFuture = null
-    }
-    if (bossGroup != null) {
-      bossGroup.shutdownGracefully()
-      bossGroup = null
-    }
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully()
-      workerGroup = null
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
deleted file mode 100644
index aaa2f91..0000000
--- 
a/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import io.netty.channel.ChannelInitializer
-import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
-import io.netty.handler.codec.string.StringDecoder
-
-class FileServerChannelInitializer(pResolver: PathResolver)
-  extends ChannelInitializer[SocketChannel] {
-
-  override def initChannel(channel: SocketChannel): Unit = {
-    channel.pipeline
-      .addLast("framer", new DelimiterBasedFrameDecoder(8192, 
Delimiters.lineDelimiter : _*))
-      .addLast("stringDecoder", new StringDecoder)
-      .addLast("handler", new FileServerHandler(pResolver))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala 
b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
deleted file mode 100644
index 96f60b2..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.io.FileInputStream
-
-import io.netty.channel.{DefaultFileRegion, ChannelHandlerContext, 
SimpleChannelInboundHandler}
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, FileSegment}
-
-
-class FileServerHandler(pResolver: PathResolver)
-  extends SimpleChannelInboundHandler[String] with Logging {
-
-  override def channelRead0(ctx: ChannelHandlerContext, blockIdString: 
String): Unit = {
-    val blockId: BlockId = BlockId(blockIdString)
-    val fileSegment: FileSegment = pResolver.getBlockLocation(blockId)
-    if (fileSegment == null) {
-      return
-    }
-    val file = fileSegment.file
-    if (file.exists) {
-      if (!file.isFile) {
-        ctx.write(new FileHeader(0, blockId).buffer)
-        ctx.flush()
-        return
-      }
-      val length: Long = fileSegment.length
-      if (length > Integer.MAX_VALUE || length <= 0) {
-        ctx.write(new FileHeader(0, blockId).buffer)
-        ctx.flush()
-        return
-      }
-      ctx.write(new FileHeader(length.toInt, blockId).buffer)
-      try {
-        val channel = new FileInputStream(file).getChannel
-        ctx.write(new DefaultFileRegion(channel, fileSegment.offset, 
fileSegment.length))
-      } catch {
-        case e: Exception =>
-          logError("Exception: ", e)
-      }
-    } else {
-      ctx.write(new FileHeader(0, blockId).buffer)
-    }
-    ctx.flush()
-  }
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): 
Unit = {
-    logError("Exception: ", cause)
-    ctx.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala 
b/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
new file mode 100644
index 0000000..b587015
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyConfig.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty
+
+import org.apache.spark.SparkConf
+
+/**
+ * A central location that tracks all the settings we exposed to users.
+ */
+private[spark]
+class NettyConfig(conf: SparkConf) {
+
+  /** Port the server listens on. Default to a random port. */
+  private[netty] val serverPort = conf.getInt("spark.shuffle.io.port", 0)
+
+  /** IO mode: nio, oio, epoll, or auto (try epoll first and then nio). */
+  private[netty] val ioMode = conf.get("spark.shuffle.io.mode", 
"nio").toLowerCase
+
+  /** Connect timeout in secs. Default 60 secs. */
+  private[netty] val connectTimeoutMs = 
conf.getInt("spark.shuffle.io.connectionTimeout", 60) * 1000
+
+  /**
+   * Percentage of the desired amount of time spent for I/O in the child event 
loops.
+   * Only applicable in nio and epoll.
+   */
+  private[netty] val ioRatio = conf.getInt("spark.shuffle.io.netty.ioRatio", 
80)
+
+  /** Requested maximum length of the queue of incoming connections. */
+  private[netty] val backLog: Option[Int] = 
conf.getOption("spark.shuffle.io.backLog").map(_.toInt)
+
+  /**
+   * Receive buffer size (SO_RCVBUF).
+   * Note: the optimal size for receive buffer and send buffer should be
+   *  latency * network_bandwidth.
+   * Assuming latency = 1ms, network_bandwidth = 10Gbps
+   *  buffer size should be ~ 1.25MB
+   */
+  private[netty] val receiveBuf: Option[Int] =
+    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
+
+  /** Send buffer size (SO_SNDBUF). */
+  private[netty] val sendBuf: Option[Int] =
+    conf.getOption("spark.shuffle.io.sendBuffer").map(_.toInt)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala 
b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
deleted file mode 100644
index e7b2855..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.util.concurrent.Executors
-
-import scala.collection.JavaConverters._
-
-import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
-import io.netty.util.CharsetUtil
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.network.ConnectionManagerId
-import org.apache.spark.storage.BlockId
-
-private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
-
-  def getBlock(host: String, port: Int, blockId: BlockId,
-      resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
-
-    val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 
60000)
-    val fc = new FileClient(handler, connectTimeout)
-
-    try {
-      fc.init()
-      fc.connect(host, port)
-      fc.sendRequest(blockId.name)
-      fc.waitForClose()
-      fc.close()
-    } catch {
-      // Handle any socket-related exceptions in FileClient
-      case e: Exception => {
-        logError("Shuffle copy of block " + blockId + " from " + host + ":" + 
port + " failed", e)
-        handler.handleError(blockId)
-      }
-    }
-  }
-
-  def getBlock(cmId: ConnectionManagerId, blockId: BlockId,
-      resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
-    getBlock(cmId.host, cmId.port, blockId, resultCollectCallback)
-  }
-
-  def getBlocks(cmId: ConnectionManagerId,
-    blocks: Seq[(BlockId, Long)],
-    resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
-
-    for ((blockId, size) <- blocks) {
-      getBlock(cmId, blockId, resultCollectCallback)
-    }
-  }
-}
-
-
-private[spark] object ShuffleCopier extends Logging {
-
-  private class ShuffleClientHandler(resultCollectCallBack: (BlockId, Long, 
ByteBuf) => Unit)
-    extends FileClientHandler with Logging {
-
-    override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: 
FileHeader) {
-      logDebug("Received Block: " + header.blockId + " (" + header.fileLen + 
"B)")
-      resultCollectCallBack(header.blockId, header.fileLen.toLong, 
in.readBytes(header.fileLen))
-    }
-
-    override def handleError(blockId: BlockId) {
-      if (!isComplete) {
-        resultCollectCallBack(blockId, -1, null)
-      }
-    }
-  }
-
-  def echoResultCollectCallBack(blockId: BlockId, size: Long, content: 
ByteBuf) {
-    if (size != -1) {
-      logInfo("File: " + blockId + " content is : \" " + 
content.toString(CharsetUtil.UTF_8) + "\"")
-    }
-  }
-
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: ShuffleCopier <host> <port> 
<shuffle_block_id> <threads>")
-      System.exit(1)
-    }
-    val host = args(0)
-    val port = args(1).toInt
-    val blockId = BlockId(args(2))
-    val threads = if (args.length > 3) args(3).toInt else 10
-
-    val copiers = Executors.newFixedThreadPool(80)
-    val tasks = (for (i <- Range(0, threads)) yield {
-      Executors.callable(new Runnable() {
-        def run() {
-          val copier = new ShuffleCopier(new SparkConf)
-          copier.getBlock(host, port, blockId, echoResultCollectCallBack)
-        }
-      })
-    }).asJava
-    copiers.invokeAll(tasks)
-    copiers.shutdown()
-    System.exit(0)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala 
b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
deleted file mode 100644
index 95958e3..0000000
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.netty
-
-import java.io.File
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-import org.apache.spark.storage.{BlockId, FileSegment}
-
-private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) 
extends Logging {
-
-  val server = new FileServer(pResolver, portIn)
-  server.start()
-
-  def stop() {
-    server.stop()
-  }
-
-  def port: Int = server.getPort
-}
-
-
-/**
- * An application for testing the shuffle sender as a standalone program.
- */
-private[spark] object ShuffleSender {
-
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println(
-        "Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of 
shuffle_block_directories>")
-      System.exit(1)
-    }
-
-    val port = args(0).toInt
-    val subDirsPerLocalDir = args(1).toInt
-    val localDirs = args.drop(2).map(new File(_))
-
-    val pResovler = new PathResolver {
-      override def getBlockLocation(blockId: BlockId): FileSegment = {
-        if (!blockId.isShuffle) {
-          throw new Exception("Block " + blockId + " is not a shuffle block")
-        }
-        // Figure out which local directory it hashes to, and which 
subdirectory in that
-        val hash = Utils.nonNegativeHash(blockId)
-        val dirId = hash % localDirs.length
-        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
-        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        val file = new File(subDir, blockId.name)
-        new FileSegment(file, 0, file.length())
-      }
-    }
-    val sender = new ShuffleSender(port, pResovler)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
new file mode 100644
index 0000000..9fed11b
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClient.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.client
+
+import java.util.concurrent.TimeoutException
+
+import io.netty.bootstrap.Bootstrap
+import io.netty.buffer.PooledByteBufAllocator
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.{ChannelFutureListener, ChannelFuture, 
ChannelInitializer, ChannelOption}
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder
+import io.netty.handler.codec.string.StringEncoder
+import io.netty.util.CharsetUtil
+
+import org.apache.spark.Logging
+
+/**
+ * Client for fetching data blocks from 
[[org.apache.spark.network.netty.server.BlockServer]].
+ * Use [[BlockFetchingClientFactory]] to instantiate this client.
+ *
+ * The constructor blocks until a connection is successfully established.
+ *
+ * See [[org.apache.spark.network.netty.server.BlockServer]] for client/server 
protocol.
+ *
+ * Concurrency: [[BlockFetchingClient]] is not thread safe and should not be 
shared.
+ */
+@throws[TimeoutException]
+private[spark]
+class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: 
String, port: Int)
+  extends Logging {
+
+  val handler = new BlockFetchingClientHandler
+
+  /** Netty Bootstrap for creating the TCP connection. */
+  private val bootstrap: Bootstrap = {
+    val b = new Bootstrap
+    b.group(factory.workerGroup)
+      .channel(factory.socketChannelClass)
+      // Use pooled buffers to reduce temporary buffer allocation
+      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      // Disable Nagle's Algorithm since we don't want packets to wait
+      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
+      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
+      .option[Integer](ChannelOption.CONNECT_TIMEOUT_MILLIS, 
factory.conf.connectTimeoutMs)
+
+    b.handler(new ChannelInitializer[SocketChannel] {
+      override def initChannel(ch: SocketChannel): Unit = {
+        ch.pipeline
+          .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
+          // maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
+          .addLast("framedLengthDecoder", new 
LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
+          .addLast("handler", handler)
+      }
+    })
+    b
+  }
+
+  /** Netty ChannelFuture for the connection. */
+  private val cf: ChannelFuture = bootstrap.connect(hostname, port)
+  if (!cf.awaitUninterruptibly(factory.conf.connectTimeoutMs)) {
+    throw new TimeoutException(
+      s"Connecting to $hostname:$port timed out 
(${factory.conf.connectTimeoutMs} ms)")
+  }
+
+  /**
+   * Ask the remote server for a sequence of blocks, and execute the callback.
+   *
+   * Note that this is asynchronous and returns immediately. Upstream caller 
should throttle the
+   * rate of fetching; otherwise we could run out of memory.
+   *
+   * @param blockIds sequence of block ids to fetch.
+   * @param blockFetchSuccessCallback callback function when a block is 
successfully fetched.
+   *                                  First argument is the block id, and 
second argument is the
+   *                                  raw data in a ByteBuffer.
+   * @param blockFetchFailureCallback callback function when we failed to 
fetch any of the blocks.
+   *                                  First argument is the block id, and 
second argument is the
+   *                                  error message.
+   */
+  def fetchBlocks(
+      blockIds: Seq[String],
+      blockFetchSuccessCallback: (String, ReferenceCountedBuffer) => Unit,
+      blockFetchFailureCallback: (String, String) => Unit): Unit = {
+    // It's best to limit the number of "write" calls since it needs to 
traverse the whole pipeline.
+    // It's also best to limit the number of "flush" calls since it requires 
system calls.
+    // Let's concatenate the string and then call writeAndFlush once.
+    // This is also why this implementation might be more efficient than 
multiple, separate
+    // fetch block calls.
+    var startTime: Long = 0
+    logTrace {
+      startTime = System.nanoTime
+      s"Sending request $blockIds to $hostname:$port"
+    }
+
+    // TODO: This is not the most elegant way to handle this ...
+    handler.blockFetchSuccessCallback = blockFetchSuccessCallback
+    handler.blockFetchFailureCallback = blockFetchFailureCallback
+
+    val writeFuture = cf.channel().writeAndFlush(blockIds.mkString("\n") + 
"\n")
+    writeFuture.addListener(new ChannelFutureListener {
+      override def operationComplete(future: ChannelFuture): Unit = {
+        if (future.isSuccess) {
+          logTrace {
+            val timeTaken = (System.nanoTime - startTime).toDouble / 1000000
+            s"Sending request $blockIds to $hostname:$port took $timeTaken ms"
+          }
+        } else {
+          // Fail all blocks.
+          logError(s"Failed to send request $blockIds to $hostname:$port", 
future.cause)
+          blockIds.foreach(blockFetchFailureCallback(_, 
future.cause.getMessage))
+        }
+      }
+    })
+  }
+
+  def waitForClose(): Unit = {
+    cf.channel().closeFuture().sync()
+  }
+
+  def close(): Unit = cf.channel().close()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
new file mode 100644
index 0000000..2b28402
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientFactory.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.client
+
+import io.netty.channel.epoll.{EpollEventLoopGroup, EpollSocketChannel}
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.nio.NioSocketChannel
+import io.netty.channel.socket.oio.OioSocketChannel
+import io.netty.channel.{EventLoopGroup, Channel}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.network.netty.NettyConfig
+import org.apache.spark.util.Utils
+
+/**
+ * Factory for creating [[BlockFetchingClient]] by using createClient. This 
factory reuses
+ * the worker thread pool for Netty.
+ *
+ * Concurrency: createClient is safe to be called from multiple threads 
concurrently.
+ */
+private[spark]
+class BlockFetchingClientFactory(val conf: NettyConfig) {
+
+  def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))
+
+  /** A thread factory so the threads are named (for debugging). */
+  val threadFactory = Utils.namedThreadFactory("spark-shuffle-client")
+
+  /** The following two are instantiated by the [[init]] method, depending 
ioMode. */
+  var socketChannelClass: Class[_ <: Channel] = _
+  var workerGroup: EventLoopGroup = _
+
+  init()
+
+  /** Initialize [[socketChannelClass]] and [[workerGroup]] based on ioMode. */
+  private def init(): Unit = {
+    def initOio(): Unit = {
+      socketChannelClass = classOf[OioSocketChannel]
+      workerGroup = new OioEventLoopGroup(0, threadFactory)
+    }
+    def initNio(): Unit = {
+      socketChannelClass = classOf[NioSocketChannel]
+      workerGroup = new NioEventLoopGroup(0, threadFactory)
+    }
+    def initEpoll(): Unit = {
+      socketChannelClass = classOf[EpollSocketChannel]
+      workerGroup = new EpollEventLoopGroup(0, threadFactory)
+    }
+
+    conf.ioMode match {
+      case "nio" => initNio()
+      case "oio" => initOio()
+      case "epoll" => initEpoll()
+      case "auto" =>
+        // For auto mode, first try epoll (only available on Linux), then nio.
+        try {
+          initEpoll()
+        } catch {
+          // TODO: Should we log the throwable? But that always happen on 
non-Linux systems.
+          // Perhaps the right thing to do is to check whether the system is 
Linux, and then only
+          // call initEpoll on Linux.
+          case e: Throwable => initNio()
+        }
+    }
+  }
+
+  /**
+   * Create a new BlockFetchingClient connecting to the given remote host / 
port.
+   *
+   * This blocks until a connection is successfully established.
+   *
+   * Concurrency: This method is safe to call from multiple threads.
+   */
+  def createClient(remoteHost: String, remotePort: Int): BlockFetchingClient = 
{
+    new BlockFetchingClient(this, remoteHost, remotePort)
+  }
+
+  def stop(): Unit = {
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
new file mode 100644
index 0000000..a1dbf61
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/client/BlockFetchingClientHandler.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.client
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
+
+import org.apache.spark.Logging
+
+
+/**
+ * Handler that processes server responses. It uses the protocol documented in
+ * [[org.apache.spark.network.netty.server.BlockServer]].
+ */
+private[client]
+class BlockFetchingClientHandler extends SimpleChannelInboundHandler[ByteBuf] 
with Logging {
+
+  var blockFetchSuccessCallback: (String, ReferenceCountedBuffer) => Unit = _
+  var blockFetchFailureCallback: (String, String) => Unit = _
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): 
Unit = {
+    logError(s"Exception in connection from ${ctx.channel.remoteAddress}", 
cause)
+    ctx.close()
+  }
+
+  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
+    val totalLen = in.readInt()
+    val blockIdLen = in.readInt()
+    val blockIdBytes = new Array[Byte](math.abs(blockIdLen))
+    in.readBytes(blockIdBytes)
+    val blockId = new String(blockIdBytes)
+    val blockSize = totalLen - math.abs(blockIdLen) - 4
+
+    def server = ctx.channel.remoteAddress.toString
+
+    // blockIdLen is negative when it is an error message.
+    if (blockIdLen < 0) {
+      val errorMessageBytes = new Array[Byte](blockSize)
+      in.readBytes(errorMessageBytes)
+      val errorMsg = new String(errorMessageBytes)
+      logTrace(s"Received block $blockId ($blockSize B) with error $errorMsg 
from $server")
+      blockFetchFailureCallback(blockId, errorMsg)
+    } else {
+      logTrace(s"Received block $blockId ($blockSize B) from $server")
+      blockFetchSuccessCallback(blockId, new ReferenceCountedBuffer(in))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
new file mode 100644
index 0000000..9740ee6
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/client/LazyInitIterator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.client
+
+/**
+ * A simple iterator that lazily initializes the underlying iterator.
+ *
+ * The use case is that sometimes we might have many iterators open at the 
same time, and each of
+ * the iterator might initialize its own buffer (e.g. decompression buffer, 
deserialization buffer).
+ * This could lead to too many buffers open. If this iterator is used, we 
lazily initialize those
+ * buffers.
+ */
+private[spark]
+class LazyInitIterator(createIterator: => Iterator[Any]) extends Iterator[Any] 
{
+
+  lazy val proxy = createIterator
+
+  override def hasNext: Boolean = {
+    val gotNext = proxy.hasNext
+    if (!gotNext) {
+      close()
+    }
+    gotNext
+  }
+
+  override def next(): Any = proxy.next()
+
+  def close(): Unit = Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
new file mode 100644
index 0000000..ea1abf5
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/client/ReferenceCountedBuffer.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.client
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import io.netty.buffer.{ByteBuf, ByteBufInputStream}
+
+
+/**
+ * A buffer abstraction based on Netty's ByteBuf so we don't expose Netty.
+ * This is a Scala value class.
+ *
+ * The buffer's life cycle is NOT managed by the JVM, and thus requiring 
explicit declaration of
+ * reference by the retain method and release method.
+ */
+private[spark]
+class ReferenceCountedBuffer(val underlying: ByteBuf) extends AnyVal {
+
+  /** Return the nio ByteBuffer view of the underlying buffer. */
+  def byteBuffer(): ByteBuffer = underlying.nioBuffer
+
+  /** Creates a new input stream that starts from the current position of the 
buffer. */
+  def inputStream(): InputStream = new ByteBufInputStream(underlying)
+
+  /** Increment the reference counter by one. */
+  def retain(): Unit = underlying.retain()
+
+  /** Decrement the reference counter by one and release the buffer if the ref 
count is 0. */
+  def release(): Unit = underlying.release()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
new file mode 100644
index 0000000..162e9cc
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeader.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.server
+
+/**
+ * Header describing a block. This is used only in the server pipeline.
+ *
+ * [[BlockServerHandler]] creates this, and [[BlockHeaderEncoder]] encodes it.
+ *
+ * @param blockSize length of the block content, excluding the length itself.
+ *                 If positive, this is the header for a block (not part of 
the header).
+ *                 If negative, this is the header and content for an error 
message.
+ * @param blockId block id
+ * @param error some error message from reading the block
+ */
+private[server]
+class BlockHeader(val blockSize: Int, val blockId: String, val error: 
Option[String] = None)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
new file mode 100644
index 0000000..8e4dda4
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockHeaderEncoder.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.server
+
+import io.netty.buffer.ByteBuf
+import io.netty.channel.ChannelHandlerContext
+import io.netty.handler.codec.MessageToByteEncoder
+
+/**
+ * A simple encoder for BlockHeader. See [[BlockServer]] for the server to 
client protocol.
+ */
+private[server]
+class BlockHeaderEncoder extends MessageToByteEncoder[BlockHeader] {
+  override def encode(ctx: ChannelHandlerContext, msg: BlockHeader, out: 
ByteBuf): Unit = {
+    // message = message length (4 bytes) + block id length (4 bytes) + block 
id + block data
+    // message length = block id length (4 bytes) + size of block id + size of 
block data
+    val blockIdBytes = msg.blockId.getBytes
+    msg.error match {
+      case Some(errorMsg) =>
+        val errorBytes = errorMsg.getBytes
+        out.writeInt(4 + blockIdBytes.length + errorBytes.size)
+        out.writeInt(-blockIdBytes.length)  // use negative block id length to 
represent errors
+        out.writeBytes(blockIdBytes)  // next is blockId itself
+        out.writeBytes(errorBytes)  // error message
+      case None =>
+        out.writeInt(4 + blockIdBytes.length + msg.blockSize)
+        out.writeInt(blockIdBytes.length)  // First 4 bytes is blockId length
+        out.writeBytes(blockIdBytes)  // next is blockId itself
+        // msg of size blockSize will be written by ServerHandler
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
new file mode 100644
index 0000000..7b2f9a8
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServer.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.server
+
+import java.net.InetSocketAddress
+
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.PooledByteBufAllocator
+import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
+import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.oio.OioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.channel.socket.oio.OioServerSocketChannel
+import io.netty.handler.codec.LineBasedFrameDecoder
+import io.netty.handler.codec.string.StringDecoder
+import io.netty.util.CharsetUtil
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.network.netty.NettyConfig
+import org.apache.spark.storage.BlockDataProvider
+import org.apache.spark.util.Utils
+
+
+/**
+ * Server for serving Spark data blocks.
+ * This should be used together with 
[[org.apache.spark.network.netty.client.BlockFetchingClient]].
+ *
+ * Protocol for requesting blocks (client to server):
+ *   One block id per line, e.g. to request 3 blocks: 
"block1\nblock2\nblock3\n"
+ *
+ * Protocol for sending blocks (server to client):
+ *   frame-length (4 bytes), block-id-length (4 bytes), block-id, block-data.
+ *
+ *   frame-length should not include the length of itself.
+ *   If block-id-length is negative, then this is an error message rather than 
block-data. The real
+ *   length is the absolute value of the frame-length.
+ *
+ */
+private[spark]
+class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends 
Logging {
+
+  def this(sparkConf: SparkConf, dataProvider: BlockDataProvider) = {
+    this(new NettyConfig(sparkConf), dataProvider)
+  }
+
+  def port: Int = _port
+
+  def hostName: String = _hostName
+
+  private var _port: Int = conf.serverPort
+  private var _hostName: String = ""
+  private var bootstrap: ServerBootstrap = _
+  private var channelFuture: ChannelFuture = _
+
+  init()
+
+  /** Initialize the server. */
+  private def init(): Unit = {
+    bootstrap = new ServerBootstrap
+    val bossThreadFactory = 
Utils.namedThreadFactory("spark-shuffle-server-boss")
+    val workerThreadFactory = 
Utils.namedThreadFactory("spark-shuffle-server-worker")
+
+    // Use only one thread to accept connections, and 2 * num_cores for worker.
+    def initNio(): Unit = {
+      val bossGroup = new NioEventLoopGroup(1, bossThreadFactory)
+      val workerGroup = new NioEventLoopGroup(0, workerThreadFactory)
+      workerGroup.setIoRatio(conf.ioRatio)
+      bootstrap.group(bossGroup, 
workerGroup).channel(classOf[NioServerSocketChannel])
+    }
+    def initOio(): Unit = {
+      val bossGroup = new OioEventLoopGroup(1, bossThreadFactory)
+      val workerGroup = new OioEventLoopGroup(0, workerThreadFactory)
+      bootstrap.group(bossGroup, 
workerGroup).channel(classOf[OioServerSocketChannel])
+    }
+    def initEpoll(): Unit = {
+      val bossGroup = new EpollEventLoopGroup(1, bossThreadFactory)
+      val workerGroup = new EpollEventLoopGroup(0, workerThreadFactory)
+      workerGroup.setIoRatio(conf.ioRatio)
+      bootstrap.group(bossGroup, 
workerGroup).channel(classOf[EpollServerSocketChannel])
+    }
+
+    conf.ioMode match {
+      case "nio" => initNio()
+      case "oio" => initOio()
+      case "epoll" => initEpoll()
+      case "auto" =>
+        // For auto mode, first try epoll (only available on Linux), then nio.
+        try {
+          initEpoll()
+        } catch {
+          // TODO: Should we log the throwable? But that always happen on 
non-Linux systems.
+          // Perhaps the right thing to do is to check whether the system is 
Linux, and then only
+          // call initEpoll on Linux.
+          case e: Throwable => initNio()
+        }
+    }
+
+    // Use pooled buffers to reduce temporary buffer allocation
+    bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+    bootstrap.childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
+
+    // Various (advanced) user-configured settings.
+    conf.backLog.foreach { backLog =>
+      bootstrap.option[java.lang.Integer](ChannelOption.SO_BACKLOG, backLog)
+    }
+    conf.receiveBuf.foreach { receiveBuf =>
+      bootstrap.option[java.lang.Integer](ChannelOption.SO_RCVBUF, receiveBuf)
+    }
+    conf.sendBuf.foreach { sendBuf =>
+      bootstrap.option[java.lang.Integer](ChannelOption.SO_SNDBUF, sendBuf)
+    }
+
+    bootstrap.childHandler(new ChannelInitializer[SocketChannel] {
+      override def initChannel(ch: SocketChannel): Unit = {
+        ch.pipeline
+          .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max 
block id length 1024
+          .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+          .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
+          .addLast("handler", new BlockServerHandler(dataProvider))
+      }
+    })
+
+    channelFuture = bootstrap.bind(new InetSocketAddress(_port))
+    channelFuture.sync()
+
+    val addr = 
channelFuture.channel.localAddress.asInstanceOf[InetSocketAddress]
+    _port = addr.getPort
+    _hostName = addr.getHostName
+  }
+
+  /** Shutdown the server. */
+  def stop(): Unit = {
+    if (channelFuture != null) {
+      channelFuture.channel().close().awaitUninterruptibly()
+      channelFuture = null
+    }
+    if (bootstrap != null && bootstrap.group() != null) {
+      bootstrap.group().shutdownGracefully()
+    }
+    if (bootstrap != null && bootstrap.childGroup() != null) {
+      bootstrap.childGroup().shutdownGracefully()
+    }
+    bootstrap = null
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
new file mode 100644
index 0000000..cc70bd0
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerChannelInitializer.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.server
+
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.LineBasedFrameDecoder
+import io.netty.handler.codec.string.StringDecoder
+import io.netty.util.CharsetUtil
+import org.apache.spark.storage.BlockDataProvider
+
+
+/** Channel initializer that sets up the pipeline for the BlockServer. */
+private[netty]
+class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
+  extends ChannelInitializer[SocketChannel] {
+
+  override def initChannel(ch: SocketChannel): Unit = {
+    ch.pipeline
+      .addLast("frameDecoder", new LineBasedFrameDecoder(1024))  // max block 
id length 1024
+      .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
+      .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
+      .addLast("handler", new BlockServerHandler(dataProvider))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
new file mode 100644
index 0000000..40dd5e5
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/server/BlockServerHandler.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.netty.server
+
+import java.io.FileInputStream
+import java.nio.ByteBuffer
+import java.nio.channels.FileChannel
+
+import io.netty.buffer.Unpooled
+import io.netty.channel._
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.{FileSegment, BlockDataProvider}
+
+
+/**
+ * A handler that processes requests from clients and writes block data back.
+ *
+ * The messages should have been processed by a LineBasedFrameDecoder and a 
StringDecoder first
+ * so channelRead0 is called once per line (i.e. per block id).
+ */
+private[server]
+class BlockServerHandler(dataProvider: BlockDataProvider)
+  extends SimpleChannelInboundHandler[String] with Logging {
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): 
Unit = {
+    logError(s"Exception in connection from ${ctx.channel.remoteAddress}", 
cause)
+    ctx.close()
+  }
+
+  override def channelRead0(ctx: ChannelHandlerContext, blockId: String): Unit 
= {
+    def client = ctx.channel.remoteAddress.toString
+
+    // A helper function to send error message back to the client.
+    def respondWithError(error: String): Unit = {
+      ctx.writeAndFlush(new BlockHeader(-1, blockId, Some(error))).addListener(
+        new ChannelFutureListener {
+          override def operationComplete(future: ChannelFuture) {
+            if (!future.isSuccess) {
+              // TODO: Maybe log the success case as well.
+              logError(s"Error sending error back to $client", future.cause)
+              ctx.close()
+            }
+          }
+        }
+      )
+    }
+
+    def writeFileSegment(segment: FileSegment): Unit = {
+      // Send error message back if the block is too large. Even though we are 
capable of sending
+      // large (2G+) blocks, the receiving end cannot handle it so let's fail 
fast.
+      // Once we fixed the receiving end to be able to process large blocks, 
this should be removed.
+      // Also make sure we update BlockHeaderEncoder to support length > 2G.
+
+      // See [[BlockHeaderEncoder]] for the way length is encoded.
+      if (segment.length + blockId.length + 4 > Int.MaxValue) {
+        respondWithError(s"Block $blockId size ($segment.length) greater than 
2G")
+        return
+      }
+
+      var fileChannel: FileChannel = null
+      try {
+        fileChannel = new FileInputStream(segment.file).getChannel
+      } catch {
+        case e: Exception =>
+          logError(
+            s"Error opening channel for $blockId in ${segment.file} for 
request from $client", e)
+          respondWithError(e.getMessage)
+      }
+
+      // Found the block. Send it back.
+      if (fileChannel != null) {
+        // Write the header and block data. In the case of failures, the 
listener on the block data
+        // write should close the connection.
+        ctx.write(new BlockHeader(segment.length.toInt, blockId))
+
+        val region = new DefaultFileRegion(fileChannel, segment.offset, 
segment.length)
+        ctx.writeAndFlush(region).addListener(new ChannelFutureListener {
+          override def operationComplete(future: ChannelFuture) {
+            if (future.isSuccess) {
+              logTrace(s"Sent block $blockId (${segment.length} B) back to 
$client")
+            } else {
+              logError(s"Error sending block $blockId to $client; closing 
connection", future.cause)
+              ctx.close()
+            }
+          }
+        })
+      }
+    }
+
+    def writeByteBuffer(buf: ByteBuffer): Unit = {
+      ctx.write(new BlockHeader(buf.remaining, blockId))
+      ctx.writeAndFlush(Unpooled.wrappedBuffer(buf)).addListener(new 
ChannelFutureListener {
+        override def operationComplete(future: ChannelFuture) {
+          if (future.isSuccess) {
+            logTrace(s"Sent block $blockId (${buf.remaining} B) back to 
$client")
+          } else {
+            logError(s"Error sending block $blockId to $client; closing 
connection", future.cause)
+            ctx.close()
+          }
+        }
+      })
+    }
+
+    logTrace(s"Received request from $client to fetch block $blockId")
+
+    var blockData: Either[FileSegment, ByteBuffer] = null
+
+    // First make sure we can find the block. If not, send error back to the 
user.
+    try {
+      blockData = dataProvider.getBlockData(blockId)
+    } catch {
+      case e: Exception =>
+        logError(s"Error opening block $blockId for request from $client", e)
+        respondWithError(e.getMessage)
+        return
+    }
+
+    blockData match {
+      case Left(segment) => writeFileSegment(segment)
+      case Right(buf) => writeByteBuffer(buf)
+    }
+
+  }  // end of channelRead0
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
new file mode 100644
index 0000000..5b6d086
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockDataProvider.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+
+/**
+ * An interface for providing data for blocks.
+ *
+ * getBlockData returns either a FileSegment (for zero-copy send), or a 
ByteBuffer.
+ *
+ * Aside from unit tests, [[BlockManager]] is the main class that implements 
this.
+ */
+private[spark] trait BlockDataProvider {
+  def getBlockData(blockId: String): Either[FileSegment, ByteBuffer]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 5f44f5f..91c0f47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -18,19 +18,17 @@
 package org.apache.spark.storage
 
 import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.network.netty.client.{LazyInitIterator, 
ReferenceCountedBuffer}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.Queue
 import scala.util.{Failure, Success}
 
-import io.netty.buffer.ByteBuf
-
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.executor.ShuffleReadMetrics
 import org.apache.spark.network.BufferMessage
 import org.apache.spark.network.ConnectionManagerId
-import org.apache.spark.network.netty.ShuffleCopier
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
 
@@ -54,18 +52,28 @@ trait BlockFetcherIterator extends Iterator[(BlockId, 
Option[Iterator[Any]])] wi
 private[storage]
 object BlockFetcherIterator {
 
-  // A request to fetch one or more blocks, complete with their sizes
+  /**
+   * A request to fetch blocks from a remote BlockManager.
+   * @param address remote BlockManager to fetch from.
+   * @param blocks Sequence of tuple, where the first element is the block id,
+   *               and the second element is the estimated size, used to 
calculate bytesInFlight.
+   */
   class FetchRequest(val address: BlockManagerId, val blocks: Seq[(BlockId, 
Long)]) {
     val size = blocks.map(_._2).sum
   }
 
-  // A result of a fetch. Includes the block ID, size in bytes, and a function 
to deserialize
-  // the block (since we want all deserializaton to happen in the calling 
thread); can also
-  // represent a fetch failure if size == -1.
+  /**
+   * Result of a fetch from a remote block. A failure is represented as size 
== -1.
+   * @param blockId block id
+   * @param size estimated size of the block, used to calculate bytesInFlight.
+   *             Note that this is NOT the exact bytes.
+   * @param deserialize closure to return the result in the form of an 
Iterator.
+   */
   class FetchResult(val blockId: BlockId, val size: Long, val deserialize: () 
=> Iterator[Any]) {
     def failed: Boolean = size == -1
   }
 
+  // TODO: Refactor this whole thing to make code more reusable.
   class BasicBlockFetcherIterator(
       private val blockManager: BlockManager,
       val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
@@ -95,10 +103,10 @@ object BlockFetcherIterator {
 
     // Queue of fetch requests to issue; we'll pull requests off this 
gradually to make sure that
     // the number of bytes in flight is limited to maxBytesInFlight
-    private val fetchRequests = new Queue[FetchRequest]
+    protected val fetchRequests = new Queue[FetchRequest]
 
     // Current bytes in flight from our requests
-    private var bytesInFlight = 0L
+    protected var bytesInFlight = 0L
 
     protected def sendRequest(req: FetchRequest) {
       logDebug("Sending request for %d blocks (%s) from %s".format(
@@ -262,77 +270,55 @@ object BlockFetcherIterator {
       readMetrics: ShuffleReadMetrics)
     extends BasicBlockFetcherIterator(blockManager, blocksByAddress, 
serializer, readMetrics) {
 
-    import blockManager._
-
-    val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest]
-
-    private def startCopiers(numCopiers: Int): List[_ <: Thread] = {
-      (for ( i <- Range(0,numCopiers) ) yield {
-        val copier = new Thread {
-          override def run(){
-            try {
-              while(!isInterrupted && !fetchRequestsSync.isEmpty) {
-                sendRequest(fetchRequestsSync.take())
-              }
-            } catch {
-              case x: InterruptedException => logInfo("Copier Interrupted")
-              // case _ => throw new SparkException("Exception Throw in 
Shuffle Copier")
-            }
-          }
-        }
-        copier.start
-        copier
-      }).toList
-    }
-
-    // keep this to interrupt the threads when necessary
-    private def stopCopiers() {
-      for (copier <- copiers) {
-        copier.interrupt()
-      }
-    }
-
     override protected def sendRequest(req: FetchRequest) {
-
-      def putResult(blockId: BlockId, blockSize: Long, blockData: ByteBuf) {
-        val fetchResult = new FetchResult(blockId, blockSize,
-          () => dataDeserialize(blockId, blockData.nioBuffer, serializer))
-        results.put(fetchResult)
-      }
-
       logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.host))
-      val cmId = new ConnectionManagerId(req.address.host, 
req.address.nettyPort)
-      val cpier = new ShuffleCopier(blockManager.conf)
-      cpier.getBlocks(cmId, req.blocks, putResult)
-      logDebug("Sent request for remote blocks " + req.blocks + " from " + 
req.address.host )
-    }
-
-    private var copiers: List[_ <: Thread] = null
-
-    override def initialize() {
-      // Split Local Remote Blocks and set numBlocksToFetch
-      val remoteRequests = splitLocalRemoteBlocks()
-      // Add the remote requests into our queue in a random order
-      for (request <- Utils.randomize(remoteRequests)) {
-        fetchRequestsSync.put(request)
-      }
-
-      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
-      logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
-        Utils.getUsedTimeMs(startTime))
+        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
+      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
 
-      // Get Local Blocks
-      startTime = System.currentTimeMillis
-      getLocalBlocks()
-      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
-    }
+      bytesInFlight += req.size
+      val sizeMap = req.blocks.toMap // so we can look up the size of each 
blockID
+
+      // This could throw a TimeoutException. In that case we will just retry 
the task.
+      val client = blockManager.nettyBlockClientFactory.createClient(
+        cmId.host, req.address.nettyPort)
+      val blocks = req.blocks.map(_._1.toString)
+
+      client.fetchBlocks(
+        blocks,
+        (blockId: String, refBuf: ReferenceCountedBuffer) => {
+          // Increment the reference count so the buffer won't be recycled.
+          // TODO: This could result in memory leaks when the task is stopped 
due to exception
+          // before the iterator is exhausted.
+          refBuf.retain()
+          val buf = refBuf.byteBuffer()
+          val blockSize = buf.remaining()
+          val bid = BlockId(blockId)
+
+          // TODO: remove code duplication between here and 
BlockManager.dataDeserialization.
+          results.put(new FetchResult(bid, sizeMap(bid), () => {
+            def createIterator: Iterator[Any] = {
+              val stream = blockManager.wrapForCompression(bid, 
refBuf.inputStream())
+              serializer.newInstance().deserializeStream(stream).asIterator
+            }
+            new LazyInitIterator(createIterator) {
+              // Release the buffer when we are done traversing it.
+              override def close(): Unit = refBuf.release()
+            }
+          }))
 
-    override def next(): (BlockId, Option[Iterator[Any]]) = {
-      resultsGotten += 1
-      val result = results.take()
-      // If all the results has been retrieved, copiers will exit automatically
-      (result.blockId, if (result.failed) None else Some(result.deserialize()))
+          readMetrics.synchronized {
+            readMetrics.remoteBytesRead += blockSize
+            readMetrics.remoteBlocksFetched += 1
+          }
+          logDebug("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
+        },
+        (blockId: String, errorMsg: String) => {
+          logError(s"Could not get block(s) from $cmId with error: $errorMsg")
+          for ((blockId, size) <- req.blocks) {
+            results.put(new FetchResult(blockId, -1, null))
+          }
+        }
+      )
     }
   }
   // End of NettyBlockFetcherIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/3f23d2a3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e8bbd29..e676769 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -25,16 +25,19 @@ import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.util.Random
 
-import akka.actor.{ActorSystem, Cancellable, Props}
+import akka.actor.{ActorSystem, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
+import org.apache.spark.network.netty.client.BlockFetchingClientFactory
+import org.apache.spark.network.netty.server.BlockServer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
 
+
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends 
BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends 
BlockValues
@@ -58,7 +61,7 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker)
-  extends Logging {
+  extends BlockDataProvider with Logging {
 
   private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -86,13 +89,25 @@ private[spark] class BlockManager(
     new TachyonStore(this, tachyonBlockManager)
   }
 
+  private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+
   // If we use Netty for shuffle, start a new Netty-based shuffle sender 
service.
-  private val nettyPort: Int = {
-    val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-    val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
-    if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) 
else 0
+  private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
+    if (useNetty) new BlockFetchingClientFactory(conf) else null
   }
 
+  private val nettyBlockServer: BlockServer = {
+    if (useNetty) {
+      val server = new BlockServer(conf, this)
+      logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
+      server
+    } else {
+      null
+    }
+  }
+
+  private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
+
   val blockManagerId = BlockManagerId(
     executorId, connectionManager.id.host, connectionManager.id.port, 
nettyPort)
 
@@ -216,6 +231,20 @@ private[spark] class BlockManager(
     }
   }
 
+  override def getBlockData(blockId: String): Either[FileSegment, ByteBuffer] 
= {
+    val bid = BlockId(blockId)
+    if (bid.isShuffle) {
+      Left(diskBlockManager.getBlockLocation(bid))
+    } else {
+      val blockBytesOpt = doGetLocal(bid, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
+      if (blockBytesOpt.isDefined) {
+        Right(blockBytesOpt.get)
+      } else {
+        throw new BlockNotFoundException(blockId)
+      }
+    }
+  }
+
   /**
    * Get the BlockStatus for the block identified by the given ID, if it 
exists.
    * NOTE: This is mainly for testing, and it doesn't fetch information from 
Tachyon.
@@ -1061,6 +1090,14 @@ private[spark] class BlockManager(
     connectionManager.stop()
     shuffleBlockManager.stop()
     diskBlockManager.stop()
+
+    if (nettyBlockClientFactory != null) {
+      nettyBlockClientFactory.stop()
+    }
+    if (nettyBlockServer != null) {
+      nettyBlockServer.stop()
+    }
+
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to