This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 751c549108 updates WriteAfterCloseIT to also use ConditionalWriter (#4662)
751c549108 is described below

commit 751c549108aff51cf55d93861cc254b743d54e5a
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Jun 11 17:08:15 2024 -0400

    updates WriteAfterCloseIT to also use ConditionalWriter (#4662)
    
    Updated WriteAfterCloseIT to use a conditional writer in addition to
    a batch writer.  Made a few improvements to the test.
    
    While reviewing how the conditional writer handles timed-out writes, I
    noticed the tablet server code had an incorrect try/finally block.
    There was code between session reservation and the beginning of
    the try block.  If this in-between code threw an exception, the
    session would never be unreserved.  Fixed this and added a few
    comments to the code.
    
    fixes #3742
    
    Co-authored-by: Dom G. <domgargu...@apache.org>
---
 .../accumulo/core/client/ConditionalWriter.java    |  2 +-
 .../core/clientImpl/ConditionalWriterImpl.java     | 11 ++-
 .../accumulo/tserver/TabletClientHandler.java      | 51 +++++++-----
 test/pom.xml                                       |  4 +
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 95 ++++++++++++++--------
 5 files changed, 105 insertions(+), 58 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java 
b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 9f5fab7fe1..79968fe567 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -107,7 +107,7 @@ public interface ConditionalWriter extends AutoCloseable {
      */
     VIOLATED,
     /**
-     * error occurred after mutation was sent to server, its unknown if the 
mutation was written.
+     * Error occurred after mutation was sent to server, its unknown if the 
mutation was written.
      * Although the status of the mutation is unknown, Accumulo guarantees the 
mutation will not be
      * written at a later point in time.
      */
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index e9802300a3..3beedd49bf 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -87,9 +87,13 @@ import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class ConditionalWriterImpl implements ConditionalWriter {
 
+  private static final Logger log = 
LoggerFactory.getLogger(ConditionalWriterImpl.class);
+
   private static final int MAX_SLEEP = 30000;
 
   private Authorizations auths;
@@ -683,19 +687,24 @@ class ConditionalWriterImpl implements ConditionalWriter {
         // invalidation prevents future attempts to contact the
         // tserver even its gone zombie and is still running w/o a lock
         locator.invalidateCache(context, location.toString());
+        log.trace("tablet server {} {} is dead, so no need to invalidate {}", 
location,
+            sessionId.lockId, sessionId.sessionID);
         return;
       }
 
       try {
         // if the mutation is currently processing, this method will block 
until its done or times
         // out
+        log.trace("Attempting to invalidate {} at {}", sessionId.sessionID, 
location);
         invalidateSession(sessionId.sessionID, location);
-
+        log.trace("Invalidated {} at {}", sessionId.sessionID, location);
         return;
       } catch (TApplicationException tae) {
         throw new AccumuloServerException(location.toString(), tae);
       } catch (TException e) {
         locator.invalidateCache(context, location.toString());
+        log.trace("Failed to invalidate {} at {} {}", sessionId.sessionID, 
location,
+            e.getMessage());
       }
 
       if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index acd0a2f4c3..424b399140 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -923,27 +923,29 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
       Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> 
symbols)
       throws NoSuchScanIDException, TException {
 
-    ConditionalSession cs = (ConditionalSession) 
server.sessionManager.reserveSession(sessID);
+    ConditionalSession cs = null;
+    Long opid = null;
 
-    if (cs == null || cs.interruptFlag.get()) {
-      throw new NoSuchScanIDException();
-    }
-
-    if (!cs.tableId.equals(MetadataTable.ID) && 
!cs.tableId.equals(RootTable.ID)) {
-      try {
-        server.resourceManager.waitUntilCommitsAreEnabled();
-      } catch (HoldTimeoutException hte) {
-        // Assumption is that the client has timed out and is gone. If that's 
not the case throw
-        // an exception that will cause it to retry.
-        log.debug("HoldTimeoutException during conditionalUpdate, reporting no 
such session");
+    try {
+      cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+      if (cs == null || cs.interruptFlag.get()) {
         throw new NoSuchScanIDException();
       }
-    }
 
-    TableId tid = cs.tableId;
-    long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, 
null, null)));
+      if (!cs.tableId.equals(MetadataTable.ID) && 
!cs.tableId.equals(RootTable.ID)) {
+        try {
+          server.resourceManager.waitUntilCommitsAreEnabled();
+        } catch (HoldTimeoutException hte) {
+          // Assumption is that the client has timed out and is gone. If 
that's not the case throw
+          // an exception that will cause it to retry.
+          log.debug("HoldTimeoutException during conditionalUpdate, reporting 
no such session");
+          throw new NoSuchScanIDException();
+        }
+      }
+
+      TableId tid = cs.tableId;
+      opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, 
null)));
 
-    try {
       // @formatter:off
       Map<KeyExtent, List<ServerConditionalMutation>> updates = 
mutations.entrySet().stream().collect(Collectors.toMap(
                       entry -> KeyExtent.fromThrift(entry.getKey()),
@@ -972,21 +974,30 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
       log.warn("Exception returned for conditionalUpdate {}", e);
       throw e;
     } finally {
-      writeTracker.finishWrite(opid);
-      server.sessionManager.unreserveSession(sessID);
+      if (opid != null) {
+        writeTracker.finishWrite(opid);
+      }
+      if (cs != null) {
+        server.sessionManager.unreserveSession(sessID);
+      }
     }
   }
 
   @Override
   public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
-    // this method should wait for any running conditional update to complete
-    // after this method returns a conditional update should not be able to 
start
+    // For the given session, this method should wait for any running 
conditional update to
+    // complete. After this method returns a conditional update should not be 
able to start against
+    // this session and nothing should be running.
 
     ConditionalSession cs = (ConditionalSession) 
server.sessionManager.getSession(sessID);
     if (cs != null) {
+      // Setting this may cause anything running to fail. Setting this will 
prevent anything from
+      // starting.
       cs.interruptFlag.set(true);
     }
 
+    // If a thread is currently running and working on the update, then this 
should block until it
+    // un-reserves the session.
     cs = (ConditionalSession) server.sessionManager.reserveSession(sessID, 
true);
     if (cs != null) {
       server.sessionManager.removeSession(sessID, true);
diff --git a/test/pom.xml b/test/pom.xml
index 07fb24f1ae..cd8181e2f8 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -206,6 +206,10 @@
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java 
b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
index 7c5324c80b..9193b5c24a 100644
--- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.security.SecureRandom;
@@ -37,11 +38,16 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.constraints.Constraint;
@@ -51,7 +57,8 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 public class WriteAfterCloseIT extends AccumuloClusterHarness {
 
@@ -59,6 +66,7 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness 
{
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
     cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s");
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+    cfg.setProperty(Property.TSERV_MINTHREADS, "256");
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
@@ -71,6 +79,8 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness 
{
 
     private static final SecureRandom rand = new SecureRandom();
 
+    private static final long SLEEP_TIME = 4000;
+
     @Override
     public String getViolationDescription(short violationCode) {
       return "No such violation";
@@ -86,36 +96,29 @@ public class WriteAfterCloseIT extends 
AccumuloClusterHarness {
 
       // the purpose of this constraint is to just randomly hold up inserts on 
the server side
       if (rand.nextBoolean()) {
-        UtilWaitThread.sleep(4000);
+        UtilWaitThread.sleep(SLEEP_TIME);
       }
 
       return null;
     }
   }
 
-  @Test
-  public void testWriteAfterCloseMillisTime() throws Exception {
-    runTest(TimeType.MILLIS, false, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseLogicalTime() throws Exception {
-    runTest(TimeType.LOGICAL, false, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseKillTservers() throws Exception {
-    runTest(TimeType.MILLIS, true, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseTimeout() throws Exception {
-    // ensure that trying to close seesions does not interfere with timeout
-    runTest(TimeType.MILLIS, false, 2000, true);
-  }
-
-  private void runTest(TimeType timeType, boolean killTservers, long timeout, 
boolean expectErrors)
-      throws Exception {
+  // @formatter:off
+  @ParameterizedTest
+  @CsvSource(
+      value = {"time,   kill,  timeout, conditional",
+              "MILLIS,  false, 0,       false",
+              "LOGICAL, false, 0,       false",
+              "MILLIS,  true,  0,       false",
+              "MILLIS,  false, 2000,    false",
+              "MILLIS,  false, 0,       true",
+              "LOGICAL, false, 0,       true",
+              "MILLIS,  true,  0,       true",
+              "MILLIS,  false, 2000,    true"},
+      useHeadersInDisplayName = true)
+  // @formatter:on
+  public void testWriteAfterClose(TimeType timeType, boolean killTservers, 
long timeout,
+      boolean useConditionalWriter) throws Exception {
     // re #3721 test that tries to cause a write event to happen after a batch 
writer is closed
     String table = getUniqueNames(1)[0];
     var props = new Properties();
@@ -138,7 +141,8 @@ public class WriteAfterCloseIT extends 
AccumuloClusterHarness {
       List<Future<?>> futures = new ArrayList<>();
 
       for (int i = 0; i < 100; i++) {
-        futures.add(executor.submit(createWriteTask(i * 1000, c, table, 
timeout)));
+        futures.add(
+            executor.submit(createWriteTask(i * 1000, c, table, timeout, 
useConditionalWriter)));
       }
 
       if (killTservers) {
@@ -156,15 +160,23 @@ public class WriteAfterCloseIT extends 
AccumuloClusterHarness {
         try {
           future.get();
         } catch (ExecutionException e) {
+          var cause = e.getCause();
+          while (cause != null && !(cause instanceof TimedOutException)) {
+            cause = cause.getCause();
+          }
+
+          assertNotNull(cause);
           errorCount++;
         }
       }
 
+      boolean expectErrors = timeout > 0;
       if (expectErrors) {
         assertTrue(errorCount > 0);
       } else {
         assertEquals(0, errorCount);
-
+        // allow potential out-of-order writes on a tserver to run
+        Thread.sleep(SleepyConstraint.SLEEP_TIME);
         try (Scanner scanner = c.createScanner(table)) {
           // every insertion was deleted so table should be empty unless there 
were out of order
           // writes
@@ -177,17 +189,28 @@ public class WriteAfterCloseIT extends 
AccumuloClusterHarness {
   }
 
   private static Callable<Void> createWriteTask(int row, AccumuloClient c, 
String table,
-      long timeout) {
+      long timeout, boolean useConditionalWriter) {
     return () -> {
-
-      BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, 
TimeUnit.MILLISECONDS);
-
-      try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
-        Mutation m = new Mutation("r" + row);
-        m.put("f1", "q1", new Value("v1"));
-        writer.addMutation(m);
+      if (useConditionalWriter) {
+        ConditionalWriterConfig cwc =
+            new ConditionalWriterConfig().setTimeout(timeout, 
TimeUnit.MILLISECONDS);
+        try (ConditionalWriter writer = c.createConditionalWriter(table, cwc)) 
{
+          ConditionalMutation m = new ConditionalMutation("r" + row);
+          m.addCondition(new Condition("f1", "q1"));
+          m.put("f1", "q1", new Value("v1"));
+          ConditionalWriter.Result result = writer.write(m);
+          var status = result.getStatus();
+          assertTrue(status == ConditionalWriter.Status.ACCEPTED
+              || status == ConditionalWriter.Status.UNKNOWN);
+        }
+      } else {
+        BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, 
TimeUnit.MILLISECONDS);
+        try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
+          Mutation m = new Mutation("r" + row);
+          m.put("f1", "q1", new Value("v1"));
+          writer.addMutation(m);
+        }
       }
-
       // Relying on the internal retries of the batch writer, trying to create 
a situation where
       // some of the writes from above actually happen after the delete below 
which would negate the
       // delete.

Reply via email to