This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 770c05b46a4e3c135c85d0772ebb2b02d9db5414
Merge: 1b6dfa7915 c78dfc5235
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Aug 31 07:23:58 2023 -0400

    Merge branch '2.1'

 .../core/clientImpl/TabletServerBatchWriter.java   | 260 +++++++++++++++------
 .../accumulo/tserver/TabletClientHandler.java      |  97 ++++----
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 203 ++++++++++++++++
 3 files changed, 445 insertions(+), 115 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index e0dd94d216,de66339886..cf6821c649
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@@ -66,12 -71,15 +72,13 @@@ import org.apache.accumulo.core.dataImp
  import org.apache.accumulo.core.dataImpl.TabletIdImpl;
  import org.apache.accumulo.core.dataImpl.thrift.TMutation;
  import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
 -import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
++import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.rpc.ThriftUtil;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
- import 
org.apache.accumulo.core.tabletingest.thrift.ConstraintViolationException;
 +import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService;
- import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+ import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.trace.thrift.TInfo;
 -import org.apache.accumulo.core.util.HostAndPort;
+ import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.thrift.TApplicationException;
@@@ -931,15 -945,17 +945,17 @@@ public class TabletServerBatchWriter im
  
        timeoutTracker.startingWrite();
  
+       // If there is an open session, must close it before the batchwriter 
closes or writes could
+       // happen after the batch writer closes. See #3721
        try {
          final HostAndPort parsedServer = HostAndPort.fromString(location);
 -        final TabletClientService.Iface client;
 +        final TabletIngestClientService.Iface client;
  
          if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) 
{
 -          client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context,
 +          client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, 
parsedServer, context,
                timeoutTracker.getTimeOut());
          } else {
 -          client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context);
 +          client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, 
parsedServer, context);
          }
  
          try {
@@@ -1038,6 -1048,114 +1048,114 @@@
          throw new IOException(e);
        }
      }
+ 
+     class SessionCloser implements AutoCloseable {
+ 
+       private final String location;
+       private OptionalLong usid;
+ 
+       SessionCloser(String location) {
+         this.location = location;
+         usid = OptionalLong.empty();
+       }
+ 
+       void setSession(long usid) {
+         this.usid = OptionalLong.of(usid);
+       }
+ 
+       public long getSession() {
+         return usid.getAsLong();
+       }
+ 
+       void clearSession() {
+         usid = OptionalLong.empty();
+       }
+ 
+       @Override
+       public void close() throws ThriftSecurityException {
+         if (usid.isPresent()) {
+           try {
+             closeSession();
+           } catch (InterruptedException e) {
+             throw new IllegalStateException(e);
+           }
+         }
+       }
+ 
+       /**
+        * Checks if there is a lock held by a tserver at a specific host and 
port.
+        */
+       private boolean isALockHeld(String tserver) {
+         var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+         var zLockPath = ServiceLock.path(root + "/" + tserver);
+         return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 
0;
+       }
+ 
+       private void closeSession() throws InterruptedException, 
ThriftSecurityException {
+ 
+         Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
MILLISECONDS)
+             .incrementBy(100, MILLISECONDS).maxWait(60, 
SECONDS).backOffFactor(1.5)
+             .logInterval(3, MINUTES).createRetry();
+ 
+         final HostAndPort parsedServer = HostAndPort.fromString(location);
+ 
+         long startTime = System.nanoTime();
+ 
+         // If somethingFailed is true then the batch writer will throw an 
exception on close or
+         // flush, so no need to close this session. Only want to close the 
session for retryable
+         // exceptions.
+         while (!somethingFailed.get()) {
+ 
 -          TabletClientService.Client client = null;
++          TabletIngestClientService.Client client = null;
+ 
+           // Check if a lock is held by any tserver at the host and port. It 
does not need to be the
+           // exact tserver instance that existed when the session was created 
because if a new
+           // tserver instance comes up then the session will not exist there. 
Trying to get the
+           // exact tserver instance that created the session would require 
changes to the RPC that
+           // creates the session and this is not needed.
+           if (!isALockHeld(location)) {
+             retry.logCompletion(log,
+                 "No tserver for failed write session " + location + " " + 
usid);
+             break;
+           }
+ 
+           try {
+             if (timeout < context.getClientTimeoutInMillis()) {
 -              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context,
++              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, 
parsedServer, context,
+                   timeout);
+             } else {
 -              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context);
++              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, 
parsedServer, context);
+             }
+ 
+             client.closeUpdate(TraceUtil.traceInfo(), usid.getAsLong());
+             retry.logCompletion(log, "Closed failed write session " + 
location + " " + usid);
+             break;
+           } catch (NoSuchScanIDException e) {
+             retry.logCompletion(log,
+                 "Failed write session no longer exists " + location + " " + 
usid);
+             // The session no longer exists, so done
+             break;
+           } catch (TApplicationException tae) {
+             // no need to bother closing session in this case
+             updateServerErrors(location, tae);
+             break;
+           } catch (ThriftSecurityException e) {
+             throw e;
+           } catch (TException e) {
+             retry.waitForNextAttempt(log, "Attempting to close failed write 
session " + location
+                 + " " + usid + " " + e.getMessage());
+           } finally {
+             ThriftUtil.returnClient(client, context);
+           }
+ 
+           // if a timeout is set on the batch writer, then do not retry 
longer than the timeout
+           if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > 
timeout) {
+             log.debug("Giving up on closing session {} {} and timing out.", 
location, usid);
+             throw new TimedOutException(Set.of(location));
+           }
+         }
+       }
+     }
    }
  
    // END code for sending mutations to tablet servers using background threads

Reply via email to