This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 770c05b46a4e3c135c85d0772ebb2b02d9db5414 Merge: 1b6dfa7915 c78dfc5235 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Aug 31 07:23:58 2023 -0400 Merge branch '2.1' .../core/clientImpl/TabletServerBatchWriter.java | 260 +++++++++++++++------ .../accumulo/tserver/TabletClientHandler.java | 97 ++++---- .../apache/accumulo/test/WriteAfterCloseIT.java | 203 ++++++++++++++++ 3 files changed, 445 insertions(+), 115 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index e0dd94d216,de66339886..cf6821c649 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@@ -66,12 -71,15 +72,13 @@@ import org.apache.accumulo.core.dataImp import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.dataImpl.thrift.TMutation; import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; ++import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; - import org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException; +import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService; - import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; + import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; + import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TApplicationException; @@@ -931,15 -945,17 +945,17 @@@ public class TabletServerBatchWriter im timeoutTracker.startingWrite(); + // If there is an open session, must close it before the batchwriter closes or writes could + // happen after the batch writer closes. See #3721 try { final HostAndPort parsedServer = HostAndPort.fromString(location); - final TabletClientService.Iface client; + final TabletIngestClientService.Iface client; if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context, + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, parsedServer, context, timeoutTracker.getTimeOut()); } else { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, parsedServer, context); } try { @@@ -1038,6 -1048,114 +1048,114 @@@ throw new IOException(e); } } + + class SessionCloser implements AutoCloseable { + + private final String location; + private OptionalLong usid; + + SessionCloser(String location) { + this.location = location; + usid = OptionalLong.empty(); + } + + void setSession(long usid) { + this.usid = OptionalLong.of(usid); + } + + public long getSession() { + return usid.getAsLong(); + } + + void clearSession() { + usid = OptionalLong.empty(); + } + + @Override + public void close() throws ThriftSecurityException { + if (usid.isPresent()) { + try { + closeSession(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } + + /** + * Checks if there is a lock held by a tserver at a specific host and port. + */ + private boolean isALockHeld(String tserver) { + var root = context.getZooKeeperRoot() + Constants.ZTSERVERS; + var zLockPath = ServiceLock.path(root + "/" + tserver); + return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0; + } + + private void closeSession() throws InterruptedException, ThriftSecurityException { + + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(60, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + final HostAndPort parsedServer = HostAndPort.fromString(location); + + long startTime = System.nanoTime(); + + // If somethingFailed is true then the batch writer will throw an exception on close or + // flush, so no need to close this session. Only want to close the session for retryable + // exceptions. + while (!somethingFailed.get()) { + - TabletClientService.Client client = null; ++ TabletIngestClientService.Client client = null; + + // Check if a lock is held by any tserver at the host and port. It does not need to be the + // exact tserver instance that existed when the session was created because if a new + // tserver instance comes up then the session will not exist there. Trying to get the + // exact tserver instance that created the session would require changes to the RPC that + // creates the session and this is not needed. + if (!isALockHeld(location)) { + retry.logCompletion(log, + "No tserver for failed write session " + location + " " + usid); + break; + } + + try { + if (timeout < context.getClientTimeoutInMillis()) { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context, ++ client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, parsedServer, context, + timeout); + } else { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); ++ client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, parsedServer, context); + } + + client.closeUpdate(TraceUtil.traceInfo(), usid.getAsLong()); + retry.logCompletion(log, "Closed failed write session " + location + " " + usid); + break; + } catch (NoSuchScanIDException e) { + retry.logCompletion(log, + "Failed write session no longer exists " + location + " " + usid); + // The session no longer exists, so done + break; + } catch (TApplicationException tae) { + // no need to bother closing session in this case + updateServerErrors(location, tae); + break; + } catch (ThriftSecurityException e) { + throw e; + } catch (TException e) { + retry.waitForNextAttempt(log, "Attempting to close failed write session " + location + + " " + usid + " " + e.getMessage()); + } finally { + ThriftUtil.returnClient(client, context); + } + + // if a timeout is set on the batch writer, then do not retry longer than the timeout + if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > timeout) { + log.debug("Giving up on closing session {} {} and timing out.", location, usid); + throw new TimedOutException(Set.of(location)); + } + } + } + } } // END code for sending mutations to tablet servers using background threads