This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f046289c1b tests merge idempotence and fixes found bug (#4643)
f046289c1b is described below

commit f046289c1bf9d363ce51a8004510cd452e67167b
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jun 6 16:23:01 2024 -0400

    tests merge idempotence and fixes found bug (#4643)
    
    Adds a way to make Fate in the manager always run steps multiple times to test
    idempotence. Created versions of the merge and delete rows ITs that use this test
    manager. Found and fixed a bug in the merge code when running these new tests.
    Changed mini accumulo to allow specifying a different manager class to use prior
    to startup.  There was an existing method that would allow starting a manager
    with a different class, but using it would have meant letting the actual
    manager start and then killing it and starting another, which would have added
    time to the tests.  The change to mini accumulo was not made to its public API,
    only its implementation.
    
    fixes #4642
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 28 ++++++---
 .../MiniAccumuloClusterControl.java                | 33 +----------
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    | 41 ++++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java  |  4 +-
 .../manager/tableOps/merge/MergeTablets.java       | 18 ++++--
 .../org/apache/accumulo/test/fate/FlakyFate.java   | 66 ++++++++++++++++++++++
 .../accumulo/test/fate/FlakyFateManager.java       | 48 ++++++++++++++++
 .../test/functional/DeleteRowsFlakyFateIT.java     | 35 ++++++++++++
 .../accumulo/test/functional/MergeFlakyFateIT.java | 36 ++++++++++++
 9 files changed, 261 insertions(+), 48 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 6a08275003..329e432b9b 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -208,10 +208,7 @@ public class Fate<T> {
     private void execute(final FateTxStore<T> txStore, final ExecutionState state)
         throws Exception {
       while (state.op != null && state.deferTime == 0) {
-        var startTime = NanoTime.now();
-        state.deferTime = state.op.isReady(txStore.getID(), environment);
-        log.debug("Running {}.isReady() {} took {} ms and returned {}", state.op.getName(),
-            txStore.getID(), startTime.elapsed().toMillis(), state.deferTime);
+        state.deferTime = executeIsReady(txStore.getID(), state.op);
 
         if (state.deferTime == 0) {
           if (state.status == SUBMITTED) {
@@ -220,11 +217,7 @@ public class Fate<T> {
           }
 
           state.prevOp = state.op;
-          startTime = NanoTime.now();
-          state.op = state.op.call(txStore.getID(), environment);
-          log.debug("Running {}.call() {} took {} ms and returned {}", state.prevOp.getName(),
-              txStore.getID(), startTime.elapsed().toMillis(),
-              state.op == null ? "null" : state.op.getName());
+          state.op = executeCall(txStore.getID(), state.op);
 
           if (state.op != null) {
             // persist the completion of this step before starting to run the next so in the case of
@@ -313,6 +306,23 @@ public class Fate<T> {
 
   }
 
+  protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
+    var startTime = NanoTime.now();
+    var deferTime = op.isReady(fateId, environment);
+    log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId,
+        startTime.elapsed().toMillis(), deferTime);
+    return deferTime;
+  }
+
+  protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception {
+    var startTime = NanoTime.now();
+    var next = op.call(fateId, environment);
+    log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId,
+        startTime.elapsed().toMillis(), next == null ? "null" : next.getName());
+
+    return next;
+  }
+
   /**
    * Creates a Fault-tolerant executor.
    *
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 1e10be5faa..7c15d4c5ba 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -33,17 +33,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.accumulo.cluster.ClusterControl;
-import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.gc.SimpleGarbageCollector;
-import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
-import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.util.Admin;
-import org.apache.accumulo.tserver.ScanServer;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,31 +138,7 @@ public class MiniAccumuloClusterControl implements ClusterControl {
     if (classOverride != null) {
       classToUse = classOverride;
     } else {
-      switch (server) {
-        case COMPACTOR:
-          classToUse = Compactor.class;
-          break;
-        case SCAN_SERVER:
-          classToUse = ScanServer.class;
-          break;
-        case TABLET_SERVER:
-          classToUse = TabletServer.class;
-          break;
-        case GARBAGE_COLLECTOR:
-          classToUse = SimpleGarbageCollector.class;
-          break;
-        case MANAGER:
-          classToUse = Manager.class;
-          break;
-        case MONITOR:
-          classToUse = Monitor.class;
-          break;
-        case ZOOKEEPER:
-          classToUse = ZooKeeperServerMain.class;
-          break;
-        default:
-          throw new IllegalArgumentException("Unhandled server type: " + server);
-      }
+      classToUse = cluster.getConfig().getServerClass(server);
     }
 
     switch (server) {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 48c51b61c7..36f242f59a 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -18,26 +18,43 @@
  */
 package org.apache.accumulo.miniclusterImpl;
 
+import static org.apache.accumulo.minicluster.ServerType.COMPACTOR;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.MANAGER;
+import static org.apache.accumulo.minicluster.ServerType.MONITOR;
+import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.ZOOKEEPER;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.function.Consumer;
 
+import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.HadoopCredentialProvider;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +76,11 @@ public class MiniAccumuloConfigImpl {
   private Map<String,String> configuredSiteConig = new HashMap<>();
   private Map<String,String> clientProps = new HashMap<>();
   private Map<ServerType,Long> memoryConfig = new HashMap<>();
+  private final EnumMap<ServerType,
+      Class<?>> serverTypeClasses = new EnumMap<ServerType,Class<?>>(
+          Map.of(MANAGER, Manager.class, GARBAGE_COLLECTOR, SimpleGarbageCollector.class, MONITOR,
+              Monitor.class, ZOOKEEPER, ZooKeeperServerMain.class, TABLET_SERVER,
+              TabletServer.class, SCAN_SERVER, ScanServer.class, COMPACTOR, Compactor.class));
   private boolean jdwpEnabled = false;
   private Map<String,String> systemProperties = new HashMap<>();
 
@@ -384,6 +406,25 @@ public class MiniAccumuloConfigImpl {
     return this;
   }
 
+  /**
+   * Sets the class that will be used to instantiate this server type.
+   */
+  public MiniAccumuloConfigImpl setServerClass(ServerType type, Class<?> serverClass) {
+    serverTypeClasses.put(type, Objects.requireNonNull(serverClass));
+    return this;
+  }
+
+  /**
+   * @return the class to use to instantiate this server type.
+   */
+  public Class<?> getServerClass(ServerType type) {
+    var clazz = serverTypeClasses.get(type);
+    if (clazz == null) {
+      throw new IllegalStateException("Server type " + type + " has no class");
+    }
+    return clazz;
+  }
+
   /**
    * @return a copy of the site config
    */
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 404fe0ba7c..e5deae2b26 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -452,7 +452,7 @@ public class Manager extends AbstractServer
     }
   }
 
-  Manager(ConfigOpts opts, String[] args) throws IOException {
+  protected Manager(ConfigOpts opts, String[] args) throws IOException {
     super("manager", opts, args);
     ServerContext context = super.getContext();
     balancerEnvironment = new BalancerEnvironmentImpl(context);
@@ -1257,7 +1257,7 @@ public class Manager extends AbstractServer
     log.info("exiting");
   }
 
-  private Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) {
+  protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) {
 
     final Fate<Manager> fateInstance =
         new Fate<>(this, store, TraceRepo::toLogString, getConfiguration());
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
index 50703849c6..cdd54ac143 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java
@@ -93,11 +93,22 @@ public class MergeTablets extends ManagerRepo {
           firstTabletMeta = Objects.requireNonNull(tabletMeta);
         }
 
+        // determine if this is the last tablet in the merge range
+        boolean isLastTablet = (range.endRow() == null && tabletMeta.getExtent().endRow() == null)
+            || (range.endRow() != null && tabletMeta.getExtent().contains(range.endRow()));
+
         if (prevExtent == null) {
           prevExtent = tabletMeta.getExtent();
         } else {
-          Preconditions.checkState(
-              Objects.equals(prevExtent.endRow(), tabletMeta.getExtent().prevEndRow()),
+          boolean pointsToPrevious =
+              Objects.equals(prevExtent.endRow(), tabletMeta.getExtent().prevEndRow());
+          boolean isAlreadyMerged = isLastTablet && tabletMeta.hasMerged()
+              && Objects.equals(firstTabletMeta.getPrevEndRow(), tabletMeta.getPrevEndRow());
+
+          // Need to ensure the tablets being merged form a proper linked list. In the case where
+          // this operation is running a second time the last tablet will not form a linked list and
+          // that is ok.
+          Preconditions.checkState(pointsToPrevious || isAlreadyMerged,
               "%s unexpectedly saw a hole in the metadata table %s %s", fateId, prevExtent,
               tabletMeta.getExtent());
           prevExtent = tabletMeta.getExtent();
@@ -109,9 +120,6 @@ public class MergeTablets extends ManagerRepo {
         maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, tabletMeta.getTime());
         tabletAvailabilities.add(tabletMeta.getTabletAvailability());
 
-        // determine if this is the last tablet in the merge range
-        boolean isLastTablet = (range.endRow() == null && tabletMeta.getExtent().endRow() == null)
-            || (range.endRow() != null && tabletMeta.getExtent().contains(range.endRow()));
         if (isLastTablet) {
           lastTabletMeta = tabletMeta;
         } else {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
new file mode 100644
index 0000000000..7ca86ec2a0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.function.Function;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of fate that runs fate steps multiple times to ensure they are idempotent.
+ */
+public class FlakyFate<T> extends Fate<T> {
+
+  public FlakyFate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc,
+      AccumuloConfiguration conf) {
+    super(environment, store, toLogStrFunc, conf);
+  }
+
+  @Override
+  protected long executeIsReady(FateId fateId, Repo<T> repo) throws Exception {
+    var deferTime = super.executeIsReady(fateId, repo);
+    if (deferTime == 0) {
+      // run again when zero is returned to ensure idempotent
+      deferTime = super.executeIsReady(fateId, repo);
+      Preconditions.checkState(deferTime == 0);
+    }
+    return deferTime;
+  }
+
+  @Override
+  protected Repo<T> executeCall(FateId fateId, Repo<T> repo) throws Exception {
+    // Always run the call function twice to ensure its idempotent
+    var next1 = super.executeCall(fateId, repo);
+    var next2 = super.executeCall(fateId, repo);
+    // do some basic checks to ensure similar things were returned
+    if (next1 == null) {
+      Preconditions.checkState(next2 == null);
+    } else {
+      Preconditions.checkState(next2 != null);
+      Preconditions.checkState(next1.getClass().equals(next2.getClass()));
+    }
+    return next2;
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
new file mode 100644
index 0000000000..a91eb1af2e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.LoggerFactory;
+
+public class FlakyFateManager extends Manager {
+  protected FlakyFateManager(ConfigOpts opts, String[] args) throws IOException {
+    super(opts, args);
+  }
+
+  @Override
+  protected Fate<Manager> initializeFateInstance(ServerContext context, FateStore<Manager> store) {
+    LoggerFactory.getLogger(FlakyFateManager.class).info("Creating Flaky Fate for {}",
+        store.type());
+    return new FlakyFate<>(this, store, TraceRepo::toLogString, getConfiguration());
+  }
+
+  public static void main(String[] args) throws Exception {
+    try (FlakyFateManager manager = new FlakyFateManager(new ConfigOpts(), args)) {
+      manager.runServer();
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
new file mode 100644
index 0000000000..79e8e4895d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run all delete rows using {@link org.apache.accumulo.test.fate.FlakyFate} to verify delete rows
+ * fate steps are idempotent.
+ */
+public class DeleteRowsFlakyFateIT extends DeleteRowsIT {
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
new file mode 100644
index 0000000000..e6435f1266
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run all of the merge test using a flaky Fate impl that will run merge fate steps multiple times
+ * to ensure idempotent.
+ */
+public class MergeFlakyFateIT extends MergeIT {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+  }
+}

Reply via email to