This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 751c549108 updates WriteAfterCloseIT to also use ConditionalWriter (#4662)
751c549108 is described below

commit 751c549108aff51cf55d93861cc254b743d54e5a
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Jun 11 17:08:15 2024 -0400

    updates WriteAfterCloseIT to also use ConditionalWriter (#4662)

    Updated WriteAfterCloseIT to use a conditional writer in addition to a
    batch writer. Made a few improvements to the test.

    While reviewing how the conditional writer handles timed-out writes, we
    noticed the tablet server code had an incorrect try/finally block. There
    was code between the session reservation and the beginning of the try
    block. If this in-between code threw an exception, the session would
    never be unreserved. Fixed this and added a few comments to the code.

    fixes #3742

    Co-authored-by: Dom G. <domgargu...@apache.org>
---
 .../accumulo/core/client/ConditionalWriter.java   |  2 +-
 .../core/clientImpl/ConditionalWriterImpl.java    | 11 ++-
 .../accumulo/tserver/TabletClientHandler.java     | 51 +++++++-----
 test/pom.xml                                      |  4 +
 .../apache/accumulo/test/WriteAfterCloseIT.java   | 95 ++++++++++++++--------
 5 files changed, 105 insertions(+), 58 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 9f5fab7fe1..79968fe567 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -107,7 +107,7 @@ public interface ConditionalWriter extends AutoCloseable {
      */
     VIOLATED,
     /**
-     * error occurred after mutation was sent to server, its unknown if the mutation was written.
+     * Error occurred after mutation was sent to server, its unknown if the mutation was written.
      * Although the status of the mutation is unknown, Accumulo guarantees the mutation will not be
      * written at a later point in time.
      */
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index e9802300a3..3beedd49bf 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -87,9 +87,13 @@ import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class ConditionalWriterImpl implements ConditionalWriter {
 
+  private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class);
+
   private static final int MAX_SLEEP = 30000;
 
   private Authorizations auths;
@@ -683,19 +687,24 @@ class ConditionalWriterImpl implements ConditionalWriter {
       // invalidation prevents future attempts to contact the
       // tserver even its gone zombie and is still running w/o a lock
       locator.invalidateCache(context, location.toString());
+      log.trace("tablet server {} {} is dead, so no need to invalidate {}", location,
+          sessionId.lockId, sessionId.sessionID);
       return;
     }
 
     try {
       // if the mutation is currently processing, this method will block until its done or times
       // out
+      log.trace("Attempting to invalidate {} at {}", sessionId.sessionID, location);
       invalidateSession(sessionId.sessionID, location);
-
+      log.trace("Invalidated {} at {}", sessionId.sessionID, location);
       return;
     } catch (TApplicationException tae) {
       throw new AccumuloServerException(location.toString(), tae);
     } catch (TException e) {
       locator.invalidateCache(context, location.toString());
+      log.trace("Failed to invalidate {} at {} {}", sessionId.sessionID, location,
+          e.getMessage());
     }
 
     if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index acd0a2f4c3..424b399140 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -923,27 +923,29 @@ public class TabletClientHandler implements TabletClientService.Iface {
       Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
       throws NoSuchScanIDException, TException {
 
-    ConditionalSession cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+    ConditionalSession cs = null;
+    Long opid = null;
 
-    if (cs == null || cs.interruptFlag.get()) {
-      throw new NoSuchScanIDException();
-    }
-
-    if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
-      try {
-        server.resourceManager.waitUntilCommitsAreEnabled();
-      } catch (HoldTimeoutException hte) {
-        // Assumption is that the client has timed out and is gone. If that's not the case throw
-        // an exception that will cause it to retry.
-        log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+    try {
+      cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+      if (cs == null || cs.interruptFlag.get()) {
         throw new NoSuchScanIDException();
       }
-    }
-
-    TableId tid = cs.tableId;
-    long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+      if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
+        try {
+          server.resourceManager.waitUntilCommitsAreEnabled();
+        } catch (HoldTimeoutException hte) {
+          // Assumption is that the client has timed out and is gone. If that's not the case throw
+          // an exception that will cause it to retry.
+          log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+          throw new NoSuchScanIDException();
+        }
+      }
+
+      TableId tid = cs.tableId;
+      opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
 
-    try {
       // @formatter:off
       Map<KeyExtent, List<ServerConditionalMutation>> updates = mutations.entrySet().stream().collect(Collectors.toMap(
                       entry -> KeyExtent.fromThrift(entry.getKey()),
@@ -972,21 +974,30 @@ public class TabletClientHandler implements TabletClientService.Iface {
       log.warn("Exception returned for conditionalUpdate {}", e);
       throw e;
     } finally {
-      writeTracker.finishWrite(opid);
-      server.sessionManager.unreserveSession(sessID);
+      if (opid != null) {
+        writeTracker.finishWrite(opid);
+      }
+      if (cs != null) {
+        server.sessionManager.unreserveSession(sessID);
+      }
     }
   }
 
   @Override
   public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
-    // this method should wait for any running conditional update to complete
-    // after this method returns a conditional update should not be able to start
+    // For the given session, this method should wait for any running conditional update to
+    // complete. After this method returns a conditional update should not be able to start against
+    // this session and nothing should be running.
     ConditionalSession cs = (ConditionalSession) server.sessionManager.getSession(sessID);
 
     if (cs != null) {
+      // Setting this may cause anything running to fail. Setting this will prevent anything from
+      // starting.
       cs.interruptFlag.set(true);
     }
 
+    // If a thread is currently running and working on the update, then this should block until it
+    // un-reserves the session.
     cs = (ConditionalSession) server.sessionManager.reserveSession(sessID, true);
     if (cs != null) {
       server.sessionManager.removeSession(sessID, true);
diff --git a/test/pom.xml b/test/pom.xml
index 07fb24f1ae..cd8181e2f8 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -206,6 +206,10 @@
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
index 7c5324c80b..9193b5c24a 100644
--- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.security.SecureRandom;
@@ -37,11 +38,16 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.constraints.Constraint;
@@ -51,7 +57,8 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 public class WriteAfterCloseIT extends AccumuloClusterHarness {
 
@@ -59,6 +66,7 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s");
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+    cfg.setProperty(Property.TSERV_MINTHREADS, "256");
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
@@ -71,6 +79,8 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
 
   private static final SecureRandom rand = new SecureRandom();
 
+  private static final long SLEEP_TIME = 4000;
+
   @Override
   public String getViolationDescription(short violationCode) {
     return "No such violation";
@@ -86,36 +96,29 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
 
       // the purpose of this constraint is to just randomly hold up inserts on the server side
       if (rand.nextBoolean()) {
-        UtilWaitThread.sleep(4000);
+        UtilWaitThread.sleep(SLEEP_TIME);
       }
 
       return null;
     }
   }
 
-  @Test
-  public void testWriteAfterCloseMillisTime() throws Exception {
-    runTest(TimeType.MILLIS, false, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseLogicalTime() throws Exception {
-    runTest(TimeType.LOGICAL, false, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseKillTservers() throws Exception {
-    runTest(TimeType.MILLIS, true, 0, false);
-  }
-
-  @Test
-  public void testWriteAfterCloseTimeout() throws Exception {
-    // ensure that trying to close seesions does not interfere with timeout
-    runTest(TimeType.MILLIS, false, 2000, true);
-  }
-
-  private void runTest(TimeType timeType, boolean killTservers, long timeout, boolean expectErrors)
-      throws Exception {
+  // @formatter:off
+  @ParameterizedTest
+  @CsvSource(
+      value = {"time, kill, timeout, conditional",
+          "MILLIS, false, 0, false",
+          "LOGICAL, false, 0, false",
+          "MILLIS, true, 0, false",
+          "MILLIS, false, 2000, false",
+          "MILLIS, false, 0, true",
+          "LOGICAL, false, 0, true",
+          "MILLIS, true, 0, true",
+          "MILLIS, false, 2000, true"},
+      useHeadersInDisplayName = true)
+  // @formatter:on
+  public void testWriteAfterClose(TimeType timeType, boolean killTservers, long timeout,
+      boolean useConditionalWriter) throws Exception {
     // re #3721 test that tries to cause a write event to happen after a batch writer is closed
     String table = getUniqueNames(1)[0];
     var props = new Properties();
@@ -138,7 +141,8 @@
 
     List<Future<?>> futures = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
-      futures.add(executor.submit(createWriteTask(i * 1000, c, table, timeout)));
+      futures.add(
+          executor.submit(createWriteTask(i * 1000, c, table, timeout, useConditionalWriter)));
     }
 
     if (killTservers) {
@@ -156,15 +160,23 @@
       try {
         future.get();
       } catch (ExecutionException e) {
+        var cause = e.getCause();
+        while (cause != null && !(cause instanceof TimedOutException)) {
+          cause = cause.getCause();
+        }
+
+        assertNotNull(cause);
         errorCount++;
       }
     }
 
+    boolean expectErrors = timeout > 0;
     if (expectErrors) {
       assertTrue(errorCount > 0);
     } else {
       assertEquals(0, errorCount);
-
+      // allow potential out-of-order writes on a tserver to run
+      Thread.sleep(SleepyConstraint.SLEEP_TIME);
       try (Scanner scanner = c.createScanner(table)) {
         // every insertion was deleted so table should be empty unless there were out of order
         // writes
@@ -177,17 +189,28 @@
   private static Callable<Void> createWriteTask(int row, AccumuloClient c, String table,
-      long timeout) {
+      long timeout, boolean useConditionalWriter) {
     return () -> {
-
-      BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
-
-      try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
-        Mutation m = new Mutation("r" + row);
-        m.put("f1", "q1", new Value("v1"));
-        writer.addMutation(m);
+      if (useConditionalWriter) {
+        ConditionalWriterConfig cwc =
+            new ConditionalWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
+        try (ConditionalWriter writer = c.createConditionalWriter(table, cwc)) {
+          ConditionalMutation m = new ConditionalMutation("r" + row);
+          m.addCondition(new Condition("f1", "q1"));
+          m.put("f1", "q1", new Value("v1"));
+          ConditionalWriter.Result result = writer.write(m);
+          var status = result.getStatus();
+          assertTrue(status == ConditionalWriter.Status.ACCEPTED
+              || status == ConditionalWriter.Status.UNKNOWN);
+        }
+      } else {
+        BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
+        try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
+          Mutation m = new Mutation("r" + row);
+          m.put("f1", "q1", new Value("v1"));
+          writer.addMutation(m);
+        }
       }
-
       // Relying on the internal retries of the batch writer, trying to create a situation where
       // some of the writes from above actually happen after the delete below which would negate the
       // delete.
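
The TabletClientHandler change above is the reserve-inside-try fix described in the commit message: nothing that can throw may sit between reserving the session and entering the try whose finally releases it, and the finally must only release what was actually acquired. The sketch below illustrates that shape in isolation; SessionManager, reserveSession, unreserveSession, and mightThrow are simplified hypothetical stand-ins for this example, not the actual Accumulo classes or signatures.

// Standalone sketch of the try/finally pattern applied in conditionalUpdate().
// All names here are illustrative stand-ins, not Accumulo's real API.
class SessionManager {
  Object reserveSession(long sessionId) {
    return new Object(); // pretend a session was reserved
  }

  void unreserveSession(long sessionId) {
    System.out.println("unreserved session " + sessionId);
  }
}

public class ReserveInsideTryExample {

  // Broken shape (what the old code did): work between the reservation and the
  // try block can throw, and the reservation is then never released.
  static void brokenShape(SessionManager mgr, long sessionId) {
    Object session = mgr.reserveSession(sessionId);
    mightThrow(); // an exception here leaks the reservation
    try {
      // ... perform the conditional update ...
    } finally {
      mgr.unreserveSession(sessionId);
    }
  }

  // Fixed shape (what the commit switches to): reserve inside the try and
  // null-guard the cleanup so finally releases only what was acquired.
  static void fixedShape(SessionManager mgr, long sessionId) {
    Object session = null;
    try {
      session = mgr.reserveSession(sessionId);
      mightThrow(); // a failure here still reaches the finally block
      // ... perform the conditional update ...
    } finally {
      if (session != null) {
        mgr.unreserveSession(sessionId);
      }
    }
  }

  // Placeholder for code that may fail, e.g. waiting for commits to be enabled.
  static void mightThrow() {}

  public static void main(String[] args) {
    fixedShape(new SessionManager(), 42L);
  }
}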