This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit b6954a7a39b3db423b518f383a401145471cb038
Merge: 1e866ff09d 751c549108
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Jun 11 17:25:13 2024 -0400

    Merge branch '2.1'

 .../accumulo/core/client/ConditionalWriter.java    |  2 +-
 .../core/clientImpl/ConditionalWriterImpl.java     | 11 ++-
 .../accumulo/tserver/TabletClientHandler.java      | 56 ++++++++-----
 .../apache/accumulo/test/WriteAfterCloseIT.java    | 95 ++++++++++++++--------
 .../compaction/ExternalCompactionProgressIT.java   |  6 +-
 5 files changed, 107 insertions(+), 63 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index 02aeef6bf6,3beedd49bf..3137a19f00
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@@ -86,11 -87,13 +86,15 @@@ import org.apache.thrift.TApplicationEx
  import org.apache.thrift.TException;
  import org.apache.thrift.TServiceClient;
  import org.apache.thrift.transport.TTransportException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
 +import com.google.common.net.HostAndPort;
 +
  class ConditionalWriterImpl implements ConditionalWriter {
  
+   private static final Logger log = 
LoggerFactory.getLogger(ConditionalWriterImpl.class);
+ 
    private static final int MAX_SLEEP = 30000;
  
    private Authorizations auths;
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index af16beed4c,424b399140..af51429e09
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@@ -788,28 -923,29 +788,30 @@@ public class TabletClientHandler implem
        Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> 
symbols)
        throws NoSuchScanIDException, TException {
  
-     ConditionalSession cs = (ConditionalSession) 
server.sessionManager.reserveSession(sessID);
+     ConditionalSession cs = null;
+     Long opid = null;
  
-     if (cs == null || cs.interruptFlag.get()) {
-       throw new NoSuchScanIDException();
-     }
- 
-     if (!cs.tableId.equals(AccumuloTable.METADATA.tableId())
-         && !cs.tableId.equals(AccumuloTable.ROOT.tableId())) {
-       try {
-         server.resourceManager.waitUntilCommitsAreEnabled();
-       } catch (HoldTimeoutException hte) {
-         // Assumption is that the client has timed out and is gone. If that's 
not the case throw
-         // an exception that will cause it to retry.
-         log.debug("HoldTimeoutException during conditionalUpdate, reporting 
no such session");
+     try {
+       cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+       if (cs == null || cs.interruptFlag.get()) {
          throw new NoSuchScanIDException();
        }
-     }
  
-     TableId tid = cs.tableId;
-     long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, 
null, null)));
 -      if (!cs.tableId.equals(MetadataTable.ID) && 
!cs.tableId.equals(RootTable.ID)) {
++      if (!cs.tableId.equals(AccumuloTable.METADATA.tableId())
++          && !cs.tableId.equals(AccumuloTable.ROOT.tableId())) {
+         try {
+           server.resourceManager.waitUntilCommitsAreEnabled();
+         } catch (HoldTimeoutException hte) {
+           // Assumption is that the client has timed out and is gone. If 
that's not the case throw
+           // an exception that will cause it to retry.
+           log.debug("HoldTimeoutException during conditionalUpdate, reporting 
no such session");
+           throw new NoSuchScanIDException();
+         }
+       }
+ 
+       TableId tid = cs.tableId;
+       opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, 
null)));
  
-     try {
        // @formatter:off
        Map<KeyExtent, List<ServerConditionalMutation>> updates = 
mutations.entrySet().stream().collect(Collectors.toMap(
                        entry -> KeyExtent.fromThrift(entry.getKey()),
@@@ -835,11 -971,15 +837,16 @@@
      } catch (IOException ioe) {
        throw new TException(ioe);
      } catch (Exception e) {
-       log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: 
{}", tid, opid, e);
 -      log.warn("Exception returned for conditionalUpdate {}", e);
++      log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: 
{}",
++          cs == null ? null : cs.tableId, opid, e);
        throw e;
      } finally {
-       writeTracker.finishWrite(opid);
-       server.sessionManager.unreserveSession(sessID);
+       if (opid != null) {
+         writeTracker.finishWrite(opid);
+       }
+       if (cs != null) {
+         server.sessionManager.unreserveSession(sessID);
+       }
      }
    }
  
diff --cc test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
index 34aa1de519,9193b5c24a..6892a35e34
--- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@@ -85,11 -96,7 +95,11 @@@ public class WriteAfterCloseIT extends 
  
        // the purpose of this constraint is to just randomly hold up inserts 
on the server side
        if (rand.nextBoolean()) {
 -        UtilWaitThread.sleep(SLEEP_TIME);
 +        try {
-           Thread.sleep(4000);
++          Thread.sleep(SLEEP_TIME);
 +        } catch (InterruptedException ex) {
 +          throw new IllegalStateException("Interrupted during sleep", ex);
 +        }
        }
  
        return null;

Reply via email to