This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ff084d2852e [SPARK-44756][CORE] Executor hangs when
RetryingBlockTransferor fails to initiate retry
ff084d2852e is described below
commit ff084d2852e62c6670e074ef423ae16c915710bc
Author: Harunobu Daikoku <[email protected]>
AuthorDate: Tue Sep 26 11:07:41 2023 -0500
[SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to
initiate retry
### What changes were proposed in this pull request?
This PR fixes a bug in `RetryingBlockTransferor` that occurs when initiating a
retry fails.
With this patch, `RetryingBlockTransferor#initiateRetry()` catches any error
raised while scheduling the retry and returns `false`; its callers then fail the
affected blocks by invoking the parent listener's failure handler.
### Why are the changes needed?
This is needed to handle an edge case in which retry initiation fails and the
executor hangs.
More details are in SPARK-44756.
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Added a new test case, `testRetryInitiationFailure`, in
`RetryingBlockTransferorSuite` that simulates the problematic scenario.
<img width="772" alt="image"
src="https://github.com/apache/spark/assets/17327104/f20ec327-f5c9-4d74-b861-1ea4e05eb46b">
Closes #42426 from hdaikoku/SPARK-44756.
Authored-by: Harunobu Daikoku <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RetryingBlockTransferor.java | 47 ++++++++++++++++------
.../shuffle/RetryingBlockTransferorSuite.java | 34 +++++++++++++++-
2 files changed, 67 insertions(+), 14 deletions(-)
diff --git
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index 892de991612..c628b201b20 100644
---
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
+++
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -144,6 +144,11 @@ public class RetryingBlockTransferor {
this(conf, transferStarter, blockIds, listener,
ErrorHandler.NOOP_ERROR_HANDLER);
}
+ @VisibleForTesting
+ synchronized void setCurrentListener(RetryingBlockTransferListener listener)
{
+ this.currentListener = listener;
+ }
+
/**
* Initiates the transfer of all blocks provided in the constructor, with
possible retries
* in the event of transient IOExceptions.
@@ -176,12 +181,14 @@ public class RetryingBlockTransferor {
listener.getTransferType(), blockIdsToTransfer.length,
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
- if (shouldRetry(e)) {
- initiateRetry(e);
- } else {
- for (String bid : blockIdsToTransfer) {
- listener.onBlockTransferFailure(bid, e);
- }
+ if (shouldRetry(e) && initiateRetry(e)) {
+ // successfully initiated a retry
+ return;
+ }
+
+ // retry is not possible, so fail remaining blocks
+ for (String bid : blockIdsToTransfer) {
+ listener.onBlockTransferFailure(bid, e);
}
}
}
@@ -189,8 +196,10 @@ public class RetryingBlockTransferor {
/**
* Lightweight method which initiates a retry in a different thread. The
retry will involve
* calling transferAllOutstanding() after a configured wait time.
+ * Returns true if the retry was successfully initiated, false otherwise.
*/
- private synchronized void initiateRetry(Throwable e) {
+ @VisibleForTesting
+ synchronized boolean initiateRetry(Throwable e) {
if (enableSaslRetries && e instanceof SaslTimeoutException) {
saslRetryCount += 1;
}
@@ -201,10 +210,17 @@ public class RetryingBlockTransferor {
listener.getTransferType(), retryCount, maxRetries,
outstandingBlocksIds.size(),
retryWaitTime);
- executorService.submit(() -> {
- Uninterruptibles.sleepUninterruptibly(retryWaitTime,
TimeUnit.MILLISECONDS);
- transferAllOutstanding();
- });
+ try {
+ executorService.execute(() -> {
+ Uninterruptibles.sleepUninterruptibly(retryWaitTime,
TimeUnit.MILLISECONDS);
+ transferAllOutstanding();
+ });
+ } catch (Throwable t) {
+ logger.error("Exception while trying to initiate retry", t);
+ return false;
+ }
+
+ return true;
}
/**
@@ -240,7 +256,8 @@ public class RetryingBlockTransferor {
* listener. Note that in the event of a retry, we will immediately replace
the 'currentListener'
* field, indicating that any responses from non-current Listeners should be
ignored.
*/
- private class RetryingBlockTransferListener implements
+ @VisibleForTesting
+ class RetryingBlockTransferListener implements
BlockFetchingListener, BlockPushingListener {
private void handleBlockTransferSuccess(String blockId, ManagedBuffer
data) {
// We will only forward this success message to our parent listener if
this block request is
@@ -274,7 +291,11 @@ public class RetryingBlockTransferor {
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId))
{
if (shouldRetry(exception)) {
- initiateRetry(exception);
+ if (!initiateRetry(exception)) {
+ // failed to initiate a retry, so fail this block
+ outstandingBlocksIds.remove(blockId);
+ shouldForwardFailure = true;
+ }
} else {
if (errorHandler.shouldLogError(exception)) {
logger.error(
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
index 041d88c698d..eed92ced4e1 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
@@ -372,6 +372,32 @@ public class RetryingBlockTransferorSuite {
assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}
+ @Test
+ public void testRetryInitiationFailure() throws IOException,
InterruptedException {
+ BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
+ // IOException will initiate a retry, but the initiation will fail
+ ImmutableMap.<String, Object>builder()
+ .put("b0", new IOException("Connection failed or something"))
+ .put("b1", block1)
+ .build()
+ );
+
+ configureInteractions(interactions, listener);
+ _retryingBlockTransferor = spy(_retryingBlockTransferor);
+ // Simulate a failure to initiate a retry.
+ doReturn(false).when(_retryingBlockTransferor).initiateRetry(any());
+ // Override listener, so that it delegates to the spied instance and not
the original class.
+ _retryingBlockTransferor.setCurrentListener(
+ _retryingBlockTransferor.new RetryingBlockTransferListener());
+ _retryingBlockTransferor.start();
+
+ verify(listener, timeout(5000)).onBlockTransferFailure(eq("b0"), any());
+ verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
+ verifyNoMoreInteractions(listener);
+ }
+
/**
* Performs a set of interactions in response to block requests from a
RetryingBlockFetcher.
* Each interaction is a Map from BlockId to either ManagedBuffer or
Exception. This interaction
@@ -384,6 +410,13 @@ public class RetryingBlockTransferorSuite {
*/
private static void performInteractions(List<? extends Map<String, Object>>
interactions,
BlockFetchingListener listener)
+ throws IOException, InterruptedException {
+ configureInteractions(interactions, listener);
+ _retryingBlockTransferor.start();
+ }
+
+ private static void configureInteractions(List<? extends Map<String,
Object>> interactions,
+ BlockFetchingListener listener)
throws IOException, InterruptedException {
MapConfigProvider provider = new MapConfigProvider(configMap);
@@ -440,6 +473,5 @@ public class RetryingBlockTransferorSuite {
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
_retryingBlockTransferor =
new RetryingBlockTransferor(conf, fetchStarter, blockIdArray,
listener);
- _retryingBlockTransferor.start();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]