This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 4fdbd87 [SPARK-28160][CORE] Fix a bug that callback function may hang
when unchecked exception missed
4fdbd87 is described below
commit 4fdbd87e3b3de804c8a0bf3d2613881f25425eed
Author: LantaoJin <[email protected]>
AuthorDate: Sun Jun 30 15:14:41 2019 -0500
[SPARK-28160][CORE] Fix a bug that callback function may hang when
unchecked exception missed
This is very similar to #23590.
`ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is
large but not enough memory is available. However, when this happens,
`TransportClient.sendRpcSync` will just hang forever if the timeout is set to
unlimited.
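The hang described above can be reproduced in isolation. The following is a minimal sketch, not the Spark code: it assumes `java.util.concurrent.CompletableFuture` as a stand-in for Guava's `SettableFuture`, and the class name `HangSketch` and method `timesOut` are hypothetical. The callback throws before it completes the future, so the waiter only returns when its timeout expires.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class HangSketch {
  // Returns true if waiting on the result timed out, i.e. the future was
  // never completed. All names here are illustrative, not Spark APIs.
  static boolean timesOut(long millis) throws Exception {
    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();

    // Simulated unguarded callback: the allocation throws, so the line that
    // completes the future is never reached.
    Thread callback = new Thread(() -> {
      ByteBuffer copy = ByteBuffer.allocate(-1); // throws IllegalArgumentException
      result.complete(copy);
    });
    // Swallow the uncaught exception so the demo output stays quiet.
    callback.setUncaughtExceptionHandler((thread, error) -> { });
    callback.start();
    callback.join();

    try {
      result.get(millis, TimeUnit.MILLISECONDS);
      return false;
    } catch (TimeoutException e) {
      // With an unlimited timeout this wait would never return.
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(timesOut(200)); // prints true
  }
}
```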
This PR catches `Throwable` and uses the error to complete `SettableFuture`.
I tested in my IDE by setting the value of size to -1 to verify the result.
Without this patch, the call does not finish until the timeout expires (and
may hang forever if the timeout is set to MAX_INT); with it, the expected
`IllegalArgumentException` is caught.
```java
@Override
public void onSuccess(ByteBuffer response) {
try {
int size = response.remaining();
// set size to -1 at runtime when debugging
ByteBuffer copy = ByteBuffer.allocate(size);
copy.put(response);
// flip "copy" to make it readable
copy.flip();
result.set(copy);
} catch (Throwable t) {
result.setException(t);
}
}
```
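The manual test described above can also be approximated as a standalone program. This is a sketch under assumptions rather than the actual test: `CompletableFuture` stands in for `SettableFuture`, and the class `CallbackSketch` and the extra `size` parameter on `onSuccess` are hypothetical, added only so that a bad size can be injected without a debugger.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CallbackSketch {
  // Hypothetical stand-in for the guarded callback. `size` is a parameter
  // here only so that the failure can be triggered on purpose.
  static void onSuccess(ByteBuffer response, int size,
                        CompletableFuture<ByteBuffer> result) {
    try {
      ByteBuffer copy = ByteBuffer.allocate(size); // throws if size is negative
      copy.put(response);
      // flip "copy" to make it readable
      copy.flip();
      result.complete(copy);
    } catch (Throwable t) {
      // Propagate the failure to the waiter instead of leaving it blocked.
      result.completeExceptionally(t);
    }
  }

  public static void main(String[] args) throws Exception {
    // Normal path: the copy holds the three payload bytes.
    ByteBuffer payload = ByteBuffer.wrap(new byte[] {1, 2, 3});
    CompletableFuture<ByteBuffer> ok = new CompletableFuture<>();
    onSuccess(payload, payload.remaining(), ok);
    System.out.println(ok.get().remaining());

    // Failure path: size -1 makes the allocation throw, and the waiter
    // receives the error immediately rather than waiting for a timeout.
    CompletableFuture<ByteBuffer> failed = new CompletableFuture<>();
    onSuccess(ByteBuffer.wrap(new byte[] {1}), -1, failed);
    try {
      failed.get();
    } catch (ExecutionException e) {
      System.out.println(e.getCause().getClass().getSimpleName());
    }
  }
}
```

In this sketch the waiter sees the failure as the cause of an `ExecutionException`, which is how `Future.get` reports an exceptional completion.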
Closes #24964 from LantaoJin/SPARK-28160.
Lead-authored-by: LantaoJin <[email protected]>
Co-authored-by: lajin <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 0e421000e0ea2c090b6fab0201a6046afceec132)
Signed-off-by: Sean Owen <[email protected]>
---
.../org/apache/spark/network/client/TransportClient.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 8f354ad..3b400e2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -254,11 +254,16 @@ public class TransportClient implements Closeable {
sendRpc(message, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- ByteBuffer copy = ByteBuffer.allocate(response.remaining());
- copy.put(response);
- // flip "copy" to make it readable
- copy.flip();
- result.set(copy);
+ try {
+ ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+ copy.put(response);
+ // flip "copy" to make it readable
+ copy.flip();
+ result.set(copy);
+ } catch (Throwable t) {
+ logger.warn("Error in responding PRC callback", t);
+ result.setException(t);
+ }
}
@Override