Repository: spark Updated Branches: refs/heads/branch-2.1 5e8010349 -> 1857acc71
[SPARK-18972][CORE] Fix the netty thread names for RPC ## What changes were proposed in this pull request? Right now the name of threads created by Netty for Spark RPC are `shuffle-client-**` and `shuffle-server-**`. It's pretty confusing. This PR just uses the module name in TransportConf to set the thread name. In addition, it also includes the following minor fixes: - TransportChannelHandler.channelActive and channelInactive should call the corresponding super methods. - Make ShuffleBlockFetcherIterator throw NoSuchElementException if it has no more elements. Otherwise, if the caller calls `next` without `hasNext`, it will just hang. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16380 from zsxwing/SPARK-18972. (cherry picked from commit f252cb5d161e064d39cc1ed1d9299307a0636174) Signed-off-by: Shixiong Zhu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1857acc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1857acc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1857acc7 Branch: refs/heads/branch-2.1 Commit: 1857acc717dcd083d21b20ef4d09723c3901bdfb Parents: 5e80103 Author: Shixiong Zhu <[email protected]> Authored: Thu Dec 22 16:22:55 2016 -0800 Committer: Shixiong Zhu <[email protected]> Committed: Thu Dec 22 16:23:01 2016 -0800 ---------------------------------------------------------------------- .../spark/network/client/TransportClientFactory.java | 6 ++++-- .../spark/network/server/TransportChannelHandler.java | 12 ++++++------ .../apache/spark/network/server/TransportServer.java | 2 +- .../org/apache/spark/network/util/TransportConf.java | 4 ++++ .../spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++++ 5 files changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index e895f13..cb10edf 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -100,8 +100,10 @@ public class TransportClientFactory implements Closeable { IOMode ioMode = IOMode.valueOf(conf.ioMode()); this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); - // TODO: Make thread pool name configurable. - this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client"); + this.workerGroup = NettyUtils.createEventLoop( + ioMode, + conf.clientThreads(), + conf.getModuleName() + "-client"); this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); } http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index c33848c..c6ccae1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -88,14 +88,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message try { requestHandler.channelActive(); } catch (RuntimeException e) { - logger.error("Exception from request handler while registering channel", e); + logger.error("Exception from request handler while channel is active", e); } try { responseHandler.channelActive(); } catch (RuntimeException e) { - logger.error("Exception from response handler while registering channel", e); + logger.error("Exception from response handler while channel is active", e); } - super.channelRegistered(ctx); + super.channelActive(ctx); } @Override @@ -103,14 +103,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message try { requestHandler.channelInactive(); } catch (RuntimeException e) { - logger.error("Exception from request handler while unregistering channel", e); + logger.error("Exception from request handler while channel is inactive", e); } try { responseHandler.channelInactive(); } catch (RuntimeException e) { - logger.error("Exception from response handler while unregistering channel", e); + logger.error("Exception from response handler while channel is inactive", e); } - super.channelUnregistered(ctx); + super.channelInactive(ctx); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index 0d7a677..047c5f3 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -89,7 +89,7 @@ public class TransportServer implements Closeable { IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = - NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); + NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 64eaba1..fc5cc09 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -73,6 +73,10 @@ public class TransportConf { return "spark." + module + "." + suffix; } + public String getModuleName() { + return module; + } + /** IO mode: nio or epoll */ public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); } http://git-wip-us.apache.org/repos/asf/spark/blob/1857acc7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 269c12d..7eda6e9 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -304,6 +304,10 @@ final class ShuffleBlockFetcherIterator( * Throws a FetchFailedException if the next block could not be fetched. */ override def next(): (BlockId, InputStream) = { + if (!hasNext) { + throw new NoSuchElementException + } + numBlocksProcessed += 1 val startFetchWait = System.currentTimeMillis() currentResult = results.take() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
