This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a6ecfe0f3 Fixes bug with FateInterleavingIT.testInterleaving (#5214)
4a6ecfe0f3 is described below

commit 4a6ecfe0f34463039211311a2fa9872c65605ab9
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Mon Jan 27 10:06:35 2025 -0500

    Fixes bug with FateInterleavingIT.testInterleaving (#5214)
    
    - Rewrote the test. It previously expected an interleave to occur at every
      opportunity, which may not always happen in 4.0. Now it only expects at
      least one interleave to occur. This still accomplishes the same goal of
      the test: verify that a fate thread interleaves work on multiple fate ids
      and ensure the order of operations for any given fate id is as expected
      even with interleaving.
    - Renamed FateInterleavingIT/UserFateInterleavingIT/MetaFateInterleavingIT
      to FateExecutionOrderIT/UserFateExecutionOrderIT/MetaFateExecutionOrderIT
      to better indicate what is being tested.
---
 ...terleavingIT.java => FateExecutionOrderIT.java} | 157 ++++++++++++---------
 ...eavingIT.java => MetaFateExecutionOrderIT.java} |   6 +-
 ...eavingIT.java => UserFateExecutionOrderIT.java} |   6 +-
 3 files changed, 96 insertions(+), 73 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
similarity index 66%
rename from test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java
rename to test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
index b4bf414682..9c1234efd4 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderIT.java
@@ -26,7 +26,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.AbstractMap;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
@@ -54,7 +57,6 @@ import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -63,13 +65,13 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Iterators;
 
-public abstract class FateInterleavingIT extends SharedMiniClusterBase
-    implements FateTestRunner<FateInterleavingIT.FilTestEnv> {
+public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
+    implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {
 
-  public static class FilTestEnv extends TestEnv {
+  public static class FeoTestEnv extends TestEnv {
     private final AccumuloClient client;
 
-    public FilTestEnv(AccumuloClient client) {
+    public FeoTestEnv(AccumuloClient client) {
       this.client = client;
     }
 
@@ -78,11 +80,11 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     }
   }
 
-  public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> {
+  public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> 
{
 
     private static final long serialVersionUID = 1L;
 
-    protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String 
step) throws Exception {
+    protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String 
step) throws Exception {
       try (Scanner scanner = 
env.getClient().createScanner(FATE_TRACKING_TABLE)) {
         return scanner.stream()
             .anyMatch(e -> 
e.getKey().getColumnFamily().toString().equals(tid.canonical())
@@ -90,7 +92,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
       }
     }
 
-    protected static void insertTrackingData(FateId tid, FilTestEnv env, 
String step)
+    protected static void insertTrackingData(FateId tid, FeoTestEnv env, 
String step)
         throws TableNotFoundException, MutationsRejectedException {
       try (BatchWriter bw = 
env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
         Mutation mut = new Mutation(Long.toString(System.currentTimeMillis()));
@@ -100,13 +102,18 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     }
 
     @Override
-    public long isReady(FateId tid, FilTestEnv env) throws Exception {
+    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
+      // First call to isReady will return that it's not ready (defer time of 
100ms), inserting
+      // the data 'isReady1' so we know isReady was called once. The second 
attempt (after the
+      // deferral time) will pass as ready (return 0) and insert the data 
'isReady2' so we know
+      // the second call to isReady was made
       Thread.sleep(50);
       var step = this.getName() + "::isReady";
-      if (isTrackingDataSet(tid, env, step)) {
+      if (isTrackingDataSet(tid, env, step + "1")) {
+        insertTrackingData(tid, env, step + "2");
         return 0;
       } else {
-        insertTrackingData(tid, env, step);
+        insertTrackingData(tid, env, step + "1");
         return 100;
       }
     }
@@ -117,14 +124,14 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     }
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception {
       Thread.sleep(50);
       insertTrackingData(tid, env, this.getName() + "::call");
       return new SecondOp();
     }
 
     @Override
-    public void undo(FateId fateId, FilTestEnv environment) throws Exception {
+    public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
       throw new UnsupportedOperationException();
     }
 
@@ -138,7 +145,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws 
Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
       super.call(tid, environment);
       return new LastOp();
     }
@@ -149,7 +156,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws 
Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
       super.call(tid, environment);
       return null;
     }
@@ -179,22 +186,22 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     }
   }
 
-  private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws 
Exception {
+  private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws 
Exception {
     while (store.read(txid).getStatus() != SUCCESSFUL) {
       Thread.sleep(50);
     }
   }
 
-  protected Fate<FilTestEnv> initializeFate(AccumuloClient client, 
FateStore<FilTestEnv> store) {
+  protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, 
FateStore<FeoTestEnv> store) {
     ConfigurationCopy config = new ConfigurationCopy();
     config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
     config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    return new Fate<>(new FilTestEnv(client), store, false, r -> r + "", 
config);
+    return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", 
config);
   }
 
-  private static Entry<String,String> toIdStep(Entry<Key,Value> e) {
-    return new 
AbstractMap.SimpleImmutableEntry<>(e.getKey().getColumnFamily().toString(),
-        e.getValue().toString());
+  private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
+    return new AbstractMap.SimpleImmutableEntry<>(
+        FateId.from(e.getKey().getColumnFamily().toString()), 
e.getValue().toString());
   }
 
   @Test
@@ -202,15 +209,19 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     executeTest(this::testInterleaving);
   }
 
-  protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext 
sctx)
+  protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext 
sctx)
       throws Exception {
 
-    // This test verifies that fates will interleave in time when their 
isReady() returns >0 and
-    // then 0.
+    // This test verifies that FATE will interleave at least once between fate 
operations when
+    // their isReady() returns > 0. Interleaving is not guaranteed, so we just 
check for one
+    // occurrence which is highly unlikely to fail unless something is broken 
with FATE.
+    // This test also ensures that the expected order of operations occurs per 
fate op.
+    // Interleaving should have no effect on this.
 
-    FateId[] fateIds = new FateId[3];
+    final int numFateIds = 3;
+    FateId[] fateIds = new FateId[numFateIds];
 
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < numFateIds; i++) {
       fateIds[i] = store.create();
       var txStore = store.reserve(fateIds[i]);
       try {
@@ -222,7 +233,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
       }
     }
 
-    Fate<FilTestEnv> fate = null;
+    Fate<FeoTestEnv> fate = null;
 
     // The execution order of the transactions is not according to their 
insertion
     // order. However, we do know that the first step of each transaction will 
be
@@ -235,38 +246,48 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
         waitFor(store, fateId);
       }
 
-      var expectedIds =
-          Set.of(fateIds[0].canonical(), fateIds[1].canonical(), 
fateIds[2].canonical());
-
       Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
-      Iterator<Entry<String,String>> iter = 
scanner.stream().map(FateInterleavingIT::toIdStep)
-          .filter(e -> e.getValue().contains("::call")).iterator();
-
-      SortedMap<String,String> subset = new TreeMap<>();
-
-      Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), 
e.getValue()));
-
-      // Should see the call() for the first steps of all three fates come 
first in time
-      assertTrue(subset.values().stream().allMatch(v -> 
v.startsWith("FirstOp")));
-      assertEquals(expectedIds, subset.keySet());
-
-      subset.clear();
-
-      Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), 
e.getValue()));
-
-      // Should see the call() for the second steps of all three fates come 
second in time
-      assertTrue(subset.values().stream().allMatch(v -> 
v.startsWith("SecondOp")));
-      assertEquals(expectedIds, subset.keySet());
-
-      subset.clear();
-
-      Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), 
e.getValue()));
-
-      // Should see the call() for the last steps of all three fates come last 
in time
-      assertTrue(subset.values().stream().allMatch(v -> 
v.startsWith("LastOp")));
-      assertEquals(expectedIds, subset.keySet());
+      var iter = 
scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();
+
+      // we should see the following execution order for all fate ids:
+      // FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
+      // SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
+      // LastOp::isReady1, LastOp::isReady2, LastOp::call
+      // the first isReady of each op will defer the op to be executed later, 
allowing for the FATE
+      // thread to interleave and work on another fate id, but may not always 
interleave.
+      // It is unlikely that the FATE will not interleave at least once in a 
run, so we will check
+      // for at least one occurrence.
+      int interleaves = 0;
+      int i = 0;
+      Map.Entry<FateId,String> prevOp = null;
+      var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", 
"FirstOp::call",
+          "SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", 
"LastOp::isReady1",
+          "LastOp::isReady2", "LastOp::call");
+      var fateIdsToExpRunOrder = Map.of(fateIds[0], new 
ArrayList<>(expRunOrder), fateIds[1],
+          new ArrayList<>(expRunOrder), fateIds[2], new 
ArrayList<>(expRunOrder));
+
+      while (iter.hasNext()) {
+        var currOp = iter.next();
+        FateId fateId = currOp.getKey();
+        String currStep = currOp.getValue();
+        var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);
+
+        boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
+        boolean prevFateIdDiffered = prevOp != null && 
!prevOp.getKey().equals(fateId);
+        if (passedFirstStep && prevFateIdDiffered) {
+          interleaves++;
+        }
+        assertEquals(currStep, expRunOrderFateId.remove(0));
+        prevOp = currOp;
+        i++;
+      }
 
-      assertFalse(iter.hasNext());
+      assertTrue(interleaves > 0);
+      assertEquals(i, expRunOrder.size() * numFateIds);
+      assertEquals(numFateIds, fateIdsToExpRunOrder.size());
+      for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) {
+        assertTrue(expRunOrderFateId.isEmpty());
+      }
 
     } finally {
       if (fate != null) {
@@ -280,14 +301,14 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     private static final long serialVersionUID = 1L;
 
     @Override
-    public long isReady(FateId tid, FilTestEnv env) throws Exception {
+    public long isReady(FateId tid, FeoTestEnv env) throws Exception {
       Thread.sleep(50);
       insertTrackingData(tid, env, this.getName() + "::isReady");
       return 0;
     }
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv manager) throws 
Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws 
Exception {
       Thread.sleep(50);
       insertTrackingData(tid, manager, this.getName() + "::call");
       return new SecondNonInterleavingOp();
@@ -299,7 +320,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws 
Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
       super.call(tid, environment);
       return new LastNonInterleavingOp();
     }
@@ -311,7 +332,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws 
Exception {
+    public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws 
Exception {
       super.call(tid, environment);
       return null;
     }
@@ -323,15 +344,16 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     executeTest(this::testNonInterleaving);
   }
 
-  protected void testNonInterleaving(FateStore<FilTestEnv> store, 
ServerContext sctx)
+  protected void testNonInterleaving(FateStore<FeoTestEnv> store, 
ServerContext sctx)
       throws Exception {
 
     // This test ensures that when isReady() always returns zero that all the 
fate steps will
     // execute immediately
 
-    FateId[] fateIds = new FateId[3];
+    final int numFateIds = 3;
+    FateId[] fateIds = new FateId[numFateIds];
 
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < numFateIds; i++) {
       fateIds[i] = store.create();
       var txStore = store.reserve(fateIds[i]);
       try {
@@ -343,7 +365,7 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
       }
     }
 
-    Fate<FilTestEnv> fate = null;
+    Fate<FeoTestEnv> fate = null;
 
     // The execution order of the transactions is not according to their 
insertion
     // order. In this case, without interleaving, a transaction will run start 
to finish
@@ -386,10 +408,11 @@ public abstract class FateInterleavingIT extends 
SharedMiniClusterBase
     Text fateId = subset.keySet().iterator().next().getColumnFamily();
     assertTrue(subset.keySet().stream().allMatch(k -> 
k.getColumnFamily().equals(fateId)));
 
-    var expectedVals = Set.of("FirstNonInterleavingOp::isReady", 
"FirstNonInterleavingOp::call",
+    // list is used to ensure correct operations and correct order of 
operations
+    var expectedVals = List.of("FirstNonInterleavingOp::isReady", 
"FirstNonInterleavingOp::call",
         "SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
         "LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
-    var actualVals = 
subset.values().stream().map(Value::toString).collect(Collectors.toSet());
+    var actualVals = 
subset.values().stream().map(Value::toString).collect(Collectors.toList());
     assertEquals(expectedVals, actualVals);
 
     return FateId.from(fateId.toString());
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
similarity index 90%
rename from test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
rename to test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
index e24fc0af5b..99d14656ec 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT.java
@@ -28,16 +28,16 @@ import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.test.fate.FateInterleavingIT;
+import org.apache.accumulo.test.fate.FateExecutionOrderIT;
 
-public class MetaFateInterleavingIT extends FateInterleavingIT {
+public class MetaFateExecutionOrderIT extends FateExecutionOrderIT {
 
   // put the fate data for the test in a different location than what accumulo 
is using
   private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
   private static final String ZK_ROOT = ZooUtil.getRoot(IID);
 
   @Override
-  public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int 
maxDeferred,
+  public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int 
maxDeferred,
       AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
     ServerContext sctx = getCluster().getServerContext();
     String path = ZK_ROOT + Constants.ZFATE;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
similarity index 90%
rename from test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
rename to test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
index 3a0aaeecd5..ec16596df6 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT.java
@@ -25,11 +25,11 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
-import org.apache.accumulo.test.fate.FateInterleavingIT;
+import org.apache.accumulo.test.fate.FateExecutionOrderIT;
 
-public class UserFateInterleavingIT extends FateInterleavingIT {
+public class UserFateExecutionOrderIT extends FateExecutionOrderIT {
   @Override
-  public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int 
maxDeferred,
+  public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int 
maxDeferred,
       AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
     var table = getUniqueNames(1)[0];
     try (ClientContext client =

Reply via email to