This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit b6954a7a39b3db423b518f383a401145471cb038 Merge: 1e866ff09d 751c549108 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Jun 11 17:25:13 2024 -0400 Merge branch '2.1' .../accumulo/core/client/ConditionalWriter.java | 2 +- .../core/clientImpl/ConditionalWriterImpl.java | 11 ++- .../accumulo/tserver/TabletClientHandler.java | 56 ++++++++----- .../apache/accumulo/test/WriteAfterCloseIT.java | 95 ++++++++++++++-------- .../compaction/ExternalCompactionProgressIT.java | 6 +- 5 files changed, 107 insertions(+), 63 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index 02aeef6bf6,3beedd49bf..3137a19f00 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@@ -86,11 -87,13 +86,15 @@@ import org.apache.thrift.TApplicationEx import org.apache.thrift.TException; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransportException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; +import com.google.common.net.HostAndPort; + class ConditionalWriterImpl implements ConditionalWriter { + private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class); + private static final int MAX_SLEEP = 30000; private Authorizations auths; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index af16beed4c,424b399140..af51429e09 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@@ -788,28 -923,29 +788,30 @@@ public class TabletClientHandler implem Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols) throws NoSuchScanIDException, TException { - ConditionalSession cs = (ConditionalSession) server.sessionManager.reserveSession(sessID); + ConditionalSession cs = null; + Long opid = null; - if (cs == null || cs.interruptFlag.get()) { - throw new NoSuchScanIDException(); - } - - if (!cs.tableId.equals(AccumuloTable.METADATA.tableId()) - && !cs.tableId.equals(AccumuloTable.ROOT.tableId())) { - try { - server.resourceManager.waitUntilCommitsAreEnabled(); - } catch (HoldTimeoutException hte) { - // Assumption is that the client has timed out and is gone. If that's not the case throw - // an exception that will cause it to retry. - log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session"); + try { + cs = (ConditionalSession) server.sessionManager.reserveSession(sessID); + if (cs == null || cs.interruptFlag.get()) { throw new NoSuchScanIDException(); } - } - TableId tid = cs.tableId; - long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null))); - if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) { ++ if (!cs.tableId.equals(AccumuloTable.METADATA.tableId()) ++ && !cs.tableId.equals(AccumuloTable.ROOT.tableId())) { + try { + server.resourceManager.waitUntilCommitsAreEnabled(); + } catch (HoldTimeoutException hte) { + // Assumption is that the client has timed out and is gone. If that's not the case throw + // an exception that will cause it to retry. + log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session"); + throw new NoSuchScanIDException(); + } + } + + TableId tid = cs.tableId; + opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null))); - try { // @formatter:off Map<KeyExtent, List<ServerConditionalMutation>> updates = mutations.entrySet().stream().collect(Collectors.toMap( entry -> KeyExtent.fromThrift(entry.getKey()), @@@ -835,11 -971,15 +837,16 @@@ } catch (IOException ioe) { throw new TException(ioe); } catch (Exception e) { - log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", tid, opid, e); - log.warn("Exception returned for conditionalUpdate {}", e); ++ log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", ++ cs == null ? null : cs.tableId, opid, e); throw e; } finally { - writeTracker.finishWrite(opid); - server.sessionManager.unreserveSession(sessID); + if (opid != null) { + writeTracker.finishWrite(opid); + } + if (cs != null) { + server.sessionManager.unreserveSession(sessID); + } } } diff --cc test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java index 34aa1de519,9193b5c24a..6892a35e34 --- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java +++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java @@@ -85,11 -96,7 +95,11 @@@ public class WriteAfterCloseIT extends // the purpose of this constraint is to just randomly hold up inserts on the server side if (rand.nextBoolean()) { - UtilWaitThread.sleep(SLEEP_TIME); + try { - Thread.sleep(4000); ++ Thread.sleep(SLEEP_TIME); + } catch (InterruptedException ex) { + throw new IllegalStateException("Interrupted during sleep", ex); + } } return null;