This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new fed407a4de2 [SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor
fed407a4de2 is described below
commit fed407a4de24f7a387cd25bb2297de0bf4ca016c
Author: Ted Yu <[email protected]>
AuthorDate: Tue Jan 24 12:15:22 2023 -0600
[SPARK-42090][3.2] Introduce sasl retry count in RetryingBlockTransferor
### What changes were proposed in this pull request?
This PR introduces sasl retry count in RetryingBlockTransferor.
### Why are the changes needed?
Previously a boolean variable, saslTimeoutSeen, was used. However, the
boolean variable wouldn't cover the following scenario:
1. SaslTimeoutException
2. IOException
3. SaslTimeoutException
4. IOException
Even though the IOException at #2 is retried (incrementing retryCount), retryCount
would be cleared at step #4, losing track of that retry. Since the intention of
saslTimeoutSeen is to undo only the increments due to retrying SaslTimeoutException,
we should instead keep a counter of SaslTimeoutException retries and subtract its
value from retryCount.
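The bookkeeping is easier to follow outside the diff. Below is a minimal,
self-contained Java sketch of the approach, not the actual
RetryingBlockTransferor code: the retryCount/saslRetryCount fields mirror the
patch, but SaslTimeoutException here is a local stand-in for
org.apache.spark.network.sasl.SaslTimeoutException, and the enableSaslRetries
flag, error handler, and Preconditions check are omitted.

```java
import java.io.IOException;

// Local stand-in for org.apache.spark.network.sasl.SaslTimeoutException,
// so this sketch compiles on its own.
class SaslTimeoutException extends RuntimeException {
  SaslTimeoutException(String message) { super(message); }
}

public class RetryCounterSketch {
  private final int maxRetries = 2;   // cf. spark.shuffle.io.maxRetries
  private int retryCount = 0;         // total retries attempted so far
  private int saslRetryCount = 0;     // retries attributable to SASL timeouts

  // Decide whether a failed request should be retried.
  boolean shouldRetry(Throwable e) {
    boolean isSaslTimeout = e instanceof SaslTimeoutException;
    boolean isIOException = e instanceof IOException;
    // A non-SASL failure after earlier SASL retries means SASL eventually
    // succeeded, so those retries no longer count against the retry budget.
    // (The real code asserts retryCount >= saslRetryCount here, which holds
    // because the two counters are only ever incremented together.)
    if (!isSaslTimeout && saslRetryCount > 0) {
      retryCount -= saslRetryCount;
      saslRetryCount = 0;
    }
    return (isSaslTimeout || isIOException) && retryCount < maxRetries;
  }

  // Record the retry that shouldRetry() approved.
  void initiateRetry(Throwable e) {
    if (e instanceof SaslTimeoutException) {
      saslRetryCount += 1;
    }
    retryCount += 1;
  }
}
```

Tracing the four-step scenario above through this sketch: step #1 retries with
retryCount = 1, saslRetryCount = 1; step #2 first subtracts the SASL retry
(retryCount = 0), then retries (retryCount = 1); step #3 retries with
retryCount = 2, saslRetryCount = 1; step #4 subtracts again (retryCount = 1),
so the IOException retry from step #2 is still counted against maxRetries.
With the old boolean, step #4 would have reset retryCount to 0 and forgotten it.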
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New test is added, courtesy of Mridul.
Closes #39611 from tedyu/sasl-cnt.
Authored-by: Ted Yu <yuzhihong@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Closes #39710 from akpatnam25/SPARK-42090-backport-3.2.
Authored-by: Ted Yu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../network/shuffle/RetryingBlockTransferor.java | 46 +++++++++++++++-------
.../shuffle/RetryingBlockTransferorSuite.java | 35 +++++++++++++++-
2 files changed, 65 insertions(+), 16 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index 4515e3a5c28..892de991612 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
@@ -87,7 +88,16 @@ public class RetryingBlockTransferor {
/** Number of times we've attempted to retry so far. */
private int retryCount = 0;
- private boolean saslTimeoutSeen;
+ // Number of times SASL timeout has been retried without success.
+ // If we see maxRetries consecutive failures, the request is failed.
+ // On the other hand, if sasl succeeds and we are able to send other requests subsequently,
+ // we reduce the SASL failures from retryCount (since SASL failures were part of
+ // connection bootstrap - which ended up being successful).
+ // spark.network.auth.rpcTimeout is much lower than spark.network.timeout and others -
+ // and so sasl is more susceptible to failures when remote service
+ // (like external shuffle service) is under load: but once it succeeds, we do not want to
+ // include it as part of request retries.
+ private int saslRetryCount = 0;
/**
* Set of all block ids which have not been transferred successfully or with a non-IO Exception.
@@ -123,7 +133,7 @@ public class RetryingBlockTransferor {
this.currentListener = new RetryingBlockTransferListener();
this.errorHandler = errorHandler;
this.enableSaslRetries = conf.enableSaslRetries();
- this.saslTimeoutSeen = false;
+ this.saslRetryCount = 0;
}
public RetryingBlockTransferor(
@@ -167,7 +177,7 @@ public class RetryingBlockTransferor {
numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
if (shouldRetry(e)) {
- initiateRetry();
+ initiateRetry(e);
} else {
for (String bid : blockIdsToTransfer) {
listener.onBlockTransferFailure(bid, e);
@@ -180,7 +190,10 @@ public class RetryingBlockTransferor {
* Lightweight method which initiates a retry in a different thread. The retry will involve
* calling transferAllOutstanding() after a configured wait time.
*/
- private synchronized void initiateRetry() {
+ private synchronized void initiateRetry(Throwable e) {
+ if (enableSaslRetries && e instanceof SaslTimeoutException) {
+ saslRetryCount += 1;
+ }
retryCount += 1;
currentListener = new RetryingBlockTransferListener();
@@ -203,16 +216,17 @@ public class RetryingBlockTransferor {
boolean isIOException = e instanceof IOException
|| e.getCause() instanceof IOException;
boolean isSaslTimeout = enableSaslRetries && e instanceof SaslTimeoutException;
- if (!isSaslTimeout && saslTimeoutSeen) {
- retryCount = 0;
- saslTimeoutSeen = false;
+ // If this is a non SASL request failure, reduce earlier SASL failures from retryCount
+ // since some subsequent SASL attempt was successful
+ if (!isSaslTimeout && saslRetryCount > 0) {
+ Preconditions.checkState(retryCount >= saslRetryCount,
+ "retryCount must be greater than or equal to saslRetryCount");
+ retryCount -= saslRetryCount;
+ saslRetryCount = 0;
}
boolean hasRemainingRetries = retryCount < maxRetries;
boolean shouldRetry = (isSaslTimeout || isIOException) &&
hasRemainingRetries && errorHandler.shouldRetryError(e);
- if (shouldRetry && isSaslTimeout) {
- this.saslTimeoutSeen = true;
- }
return shouldRetry;
}
@@ -236,9 +250,13 @@ public class RetryingBlockTransferor {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
outstandingBlocksIds.remove(blockId);
shouldForwardSuccess = true;
- if (saslTimeoutSeen) {
- retryCount = 0;
- saslTimeoutSeen = false;
+ // If there were SASL failures earlier, remove them from retryCount, as there was
+ // a SASL success (and some other request post bootstrap was also successful).
+ if (saslRetryCount > 0) {
+ Preconditions.checkState(retryCount >= saslRetryCount,
+ "retryCount must be greater than or equal to saslRetryCount");
+ retryCount -= saslRetryCount;
+ saslRetryCount = 0;
}
}
}
@@ -256,7 +274,7 @@ public class RetryingBlockTransferor {
synchronized (RetryingBlockTransferor.this) {
if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
if (shouldRetry(exception)) {
- initiateRetry();
+ initiateRetry(exception);
} else {
if (errorHandler.shouldLogError(exception)) {
logger.error(
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
index 117a9ba08df..31fe6184166 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
@@ -58,10 +58,12 @@ public class RetryingBlockTransferorSuite {
private static Map<String, String> configMap;
private static RetryingBlockTransferor _retryingBlockTransferor;
+ private static final int MAX_RETRIES = 2;
+
@Before
public void initMap() {
configMap = new HashMap<String, String>() {{
- put("spark.shuffle.io.maxRetries", "2");
+ put("spark.shuffle.io.maxRetries", Integer.toString(MAX_RETRIES));
put("spark.shuffle.io.retryWait", "0");
}};
}
@@ -309,7 +311,7 @@ public class RetryingBlockTransferorSuite {
verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslTimeoutException);
verify(listener, times(3)).getTransferType();
verifyNoMoreInteractions(listener);
- assert(_retryingBlockTransferor.getRetryCount() == 2);
+ assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
}
@Test
@@ -341,6 +343,35 @@ public class RetryingBlockTransferorSuite {
assert(_retryingBlockTransferor.getRetryCount() == 1);
}
+ @Test
+ public void testIOExceptionFailsConnectionEvenWithSaslException()
+ throws IOException, InterruptedException {
+ BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+ SaslTimeoutException saslExceptionInitial = new SaslTimeoutException("initial",
+ new TimeoutException());
+ SaslTimeoutException saslExceptionFinal = new SaslTimeoutException("final",
+ new TimeoutException());
+ IOException ioException = new IOException();
+ List<? extends Map<String, Object>> interactions = Arrays.asList(
+ ImmutableMap.of("b0", saslExceptionInitial),
+ ImmutableMap.of("b0", ioException),
+ ImmutableMap.of("b0", saslExceptionInitial),
+ ImmutableMap.of("b0", ioException),
+ ImmutableMap.of("b0", saslExceptionFinal),
+ // will not get invoked because the connection fails
+ ImmutableMap.of("b0", ioException),
+ // will not get invoked
+ ImmutableMap.of("b0", block0)
+ );
+ configMap.put("spark.shuffle.sasl.enableRetries", "true");
+ performInteractions(interactions, listener);
+ verify(listener, timeout(5000)).onBlockTransferFailure("b0", saslExceptionFinal);
+ verify(listener, atLeastOnce()).getTransferType();
+ verifyNoMoreInteractions(listener);
+ assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
+ }
+
/**
* Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
* Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]