Repository: spark Updated Branches: refs/heads/master 0c34fa5b4 -> 7c0c26cd1
[SPARK-4064]NioBlockTransferService.fetchBlocks may cause spark to hang. cc @rxin Author: GuoQiang Li <[email protected]> Closes #2929 from witgo/SPARK-4064 and squashes the following commits: 20110f2 [GuoQiang Li] Modify the exception msg 3425225 [GuoQiang Li] review commits 2b07e49 [GuoQiang Li] If we create a lot of big broadcast variables, Spark may hang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0c26cd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0c26cd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0c26cd Branch: refs/heads/master Commit: 7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42 Parents: 0c34fa5 Author: GuoQiang Li <[email protected]> Authored: Mon Oct 27 23:31:46 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Mon Oct 27 23:31:46 2014 -0700 ---------------------------------------------------------------------- .../network/nio/NioBlockTransferService.scala | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7c0c26cd/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 5add4fc..e311320 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onSuccess { case message => val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - - for (blockMessage <- blockMessageArray) { - if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { - listener.onBlockFetchFailure( - new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId")) - } else { - val blockId = blockMessage.getId - val networkSize = blockMessage.getData.limit() - listener.onBlockFetchSuccess( - blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. + if (blockMessageArray.isEmpty) { + listener.onBlockFetchFailure( + new SparkException(s"Received empty message from $cmId")) + } else { + for (blockMessage <- blockMessageArray) { + val msgType = blockMessage.getType + if (msgType != BlockMessage.TYPE_GOT_BLOCK) { + listener.onBlockFetchFailure( + new SparkException(s"Unexpected message ${msgType} received from $cmId")) + } else { + val blockId = blockMessage.getId + listener.onBlockFetchSuccess( + blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + } } } }(cm.futureExecContext) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
