This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new ef0f52b16a Handle thread interrupt in server client execute loop (#2622) ef0f52b16a is described below commit ef0f52b16ae148bceb74d9d3ff2a091a4d497aeb Author: nikita <72053786+nikita-sir...@users.noreply.github.com> AuthorDate: Tue Apr 26 06:35:50 2022 -0700 Handle thread interrupt in server client execute loop (#2622) Re-throw ClosedByInterruptException as UncheckedIOException to exit out of interrupted retry loops to improve the client side user experience when interrupting the process with `Ctrl+C` This fixes #2621 Co-authored-by: Nikita Sirohi <nik...@gh.st> --- .../java/org/apache/accumulo/core/rpc/TTimeoutTransport.java | 10 ++++++++++ .../src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java | 6 ++++++ .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 6 ++++++ 3 files changed, 22 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java index d7068c718d..b69d081e28 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/TTimeoutTransport.java @@ -23,9 +23,11 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.spi.SelectorProvider; import org.apache.accumulo.core.util.HostAndPort; @@ -87,6 +89,10 @@ public class TTimeoutTransport { socket = openSocket(addr, (int) timeoutMillis); } catch (IOException e) { // openSocket handles closing the Socket on error + if (e instanceof ClosedByInterruptException) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(e); + } throw new TTransportException(e); } @@ -100,6 +106,10 @@ public class TTimeoutTransport { return new TIOStreamTransport(input, output); } catch (IOException e) { closeSocket(socket, e); + if (e instanceof ClosedByInterruptException) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(e); + } throw new TTransportException(e); } catch (TTransportException e) { closeSocket(socket, e); diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java index 3e63ac1cbf..30c6d8a50b 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java @@ -20,7 +20,9 @@ package org.apache.accumulo.core.rpc; import java.io.FileInputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetAddress; +import java.nio.channels.ClosedByInterruptException; import java.security.KeyStore; import java.security.SecureRandom; import java.util.HashMap; @@ -328,6 +330,10 @@ public class ThriftUtil { throw e; } catch (IOException e) { log.warn("Failed to open SASL transport", e); + if (e instanceof ClosedByInterruptException) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(e); + } throw new TTransportException(e); } } else { diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index ba8ba2d8f8..0910f5332e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -21,10 +21,12 @@ package org.apache.accumulo.server.rpc; import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.UnknownHostException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.EnumSet; import java.util.HashSet; @@ -527,6 +529,10 @@ public class TServerUtils { serverUser = UserGroupInformation.getLoginUser(); } catch (IOException e) { transport.close(); + if (e instanceof ClosedByInterruptException) { + Thread.currentThread().interrupt(); + throw new UncheckedIOException(e); + } throw new TTransportException(e); }