Repository: spark
Updated Branches:
  refs/heads/branch-2.1 5e8010349 -> 1857acc71


[SPARK-18972][CORE] Fix the netty thread names for RPC

## What changes were proposed in this pull request?

Right now the names of the threads created by Netty for Spark RPC are
`shuffle-client-**` and `shuffle-server-**`, which is pretty confusing.

This PR just uses the module name in TransportConf to set the thread name. In 
addition, it also includes the following minor fixes:

- TransportChannelHandler.channelActive and channelInactive should call the
corresponding super methods (instead of channelRegistered and channelUnregistered).
- Make ShuffleBlockFetcherIterator throw NoSuchElementException if it has no
more elements. Otherwise, if the caller calls `next` without first checking
`hasNext`, it will just hang.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #16380 from zsxwing/SPARK-18972.

(cherry picked from commit f252cb5d161e064d39cc1ed1d9299307a0636174)
Signed-off-by: Shixiong Zhu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1857acc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1857acc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1857acc7

Branch: refs/heads/branch-2.1
Commit: 1857acc717dcd083d21b20ef4d09723c3901bdfb
Parents: 5e80103
Author: Shixiong Zhu <[email protected]>
Authored: Thu Dec 22 16:22:55 2016 -0800
Committer: Shixiong Zhu <[email protected]>
Committed: Thu Dec 22 16:23:01 2016 -0800

----------------------------------------------------------------------
 .../spark/network/client/TransportClientFactory.java    |  6 ++++--
 .../spark/network/server/TransportChannelHandler.java   | 12 ++++++------
 .../apache/spark/network/server/TransportServer.java    |  2 +-
 .../org/apache/spark/network/util/TransportConf.java    |  4 ++++
 .../spark/storage/ShuffleBlockFetcherIterator.scala     |  4 ++++
 5 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e895f13..cb10edf 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -100,8 +100,10 @@ public class TransportClientFactory implements Closeable {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
-    // TODO: Make thread pool name configurable.
-    this.workerGroup = NettyUtils.createEventLoop(ioMode, 
conf.clientThreads(), "shuffle-client");
+    this.workerGroup = NettyUtils.createEventLoop(
+        ioMode,
+        conf.clientThreads(),
+        conf.getModuleName() + "-client");
     this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
       conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index c33848c..c6ccae1 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -88,14 +88,14 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
     try {
       requestHandler.channelActive();
     } catch (RuntimeException e) {
-      logger.error("Exception from request handler while registering channel", 
e);
+      logger.error("Exception from request handler while channel is active", 
e);
     }
     try {
       responseHandler.channelActive();
     } catch (RuntimeException e) {
-      logger.error("Exception from response handler while registering 
channel", e);
+      logger.error("Exception from response handler while channel is active", 
e);
     }
-    super.channelRegistered(ctx);
+    super.channelActive(ctx);
   }
 
   @Override
@@ -103,14 +103,14 @@ public class TransportChannelHandler extends 
SimpleChannelInboundHandler<Message
     try {
       requestHandler.channelInactive();
     } catch (RuntimeException e) {
-      logger.error("Exception from request handler while unregistering 
channel", e);
+      logger.error("Exception from request handler while channel is inactive", 
e);
     }
     try {
       responseHandler.channelInactive();
     } catch (RuntimeException e) {
-      logger.error("Exception from response handler while unregistering 
channel", e);
+      logger.error("Exception from response handler while channel is 
inactive", e);
     }
-    super.channelUnregistered(ctx);
+    super.channelInactive(ctx);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 0d7a677..047c5f3 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -89,7 +89,7 @@ public class TransportServer implements Closeable {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
-      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
"shuffle-server");
+      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), 
conf.getModuleName() + "-server");
     EventLoopGroup workerGroup = bossGroup;
 
     PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(

http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 64eaba1..fc5cc09 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -73,6 +73,10 @@ public class TransportConf {
     return "spark." + module + "." + suffix;
   }
 
+  public String getModuleName() {
+    return module;
+  }
+
   /** IO mode: nio or epoll */
   public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, 
"NIO").toUpperCase(); }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 269c12d..7eda6e9 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -304,6 +304,10 @@ final class ShuffleBlockFetcherIterator(
    * Throws a FetchFailedException if the next block could not be fetched.
    */
   override def next(): (BlockId, InputStream) = {
+    if (!hasNext) {
+      throw new NoSuchElementException
+    }
+
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
     currentResult = results.take()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to