Repository: spark Updated Branches: refs/heads/branch-2.3 3aa780ef3 -> 2549beae2
[SPARK-23289][CORE] OneForOneBlockFetcher.DownloadCallback.onData should write the buffer fully ## What changes were proposed in this pull request? `channel.write(buf)` may not write the whole buffer since the underlying channel is a FileChannel, we should retry until the whole buffer is written. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #20461 from zsxwing/SPARK-23289. (cherry picked from commit ec63e2d0743a4f75e1cce21d0fe2b54407a86a4a) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2549beae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2549beae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2549beae Branch: refs/heads/branch-2.3 Commit: 2549beae20fe8761242f6fb9cda35ff06a652897 Parents: 3aa780e Author: Shixiong Zhu <[email protected]> Authored: Thu Feb 1 21:00:47 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu Feb 1 21:02:54 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/network/shuffle/OneForOneBlockFetcher.java | 4 +++- core/src/test/scala/org/apache/spark/FileSuite.scala | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2549beae/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index 9cac7d0..0bc5718 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -171,7 +171,9 @@ public class OneForOneBlockFetcher { @Override public void onData(String streamId, ByteBuffer buf) throws IOException { - channel.write(buf); + while (buf.hasRemaining()) { + channel.write(buf); + } } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/2549beae/core/src/test/scala/org/apache/spark/FileSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index e9539dc..55a9122 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -244,7 +244,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { for (i <- 0 until testOutputCopies) { // Shift values by i so that they're different in the output val alteredOutput = testOutput.map(b => (b + i).toByte) - channel.write(ByteBuffer.wrap(alteredOutput)) + val buffer = ByteBuffer.wrap(alteredOutput) + while (buffer.hasRemaining) { + channel.write(buffer) + } } channel.close() file.close() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
