This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c78dfc5235 ensures no writes happen after batch writer closes (#3733)
c78dfc5235 is described below

commit c78dfc5235f74c03d149971f2fb63e55ed27c6c1
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Aug 31 07:08:14 2023 -0400

    ensures no writes happen after batch writer closes (#3733)
    
    Fixes a problem with the batch writer where, when retries happened,
    writes could possibly happen after the batch writer was closed.  Adds a
    test that causes writes after close without the fixes in this PR.
    
    This fixes #3721
    
    ---------
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../core/clientImpl/TabletServerBatchWriter.java   | 260 +++++++++++++++------
 .../accumulo/tserver/TabletClientHandler.java      |  97 ++++----
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 203 ++++++++++++++++
 3 files changed, 445 insertions(+), 115 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index c5740c02f8..de66339886 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -19,6 +19,8 @@
 package org.apache.accumulo.core.clientImpl;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
@@ -35,15 +37,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -65,14 +71,15 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 import org.apache.accumulo.core.dataImpl.thrift.TMutation;
 import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
-import 
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.thrift.TApplicationException;
@@ -160,7 +167,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
   private final HashSet<String> serverSideErrors = new HashSet<>();
   private final FailedMutations failedMutations;
   private int unknownErrors = 0;
-  private boolean somethingFailed = false;
+  private final AtomicBoolean somethingFailed = new AtomicBoolean(false);
   private Exception lastUnknownError = null;
 
   private static class TimeoutTracker {
@@ -263,7 +270,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
     checkForFailures();
 
-    waitRTE(() -> (totalMemUsed > maxMem || flushing) && !somethingFailed);
+    waitRTE(() -> (totalMemUsed > maxMem || flushing) && 
!somethingFailed.get());
 
     // do checks again since things could have changed while waiting and not 
holding lock
     if (closed) {
@@ -323,7 +330,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
       if (flushing) {
         // some other thread is currently flushing, so wait
-        waitRTE(() -> flushing && !somethingFailed);
+        waitRTE(() -> flushing && !somethingFailed.get());
 
         checkForFailures();
 
@@ -335,7 +342,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
       startProcessing();
       checkForFailures();
 
-      waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
+      waitRTE(() -> totalMemUsed > 0 && !somethingFailed.get());
 
       flushing = false;
       this.notifyAll();
@@ -362,7 +369,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
       startProcessing();
 
-      waitRTE(() -> totalMemUsed > 0 && !somethingFailed);
+      waitRTE(() -> totalMemUsed > 0 && !somethingFailed.get());
 
       logStats();
 
@@ -508,7 +515,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
   private void updatedConstraintViolations(List<ConstraintViolationSummary> 
cvsList) {
     if (!cvsList.isEmpty()) {
       synchronized (this) {
-        somethingFailed = true;
+        somethingFailed.set(true);
         violations.add(cvsList);
         this.notifyAll();
       }
@@ -524,7 +531,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
           .forEach(context::requireNotDeleted);
 
       synchronized (this) {
-        somethingFailed = true;
+        somethingFailed.set(true);
         // add these authorizationFailures to those collected by this batch 
writer
         authorizationFailures.forEach((ke, code) -> this.authorizationFailures
             .computeIfAbsent(ke, k -> new HashSet<>()).add(code));
@@ -534,14 +541,14 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
   }
 
   private synchronized void updateServerErrors(String server, Exception e) {
-    somethingFailed = true;
+    somethingFailed.set(true);
     this.serverSideErrors.add(server);
     this.notifyAll();
     log.error("Server side error on {}", server, e);
   }
 
   private synchronized void updateUnknownErrors(String msg, Exception t) {
-    somethingFailed = true;
+    somethingFailed.set(true);
     unknownErrors++;
     this.lastUnknownError = t;
     this.notifyAll();
@@ -554,7 +561,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
   }
 
   private void checkForFailures() throws MutationsRejectedException {
-    if (somethingFailed) {
+    if (somethingFailed.get()) {
       List<ConstraintViolationSummary> cvsList = violations.asList();
       
HashMap<TabletId,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>
 af =
           new HashMap<>();
@@ -871,7 +878,15 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
             }
 
             long st1 = System.currentTimeMillis();
-            failures = sendMutationsToTabletServer(location, mutationBatch, 
timeoutTracker);
+            try (SessionCloser sessionCloser = new SessionCloser(location)) {
+              failures = sendMutationsToTabletServer(location, mutationBatch, 
timeoutTracker,
+                  sessionCloser);
+            } catch (ThriftSecurityException e) {
+              updateAuthorizationFailures(
+                  mutationBatch.keySet().stream().collect(toMap(identity(), ke 
-> e.code)));
+              throw new AccumuloSecurityException(e.user, e.code, e);
+            }
+
             long st2 = System.currentTimeMillis();
             if (log.isTraceEnabled()) {
               log.trace("sent " + String.format("%,d", count) + " mutations to 
" + location + " in "
@@ -901,9 +916,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
             span.end();
           }
         } catch (IOException e) {
-          if (log.isTraceEnabled()) {
-            log.trace("failed to send mutations to {} : {}", location, 
e.getMessage());
-          }
+          log.debug("failed to send mutations to {} : {}", location, 
e.getMessage());
 
           HashSet<TableId> tables = new HashSet<>();
           for (KeyExtent ke : mutationBatch.keySet()) {
@@ -922,7 +935,8 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
     }
 
     private MutationSet sendMutationsToTabletServer(String location,
-        Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker)
+        Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker,
+        SessionCloser sessionCloser)
         throws IOException, AccumuloSecurityException, AccumuloServerException 
{
       if (tabMuts.isEmpty()) {
         return new MutationSet();
@@ -931,6 +945,8 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
 
       timeoutTracker.startingWrite();
 
+      // If there is an open session, must close it before the batchwriter 
closes or writes could
+      // happen after the batch writer closes. See #3721
       try {
         final HostAndPort parsedServer = HostAndPort.fromString(location);
         final TabletClientService.Iface client;
@@ -945,81 +961,71 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
         try {
           MutationSet allFailures = new MutationSet();
 
-          if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() 
== 1) {
-            Entry<KeyExtent,List<Mutation>> entry = 
tabMuts.entrySet().iterator().next();
+          // set the session on the sessionCloser so that any failures after 
this point will close
+          // the session if needed
+          sessionCloser.setSession(
+              client.startUpdate(tinfo, context.rpcCreds(), 
DurabilityImpl.toThrift(durability)));
+
+          List<TMutation> updates = new ArrayList<>();
+          for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
+            long size = 0;
+            Iterator<Mutation> iter = entry.getValue().iterator();
+            while (iter.hasNext()) {
+              while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
+                Mutation mutation = iter.next();
+                updates.add(mutation.toThrift());
+                size += mutation.numBytes();
+              }
 
-            try {
-              client.update(tinfo, context.rpcCreds(), 
entry.getKey().toThrift(),
-                  entry.getValue().get(0).toThrift(), 
DurabilityImpl.toThrift(durability));
-            } catch (NotServingTabletException e) {
-              allFailures.addAll(entry.getKey().tableId(), entry.getValue());
-              
getLocator(entry.getKey().tableId()).invalidateCache(entry.getKey());
-            } catch (ConstraintViolationException e) {
-              updatedConstraintViolations(e.violationSummaries.stream()
-                  .map(ConstraintViolationSummary::new).collect(toList()));
+              client.applyUpdates(tinfo, sessionCloser.getSession(), 
entry.getKey().toThrift(),
+                  updates);
+              updates.clear();
+              size = 0;
             }
-            timeoutTracker.madeProgress();
-          } else {
+          }
 
-            long usid =
-                client.startUpdate(tinfo, context.rpcCreds(), 
DurabilityImpl.toThrift(durability));
-
-            List<TMutation> updates = new ArrayList<>();
-            for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
-              long size = 0;
-              Iterator<Mutation> iter = entry.getValue().iterator();
-              while (iter.hasNext()) {
-                while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
-                  Mutation mutation = iter.next();
-                  updates.add(mutation.toThrift());
-                  size += mutation.numBytes();
-                }
+          UpdateErrors updateErrors = client.closeUpdate(tinfo, 
sessionCloser.getSession());
 
-                client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), 
updates);
-                updates.clear();
-                size = 0;
-              }
-            }
+          // the write completed successfully so no need to close the session
+          sessionCloser.clearSession();
 
-            UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
-
-            // @formatter:off
+          // @formatter:off
             Map<KeyExtent,Long> failures = 
updateErrors.failedExtents.entrySet().stream().collect(toMap(
                             entry -> KeyExtent.fromThrift(entry.getKey()),
                             Entry::getValue
             ));
             // @formatter:on
-            
updatedConstraintViolations(updateErrors.violationSummaries.stream()
-                .map(ConstraintViolationSummary::new).collect(toList()));
-            // @formatter:off
+          updatedConstraintViolations(updateErrors.violationSummaries.stream()
+              .map(ConstraintViolationSummary::new).collect(toList()));
+          // @formatter:off
             
updateAuthorizationFailures(updateErrors.authorizationFailures.entrySet().stream().collect(toMap(
                             entry -> KeyExtent.fromThrift(entry.getKey()),
                             Entry::getValue
             )));
             // @formatter:on
-            long totalCommitted = 0;
+          long totalCommitted = 0;
 
-            for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
-              KeyExtent failedExtent = entry.getKey();
-              int numCommitted = (int) (long) entry.getValue();
-              totalCommitted += numCommitted;
+          for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
+            KeyExtent failedExtent = entry.getKey();
+            int numCommitted = (int) (long) entry.getValue();
+            totalCommitted += numCommitted;
 
-              TableId tableId = failedExtent.tableId();
+            TableId tableId = failedExtent.tableId();
 
-              getLocator(tableId).invalidateCache(failedExtent);
+            getLocator(tableId).invalidateCache(failedExtent);
 
-              List<Mutation> mutations = tabMuts.get(failedExtent);
-              allFailures.addAll(tableId, mutations.subList(numCommitted, 
mutations.size()));
-            }
+            List<Mutation> mutations = tabMuts.get(failedExtent);
+            allFailures.addAll(tableId, mutations.subList(numCommitted, 
mutations.size()));
+          }
 
-            if (failures.keySet().containsAll(tabMuts.keySet()) && 
totalCommitted == 0) {
-              // nothing was successfully written
-              timeoutTracker.wroteNothing();
-            } else {
-              // successfully wrote something to tablet server
-              timeoutTracker.madeProgress();
-            }
+          if (failures.keySet().containsAll(tabMuts.keySet()) && 
totalCommitted == 0) {
+            // nothing was successfully written
+            timeoutTracker.wroteNothing();
+          } else {
+            // successfully wrote something to tablet server
+            timeoutTracker.madeProgress();
           }
+
           return allFailures;
         } finally {
           ThriftUtil.returnClient((TServiceClient) client, context);
@@ -1028,9 +1034,13 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
         timeoutTracker.errorOccured();
         throw new IOException(e);
       } catch (TApplicationException tae) {
+        // no need to close the session when unretryable errors happen
+        sessionCloser.clearSession();
         updateServerErrors(location, tae);
         throw new AccumuloServerException(location, tae);
       } catch (ThriftSecurityException e) {
+        // no need to close the session when unretryable errors happen
+        sessionCloser.clearSession();
         updateAuthorizationFailures(
             tabMuts.keySet().stream().collect(toMap(identity(), ke -> 
e.code)));
         throw new AccumuloSecurityException(e.user, e.code, e);
@@ -1038,6 +1048,114 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
         throw new IOException(e);
       }
     }
+
+    class SessionCloser implements AutoCloseable {
+
+      private final String location;
+      private OptionalLong usid;
+
+      SessionCloser(String location) {
+        this.location = location;
+        usid = OptionalLong.empty();
+      }
+
+      void setSession(long usid) {
+        this.usid = OptionalLong.of(usid);
+      }
+
+      public long getSession() {
+        return usid.getAsLong();
+      }
+
+      void clearSession() {
+        usid = OptionalLong.empty();
+      }
+
+      @Override
+      public void close() throws ThriftSecurityException {
+        if (usid.isPresent()) {
+          try {
+            closeSession();
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
+          }
+        }
+      }
+
+      /**
+       * Checks if there is a lock held by a tserver at a specific host and 
port.
+       */
+      private boolean isALockHeld(String tserver) {
+        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
+        var zLockPath = ServiceLock.path(root + "/" + tserver);
+        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+      }
+
+      private void closeSession() throws InterruptedException, 
ThriftSecurityException {
+
+        Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
MILLISECONDS)
+            .incrementBy(100, MILLISECONDS).maxWait(60, 
SECONDS).backOffFactor(1.5)
+            .logInterval(3, MINUTES).createRetry();
+
+        final HostAndPort parsedServer = HostAndPort.fromString(location);
+
+        long startTime = System.nanoTime();
+
+        // If somethingFailed is true then the batch writer will throw an 
exception on close or
+        // flush, so no need to close this session. Only want to close the 
session for retryable
+        // exceptions.
+        while (!somethingFailed.get()) {
+
+          TabletClientService.Client client = null;
+
+          // Check if a lock is held by any tserver at the host and port. It 
does not need to be the
+          // exact tserver instance that existed when the session was created 
because if a new
+          // tserver instance comes up then the session will not exist there. 
Trying to get the
+          // exact tserver instance that created the session would require 
changes to the RPC that
+          // creates the session and this is not needed.
+          if (!isALockHeld(location)) {
+            retry.logCompletion(log,
+                "No tserver for failed write session " + location + " " + 
usid);
+            break;
+          }
+
+          try {
+            if (timeout < context.getClientTimeoutInMillis()) {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context,
+                  timeout);
+            } else {
+              client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
parsedServer, context);
+            }
+
+            client.closeUpdate(TraceUtil.traceInfo(), usid.getAsLong());
+            retry.logCompletion(log, "Closed failed write session " + location 
+ " " + usid);
+            break;
+          } catch (NoSuchScanIDException e) {
+            retry.logCompletion(log,
+                "Failed write session no longer exists " + location + " " + 
usid);
+            // The session no longer exists, so done
+            break;
+          } catch (TApplicationException tae) {
+            // no need to bother closing session in this case
+            updateServerErrors(location, tae);
+            break;
+          } catch (ThriftSecurityException e) {
+            throw e;
+          } catch (TException e) {
+            retry.waitForNextAttempt(log, "Attempting to close failed write 
session " + location
+                + " " + usid + " " + e.getMessage());
+          } finally {
+            ThriftUtil.returnClient(client, context);
+          }
+
+          // if a timeout is set on the batch writer, then do not retry longer 
than the timeout
+          if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) > 
timeout) {
+            log.debug("Giving up on closing session {} {} and timing out.", 
location, usid);
+            throw new TimedOutException(Set.of(location));
+          }
+        }
+      }
+    }
   }
 
   // END code for sending mutations to tablet servers using background threads
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 92c06c85e1..7e19515de6 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -528,57 +528,66 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
 
   @Override
   public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws 
NoSuchScanIDException {
-    final UpdateSession us = (UpdateSession) 
server.sessionManager.removeSession(updateID);
+    // Reserve the session and wait for any write that may currently have it 
reserved. Once reserved
+    // no write stragglers can start against this session id.
+    final UpdateSession us = (UpdateSession) 
server.sessionManager.reserveSession(updateID, true);
     if (us == null) {
       throw new NoSuchScanIDException();
     }
 
-    // clients may or may not see data from an update session while
-    // it is in progress, however when the update session is closed
-    // want to ensure that reads wait for the write to finish
-    long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
     try {
-      flush(us);
-    } catch (HoldTimeoutException e) {
-      // Assumption is that the client has timed out and is gone. If that's 
not the case throw an
-      // exception that will cause it to retry.
-      log.debug("HoldTimeoutException during closeUpdate, reporting no such 
session");
-      throw new NoSuchScanIDException();
+      // clients may or may not see data from an update session while
+      // it is in progress, however when the update session is closed
+      // want to ensure that reads wait for the write to finish
+      long opid = writeTracker.startWrite(us.queuedMutations.keySet());
+
+      try {
+        flush(us);
+      } catch (HoldTimeoutException e) {
+        // Assumption is that the client has timed out and is gone. If that's 
not the case throw an
+        // exception that will cause it to retry.
+        log.debug("HoldTimeoutException during closeUpdate, reporting no such 
session");
+        throw new NoSuchScanIDException();
+      } finally {
+        writeTracker.finishWrite(opid);
+      }
+
+      if (log.isTraceEnabled()) {
+        log.trace(
+            String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs 
lt=%.3fs ct=%.3fs)",
+                TServerUtils.clientAddress.get(), us.totalUpdates,
+                (System.currentTimeMillis() - us.startTime) / 1000.0, 
us.authTimes,
+                us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, 
us.walogTimes.sum() / 1000.0,
+                us.commitTimes.sum() / 1000.0));
+      }
+      if (!us.failures.isEmpty()) {
+        Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
+        log.debug(String.format("Failures: %d, first extent %s successful 
commits: %d",
+            us.failures.size(), first.getKey().toString(), first.getValue()));
+      }
+      List<ConstraintViolationSummary> violations = us.violations.asList();
+      if (!violations.isEmpty()) {
+        ConstraintViolationSummary first = 
us.violations.asList().iterator().next();
+        log.debug(String.format("Violations: %d, first %s occurs %d", 
violations.size(),
+            first.violationDescription, first.numberOfViolatingMutations));
+      }
+      if (!us.authFailures.isEmpty()) {
+        KeyExtent first = us.authFailures.keySet().iterator().next();
+        log.debug(String.format("Authentication Failures: %d, first %s", 
us.authFailures.size(),
+            first.toString()));
+      }
+      return new UpdateErrors(
+          us.failures.entrySet().stream()
+              .collect(Collectors.toMap(e -> e.getKey().toThrift(), 
Entry::getValue)),
+          violations.stream().map(ConstraintViolationSummary::toThrift)
+              .collect(Collectors.toList()),
+          us.authFailures.entrySet().stream()
+              .collect(Collectors.toMap(e -> e.getKey().toThrift(), 
Entry::getValue)));
     } finally {
-      writeTracker.finishWrite(opid);
+      // Atomically unreserve and delete the session. If there are any write 
stragglers, they will fail
+      // after this point.
+      server.sessionManager.removeSession(updateID, true);
     }
-
-    if (log.isTraceEnabled()) {
-      log.trace(
-          String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs 
lt=%.3fs ct=%.3fs)",
-              TServerUtils.clientAddress.get(), us.totalUpdates,
-              (System.currentTimeMillis() - us.startTime) / 1000.0, 
us.authTimes,
-              us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0, 
us.walogTimes.sum() / 1000.0,
-              us.commitTimes.sum() / 1000.0));
-    }
-    if (!us.failures.isEmpty()) {
-      Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
-      log.debug(String.format("Failures: %d, first extent %s successful 
commits: %d",
-          us.failures.size(), first.getKey().toString(), first.getValue()));
-    }
-    List<ConstraintViolationSummary> violations = us.violations.asList();
-    if (!violations.isEmpty()) {
-      ConstraintViolationSummary first = 
us.violations.asList().iterator().next();
-      log.debug(String.format("Violations: %d, first %s occurs %d", 
violations.size(),
-          first.violationDescription, first.numberOfViolatingMutations));
-    }
-    if (!us.authFailures.isEmpty()) {
-      KeyExtent first = us.authFailures.keySet().iterator().next();
-      log.debug(String.format("Authentication Failures: %d, first %s", 
us.authFailures.size(),
-          first.toString()));
-    }
-    return new UpdateErrors(
-        us.failures.entrySet().stream()
-            .collect(Collectors.toMap(e -> e.getKey().toThrift(), 
Entry::getValue)),
-        
violations.stream().map(ConstraintViolationSummary::toThrift).collect(Collectors.toList()),
-        us.authFailures.entrySet().stream()
-            .collect(Collectors.toMap(e -> e.getKey().toThrift(), 
Entry::getValue)));
   }
 
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java 
b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
new file mode 100644
index 0000000000..7c5324c80b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.security.SecureRandom;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.constraints.Constraint;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.jupiter.api.Test;
+
+public class WriteAfterCloseIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofSeconds(300);
+  }
+
+  public static class SleepyConstraint implements Constraint {
+
+    private static final SecureRandom rand = new SecureRandom();
+
+    @Override
+    public String getViolationDescription(short violationCode) {
+      return "No such violation";
+    }
+
+    @Override
+    public List<Short> check(Environment env, Mutation mutation) {
+
+      if (mutation.getUpdates().stream().anyMatch(ColumnUpdate::isDeleted)) {
+        // only want to randomly sleep for inserts, not deletes
+        return null;
+      }
+
+      // the purpose of this constraint is to just randomly hold up inserts on 
the server side
+      if (rand.nextBoolean()) {
+        UtilWaitThread.sleep(4000);
+      }
+
+      return null;
+    }
+  }
+
+  @Test
+  public void testWriteAfterCloseMillisTime() throws Exception {
+    runTest(TimeType.MILLIS, false, 0, false);
+  }
+
+  @Test
+  public void testWriteAfterCloseLogicalTime() throws Exception {
+    runTest(TimeType.LOGICAL, false, 0, false);
+  }
+
+  @Test
+  public void testWriteAfterCloseKillTservers() throws Exception {
+    runTest(TimeType.MILLIS, true, 0, false);
+  }
+
+  @Test
+  public void testWriteAfterCloseTimeout() throws Exception {
+    // ensure that trying to close sessions does not interfere with timeout
+    runTest(TimeType.MILLIS, false, 2000, true);
+  }
+
+  private void runTest(TimeType timeType, boolean killTservers, long timeout, 
boolean expectErrors)
+      throws Exception {
+    // re #3721 test that tries to cause a write event to happen after a batch 
writer is closed
+    String table = getUniqueNames(1)[0];
+    var props = new Properties();
+    props.putAll(getClientProps());
+    props.setProperty(Property.GENERAL_RPC_TIMEOUT.getKey(), "1s");
+
+    NewTableConfiguration ntc = new 
NewTableConfiguration().setTimeType(timeType);
+    ntc.setProperties(
+        Map.of(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", 
SleepyConstraint.class.getName()));
+
+    // The short rpc timeout and the random sleep in the constraint can cause 
some of the writes
+    // done by a batch writer to timeout. The batch writer will internally 
retry the write, but the
+    // timed out write could still go through at a later time.
+
+    var executor = Executors.newCachedThreadPool();
+
+    try (AccumuloClient c = Accumulo.newClient().from(props).build()) {
+      c.tableOperations().create(table, ntc);
+
+      List<Future<?>> futures = new ArrayList<>();
+
+      for (int i = 0; i < 100; i++) {
+        futures.add(executor.submit(createWriteTask(i * 1000, c, table, 
timeout)));
+      }
+
+      if (killTservers) {
+        Thread.sleep(250);
+        
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+        // sleep longer than ZK timeout to let ephemeral lock nodes expire in 
ZK
+        Thread.sleep(11000);
+        
getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+      }
+
+      int errorCount = 0;
+
+      // wait for all futures to complete
+      for (var future : futures) {
+        try {
+          future.get();
+        } catch (ExecutionException e) {
+          errorCount++;
+        }
+      }
+
+      if (expectErrors) {
+        assertTrue(errorCount > 0);
+      } else {
+        assertEquals(0, errorCount);
+
+        try (Scanner scanner = c.createScanner(table)) {
+          // every insertion was deleted so table should be empty unless there 
were out of order
+          // writes
+          assertEquals(0, scanner.stream().count());
+        }
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+
+  private static Callable<Void> createWriteTask(int row, AccumuloClient c, 
String table,
+      long timeout) {
+    return () -> {
+
+      BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, 
TimeUnit.MILLISECONDS);
+
+      try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
+        Mutation m = new Mutation("r" + row);
+        m.put("f1", "q1", new Value("v1"));
+        writer.addMutation(m);
+      }
+
+      // Relying on the internal retries of the batch writer, trying to create 
a situation where
+      // some of the writes from above actually happen after the delete below 
which would negate the
+      // delete.
+
+      try (BatchWriter writer = c.createBatchWriter(table)) {
+        Mutation m = new Mutation("r" + row);
+        m.putDelete("f1", "q1");
+        writer.addMutation(m);
+      }
+      return null;
+    };
+  }
+}

Reply via email to