This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 751c549108 updates WriteAfterCloseIT to also use ConditionalWriter (#4662)
751c549108 is described below
commit 751c549108aff51cf55d93861cc254b743d54e5a
Author: Keith Turner <[email protected]>
AuthorDate: Tue Jun 11 17:08:15 2024 -0400
updates WriteAfterCloseIT to also use ConditionalWriter (#4662)
Updated WriteAfterCloseIT to use a conditional writer in addition to
a batch writer. Made a few improvements to the test.
While reviewing how the conditional writer handles timed-out writes,
we noticed that the tablet server code had an incorrect try/finally
block. There was code between the session reservation and the
beginning of the try block; if this intervening code threw an
exception, the session would never be unreserved. Fixed this and
added a few comments to the code.
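Illustrative only: the following minimal, self-contained Java sketch
shows the shape of the leak and of the fix. The names (reserve,
unreserve, validate) are placeholders for this note, not the real
SessionManager / TabletClientHandler API.

    public class ReservationSketch {

      static final java.util.Set<Long> reserved = new java.util.HashSet<>();

      static void reserve(long id) { reserved.add(id); }
      static void unreserve(long id) { reserved.remove(id); }
      // stand-in for the work that used to run between reservation and try
      static void validate(long id) { throw new IllegalStateException("boom"); }

      // Buggy shape: validate(id) runs after the reservation but before the
      // try block, so when it throws the finally never runs and the session
      // stays reserved.
      static void buggy(long id) {
        reserve(id);
        validate(id);
        try {
          // ... perform the conditional update ...
        } finally {
          unreserve(id);
        }
      }

      // Fixed shape: everything after the reservation is inside the try, and
      // the finally only unreserves when the reservation actually happened.
      static void fixed(long id) {
        boolean haveSession = false;
        try {
          reserve(id);
          haveSession = true;
          validate(id);
          // ... perform the conditional update ...
        } finally {
          if (haveSession) {
            unreserve(id);
          }
        }
      }

      public static void main(String[] args) {
        try { buggy(1L); } catch (IllegalStateException e) { /* expected */ }
        System.out.println("buggy leaked reservation: " + reserved.contains(1L)); // true
        try { fixed(2L); } catch (IllegalStateException e) { /* expected */ }
        System.out.println("fixed leaked reservation: " + reserved.contains(2L)); // false
      }
    }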
fixes #3742
Co-authored-by: Dom G. <[email protected]>
---
.../accumulo/core/client/ConditionalWriter.java | 2 +-
.../core/clientImpl/ConditionalWriterImpl.java | 11 ++-
.../accumulo/tserver/TabletClientHandler.java | 51 +++++++-----
test/pom.xml | 4 +
.../apache/accumulo/test/WriteAfterCloseIT.java | 95 ++++++++++++++--------
5 files changed, 105 insertions(+), 58 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 9f5fab7fe1..79968fe567 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -107,7 +107,7 @@ public interface ConditionalWriter extends AutoCloseable {
*/
VIOLATED,
/**
- * error occurred after mutation was sent to server, its unknown if the mutation was written.
+ * Error occurred after mutation was sent to server, its unknown if the mutation was written.
* Although the status of the mutation is unknown, Accumulo guarantees the mutation will not be
* written at a later point in time.
*/
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
index e9802300a3..3beedd49bf 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java
@@ -87,9 +87,13 @@ import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class ConditionalWriterImpl implements ConditionalWriter {
+ private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class);
+
private static final int MAX_SLEEP = 30000;
private Authorizations auths;
@@ -683,19 +687,24 @@ class ConditionalWriterImpl implements ConditionalWriter {
// invalidation prevents future attempts to contact the
// tserver even its gone zombie and is still running w/o a lock
locator.invalidateCache(context, location.toString());
+ log.trace("tablet server {} {} is dead, so no need to invalidate {}",
location,
+ sessionId.lockId, sessionId.sessionID);
return;
}
try {
// if the mutation is currently processing, this method will block until its done or times
// out
+ log.trace("Attempting to invalidate {} at {}", sessionId.sessionID,
location);
invalidateSession(sessionId.sessionID, location);
-
+ log.trace("Invalidated {} at {}", sessionId.sessionID, location);
return;
} catch (TApplicationException tae) {
throw new AccumuloServerException(location.toString(), tae);
} catch (TException e) {
locator.invalidateCache(context, location.toString());
+ log.trace("Failed to invalidate {} at {} {}", sessionId.sessionID,
location,
+ e.getMessage());
}
if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index acd0a2f4c3..424b399140 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -923,27 +923,29 @@ public class TabletClientHandler implements TabletClientService.Iface {
Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
throws NoSuchScanIDException, TException {
- ConditionalSession cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+ ConditionalSession cs = null;
+ Long opid = null;
- if (cs == null || cs.interruptFlag.get()) {
- throw new NoSuchScanIDException();
- }
-
- if (!cs.tableId.equals(MetadataTable.ID) &&
!cs.tableId.equals(RootTable.ID)) {
- try {
- server.resourceManager.waitUntilCommitsAreEnabled();
- } catch (HoldTimeoutException hte) {
- // Assumption is that the client has timed out and is gone. If that's not the case throw
- // an exception that will cause it to retry.
- log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+ try {
+ cs = (ConditionalSession) server.sessionManager.reserveSession(sessID);
+ if (cs == null || cs.interruptFlag.get()) {
throw new NoSuchScanIDException();
}
- }
- TableId tid = cs.tableId;
- long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
+ if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
+ try {
+ server.resourceManager.waitUntilCommitsAreEnabled();
+ } catch (HoldTimeoutException hte) {
+ // Assumption is that the client has timed out and is gone. If that's not the case throw
+ // an exception that will cause it to retry.
+ log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
+ throw new NoSuchScanIDException();
+ }
+ }
+
+ TableId tid = cs.tableId;
+ opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
- try {
// @formatter:off
Map<KeyExtent, List<ServerConditionalMutation>> updates =
mutations.entrySet().stream().collect(Collectors.toMap(
entry -> KeyExtent.fromThrift(entry.getKey()),
@@ -972,21 +974,30 @@ public class TabletClientHandler implements TabletClientService.Iface {
log.warn("Exception returned for conditionalUpdate {}", e);
throw e;
} finally {
- writeTracker.finishWrite(opid);
- server.sessionManager.unreserveSession(sessID);
+ if (opid != null) {
+ writeTracker.finishWrite(opid);
+ }
+ if (cs != null) {
+ server.sessionManager.unreserveSession(sessID);
+ }
}
}
@Override
public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
- // this method should wait for any running conditional update to complete
- // after this method returns a conditional update should not be able to start
+ // For the given session, this method should wait for any running conditional update to
+ // complete. After this method returns a conditional update should not be able to start against
+ // this session and nothing should be running.
ConditionalSession cs = (ConditionalSession) server.sessionManager.getSession(sessID);
if (cs != null) {
+ // Setting this may cause anything running to fail. Setting this will prevent anything from
+ // starting.
cs.interruptFlag.set(true);
}
+ // If a thread is currently running and working on the update, then this should block until it
+ // un-reserves the session.
cs = (ConditionalSession) server.sessionManager.reserveSession(sessID, true);
if (cs != null) {
server.sessionManager.removeSession(sessID, true);
diff --git a/test/pom.xml b/test/pom.xml
index 07fb24f1ae..cd8181e2f8 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -206,6 +206,10 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
index 7c5324c80b..9193b5c24a 100644
--- a/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WriteAfterCloseIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.security.SecureRandom;
@@ -37,11 +38,16 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.constraints.Constraint;
@@ -51,7 +57,8 @@ import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
public class WriteAfterCloseIT extends AccumuloClusterHarness {
@@ -59,6 +66,7 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "1s");
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s");
+ cfg.setProperty(Property.TSERV_MINTHREADS, "256");
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -71,6 +79,8 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
private static final SecureRandom rand = new SecureRandom();
+ private static final long SLEEP_TIME = 4000;
+
@Override
public String getViolationDescription(short violationCode) {
return "No such violation";
@@ -86,36 +96,29 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
// the purpose of this constraint is to just randomly hold up inserts on the server side
if (rand.nextBoolean()) {
- UtilWaitThread.sleep(4000);
+ UtilWaitThread.sleep(SLEEP_TIME);
}
return null;
}
}
- @Test
- public void testWriteAfterCloseMillisTime() throws Exception {
- runTest(TimeType.MILLIS, false, 0, false);
- }
-
- @Test
- public void testWriteAfterCloseLogicalTime() throws Exception {
- runTest(TimeType.LOGICAL, false, 0, false);
- }
-
- @Test
- public void testWriteAfterCloseKillTservers() throws Exception {
- runTest(TimeType.MILLIS, true, 0, false);
- }
-
- @Test
- public void testWriteAfterCloseTimeout() throws Exception {
- // ensure that trying to close seesions does not interfere with timeout
- runTest(TimeType.MILLIS, false, 2000, true);
- }
-
- private void runTest(TimeType timeType, boolean killTservers, long timeout, boolean expectErrors)
- throws Exception {
+ // @formatter:off
+ @ParameterizedTest
+ @CsvSource(
+ value = {"time, kill, timeout, conditional",
+ "MILLIS, false, 0, false",
+ "LOGICAL, false, 0, false",
+ "MILLIS, true, 0, false",
+ "MILLIS, false, 2000, false",
+ "MILLIS, false, 0, true",
+ "LOGICAL, false, 0, true",
+ "MILLIS, true, 0, true",
+ "MILLIS, false, 2000, true"},
+ useHeadersInDisplayName = true)
+ // @formatter:on
+ public void testWriteAfterClose(TimeType timeType, boolean killTservers, long timeout,
+ boolean useConditionalWriter) throws Exception {
// re #3721 test that tries to cause a write event to happen after a batch writer is closed
String table = getUniqueNames(1)[0];
var props = new Properties();
@@ -138,7 +141,8 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
- futures.add(executor.submit(createWriteTask(i * 1000, c, table, timeout)));
+ futures.add(
+ executor.submit(createWriteTask(i * 1000, c, table, timeout, useConditionalWriter)));
}
if (killTservers) {
@@ -156,15 +160,23 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
try {
future.get();
} catch (ExecutionException e) {
+ var cause = e.getCause();
+ while (cause != null && !(cause instanceof TimedOutException)) {
+ cause = cause.getCause();
+ }
+
+ assertNotNull(cause);
errorCount++;
}
}
+ boolean expectErrors = timeout > 0;
if (expectErrors) {
assertTrue(errorCount > 0);
} else {
assertEquals(0, errorCount);
-
+ // allow potential out-of-order writes on a tserver to run
+ Thread.sleep(SleepyConstraint.SLEEP_TIME);
try (Scanner scanner = c.createScanner(table)) {
// every insertion was deleted so table should be empty unless there were out of order
// writes
@@ -177,17 +189,28 @@ public class WriteAfterCloseIT extends AccumuloClusterHarness {
}
private static Callable<Void> createWriteTask(int row, AccumuloClient c,
String table,
- long timeout) {
+ long timeout, boolean useConditionalWriter) {
return () -> {
-
- BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
-
- try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
- Mutation m = new Mutation("r" + row);
- m.put("f1", "q1", new Value("v1"));
- writer.addMutation(m);
+ if (useConditionalWriter) {
+ ConditionalWriterConfig cwc =
+ new ConditionalWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
+ try (ConditionalWriter writer = c.createConditionalWriter(table, cwc)) {
+ ConditionalMutation m = new ConditionalMutation("r" + row);
+ m.addCondition(new Condition("f1", "q1"));
+ m.put("f1", "q1", new Value("v1"));
+ ConditionalWriter.Result result = writer.write(m);
+ var status = result.getStatus();
+ assertTrue(status == ConditionalWriter.Status.ACCEPTED
+ || status == ConditionalWriter.Status.UNKNOWN);
+ }
+ } else {
+ BatchWriterConfig bwc = new BatchWriterConfig().setTimeout(timeout, TimeUnit.MILLISECONDS);
+ try (BatchWriter writer = c.createBatchWriter(table, bwc)) {
+ Mutation m = new Mutation("r" + row);
+ m.put("f1", "q1", new Value("v1"));
+ writer.addMutation(m);
+ }
}
-
// Relying on the internal retries of the batch writer, trying to create a situation where
// some of the writes from above actually happen after the delete below which would negate the
// delete.