This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 10f45c4112 Backport FlakyFate and idempotent tests for Merge and 
DeleteRows (#4656)
10f45c4112 is described below

commit 10f45c4112458786df137d4125f0e1fd32ca8262
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sat Jun 8 12:32:23 2024 -0400

    Backport FlakyFate and idempotent tests for Merge and DeleteRows (#4656)
    
    This backports the FlakyFate and FlakyFateManager impl from elasticity
    that was added in #4643 so that fate operations can be easily tested to
    check if they are idempotent. DeleteRowsFlakyFateIT and MergeFlakyFateIT
    were also backported and pass, verifying that the operations are idempotent.
---
 .../java/org/apache/accumulo/core/fate/Fate.java   |  7 +++
 .../MiniAccumuloClusterControl.java                | 31 ++++++------
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    | 40 +++++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java  |  9 +++-
 .../org/apache/accumulo/test/fate/FlakyFate.java   | 57 ++++++++++++++++++++++
 .../accumulo/test/fate/FlakyFateManager.java       | 47 ++++++++++++++++++
 .../test/functional/DeleteRowsFlakyFateIT.java     | 35 +++++++++++++
 .../accumulo/test/functional/MergeFlakyFateIT.java | 36 ++++++++++++++
 8 files changed, 243 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 8dadac916e..4eb690e5b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -215,7 +215,14 @@ public class Fate<T> {
         log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e);
       }
     }
+  }
+
+  /**
+   * Runs the readiness check for one step of a fate transaction by delegating to
+   * {@code op.isReady(tid, environment)}. The method is {@code protected} so that a subclass can
+   * intercept or repeat the check.
+   *
+   * @param tid id of the fate transaction the step belongs to
+   * @param op the step whose readiness is checked
+   * @return whatever {@code op.isReady} returns; NOTE(review): presumably 0 means ready and a
+   *         positive value is a delay before retrying - confirm against {@code Repo#isReady}
+   * @throws Exception if {@code op.isReady} throws
+   */
+  protected long executeIsReady(Long tid, Repo<T> op) throws Exception {
+    return op.isReady(tid, environment);
+  }
 
+  /**
+   * Executes one step of a fate transaction by delegating to {@code op.call(tid, environment)}.
+   * The method is {@code protected} so that a subclass can intercept or repeat the execution.
+   *
+   * @param tid id of the fate transaction the step belongs to
+   * @param op the step to execute
+   * @return whatever {@code op.call} returns; NOTE(review): presumably the next step to run, or
+   *         {@code null} when the transaction has no further steps - confirm against
+   *         {@code Repo#call}
+   * @throws Exception if {@code op.call} throws
+   */
+  protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception {
+    return op.call(tid, environment);
   }
 
   /**
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 69f6f64297..e40679ebb0 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -44,17 +44,12 @@ import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
-import org.apache.accumulo.gc.SimpleGarbageCollector;
-import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
-import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.tserver.ScanServer;
-import org.apache.accumulo.tserver.TabletServer;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -200,13 +195,15 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
     start(server, Collections.emptyMap(), Integer.MAX_VALUE);
   }
 
-  @SuppressWarnings("removal")
+  @SuppressWarnings(value = {"removal", "unchecked"})
   public synchronized void start(ServerType server, Map<String,String> 
configOverrides, int limit)
       throws IOException {
     if (limit <= 0) {
       return;
     }
 
+    Class<?> classToUse = cluster.getConfig().getServerClass(server);
+
     switch (server) {
       case TABLET_SERVER:
         synchronized (tabletServerProcesses) {
@@ -214,31 +211,31 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
           for (int i = tabletServerProcesses.size();
               count < limit && i < cluster.getConfig().getNumTservers(); i++, 
++count) {
             tabletServerProcesses
-                .add(cluster._exec(TabletServer.class, server, 
configOverrides).getProcess());
+                .add(cluster._exec(classToUse, server, 
configOverrides).getProcess());
           }
         }
         break;
       case MASTER:
       case MANAGER:
         if (managerProcess == null) {
-          managerProcess = cluster._exec(Manager.class, server, 
configOverrides).getProcess();
+          managerProcess = cluster._exec(classToUse, server, 
configOverrides).getProcess();
         }
         break;
       case ZOOKEEPER:
         if (zooKeeperProcess == null) {
-          zooKeeperProcess = cluster._exec(ZooKeeperServerMain.class, server, 
configOverrides,
-              cluster.getZooCfgFile().getAbsolutePath()).getProcess();
+          zooKeeperProcess = cluster
+              ._exec(classToUse, server, configOverrides, 
cluster.getZooCfgFile().getAbsolutePath())
+              .getProcess();
         }
         break;
       case GARBAGE_COLLECTOR:
         if (gcProcess == null) {
-          gcProcess =
-              cluster._exec(SimpleGarbageCollector.class, server, 
configOverrides).getProcess();
+          gcProcess = cluster._exec(classToUse, server, 
configOverrides).getProcess();
         }
         break;
       case MONITOR:
         if (monitor == null) {
-          monitor = cluster._exec(Monitor.class, server, 
configOverrides).getProcess();
+          monitor = cluster._exec(classToUse, server, 
configOverrides).getProcess();
         }
         break;
       case SCAN_SERVER:
@@ -247,16 +244,16 @@ public class MiniAccumuloClusterControl implements 
ClusterControl {
           for (int i = scanServerProcesses.size();
               count < limit && i < cluster.getConfig().getNumScanServers(); 
i++, ++count) {
             scanServerProcesses
-                .add(cluster._exec(ScanServer.class, server, 
configOverrides).getProcess());
+                .add(cluster._exec(classToUse, server, 
configOverrides).getProcess());
           }
         }
         break;
       case COMPACTION_COORDINATOR:
-        startCoordinator(CompactionCoordinator.class);
+        startCoordinator((Class<? extends CompactionCoordinator>) classToUse);
         break;
       case COMPACTOR:
-        startCompactors(Compactor.class, 
cluster.getConfig().getNumCompactors(),
-            configOverrides.get("QUEUE_NAME"));
+        startCompactors((Class<? extends Compactor>) classToUse,
+            cluster.getConfig().getNumCompactors(), 
configOverrides.get("QUEUE_NAME"));
         break;
       default:
         throw new UnsupportedOperationException("Cannot start process for " + 
server);
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 025b320f74..f457908a61 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -18,26 +18,43 @@
  */
 package org.apache.accumulo.miniclusterImpl;
 
+import static org.apache.accumulo.minicluster.ServerType.COMPACTOR;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.MANAGER;
+import static org.apache.accumulo.minicluster.ServerType.MONITOR;
+import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.ZOOKEEPER;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.function.Consumer;
 
+import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.HadoopCredentialProvider;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.accumulo.tserver.TabletServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +79,10 @@ public class MiniAccumuloConfigImpl {
   private int numScanServers = 0;
   private int numCompactors = 1;
   private Map<ServerType,Long> memoryConfig = new HashMap<>();
+  // Default implementation class for each server type; entries can be replaced through
+  // setServerClass. COMPACTION_COORDINATOR needs an entry as well, because
+  // MiniAccumuloClusterControl.start() resolves the class for the requested server type before it
+  // dispatches on the type, and getServerClass throws for types that have no entry.
+  // NOTE(review): the coordinator class is fully qualified so that the import hunk of this patch
+  // does not have to change - confirm the package name.
+  private final EnumMap<ServerType,Class<?>> serverTypeClasses =
+      new EnumMap<>(Map.of(MANAGER, Manager.class, GARBAGE_COLLECTOR, SimpleGarbageCollector.class,
+          MONITOR, Monitor.class, ZOOKEEPER, ZooKeeperServerMain.class, TABLET_SERVER,
+          TabletServer.class, SCAN_SERVER, ScanServer.class, COMPACTOR, Compactor.class,
+          ServerType.COMPACTION_COORDINATOR,
+          org.apache.accumulo.coordinator.CompactionCoordinator.class));
   private boolean jdwpEnabled = false;
   private Map<String,String> systemProperties = new HashMap<>();
 
@@ -414,6 +435,25 @@ public class MiniAccumuloConfigImpl {
     return this;
   }
 
+  /**
+   * Sets the class that will be used to instantiate this server type, replacing any class that
+   * was previously registered for it.
+   *
+   * @param type the server type whose class is being set
+   * @param serverClass the class to use for {@code type}; must not be null
+   * @return this config object, so that calls can be chained
+   * @throws NullPointerException if {@code serverClass} is null
+   */
+  public MiniAccumuloConfigImpl setServerClass(ServerType type, Class<?> 
serverClass) {
+    serverTypeClasses.put(type, Objects.requireNonNull(serverClass));
+    return this;
+  }
+
+  /**
+   * Looks up the class registered for a server type. {@code MASTER} falls back to the class
+   * registered for {@code MANAGER} when it has no entry of its own, so that callers which still
+   * pass {@code MASTER} keep working and also pick up a class set for {@code MANAGER}.
+   *
+   * @param type the server type to look up
+   * @return the class to use to instantiate this server type.
+   * @throws IllegalStateException if no class is registered for {@code type}
+   */
+  // NOTE(review): MASTER is assumed to be deprecated for removal, which is why the warning is
+  // suppressed here in the same way MiniAccumuloClusterControl.start() does - confirm.
+  @SuppressWarnings("removal")
+  public Class<?> getServerClass(ServerType type) {
+    var clazz = serverTypeClasses.get(type);
+    if (clazz == null && type == ServerType.MASTER) {
+      // MiniAccumuloClusterControl.start() treats MASTER and MANAGER as the same process
+      clazz = serverTypeClasses.get(MANAGER);
+    }
+    if (clazz == null) {
+      throw new IllegalStateException("Server type " + type + " has no class");
+    }
+    return clazz;
+  }
+
   /**
    * @return a copy of the site config
    */
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 84e8e68519..92c3453e45 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -69,6 +69,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.AgeOffStore;
 import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.TStore;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
@@ -415,7 +416,7 @@ public class Manager extends AbstractServer
     }
   }
 
-  Manager(ServerOpts opts, String[] args) throws IOException {
+  protected Manager(ServerOpts opts, String[] args) throws IOException {
     super("manager", opts, args);
     ServerContext context = super.getContext();
     balancerEnvironment = new BalancerEnvironmentImpl(context);
@@ -1239,7 +1240,7 @@ public class Manager extends AbstractServer
               context.getZooReaderWriter()),
           HOURS.toMillis(8), System::currentTimeMillis);
 
-      Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString);
+      Fate<Manager> f = initializeFateInstance(store);
       f.startTransactionRunners(getConfiguration());
       fateRef.set(f);
       fateReadyLatch.countDown();
@@ -1372,6 +1373,10 @@ public class Manager extends AbstractServer
     }
   }
 
+  /**
+   * Creates the {@link Fate} instance for this manager. The method is {@code protected} so that a
+   * subclass can supply a different Fate implementation.
+   *
+   * @param store the transaction store that is handed to the Fate constructor
+   * @return a new Fate created with this manager as its environment and
+   *         {@code TraceRepo::toLogString} as the function that renders steps for logging
+   */
+  protected Fate<Manager> initializeFateInstance(TStore<Manager> store) {
+    return new Fate<>(this, store, TraceRepo::toLogString);
+  }
+
   /**
    * Allows property configuration to block manager start-up waiting for a 
minimum number of
    * tservers to register in zookeeper. It also accepts a maximum time to wait 
- if the time
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
new file mode 100644
index 0000000000..424248b590
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.function.Function;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.TStore;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of fate that runs fate steps multiple times to ensure 
they are idempotent.
+ */
+public class FlakyFate<T> extends Fate<T> {
+
+  public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> 
toLogStrFunc) {
+    super(environment, store, toLogStrFunc);
+  }
+
+  @Override
+  protected Repo<T> executeCall(Long tid, Repo<T> repo) throws Exception {
+    /*
+     * This function call assumes that isRead was already called once. So it 
runs
+     * call(),isReady(),call() to simulate a situation like isReady(), call(), 
fault, isReady()
+     * again, call() again.
+     */
+    var next1 = super.executeCall(tid, repo);
+    Preconditions.checkState(super.executeIsReady(tid, repo) == 0);
+    var next2 = super.executeCall(tid, repo);
+    // do some basic checks to ensure similar things were returned
+    if (next1 == null) {
+      Preconditions.checkState(next2 == null);
+    } else {
+      Preconditions.checkState(next2 != null);
+      Preconditions.checkState(next1.getClass().equals(next2.getClass()));
+    }
+    return next2;
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
new file mode 100644
index 0000000000..090d4769d9
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.TStore;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.server.ServerOpts;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Manager} whose only difference is that it creates a {@link FlakyFate} instead of the
+ * regular {@link Fate}. It has its own {@code main} so that it can be launched in place of the
+ * regular manager.
+ */
+public class FlakyFateManager extends Manager {
+
+  /**
+   * Passes the options and arguments unchanged to the {@link Manager} constructor.
+   */
+  protected FlakyFateManager(ServerOpts opts, String[] args) throws 
IOException {
+    super(opts, args);
+  }
+
+  /**
+   * Logs that a flaky fate is being created and returns a {@link FlakyFate} built with the same
+   * arguments the overridden method passes to {@link Fate}.
+   */
+  @Override
+  protected Fate<Manager> initializeFateInstance(TStore<Manager> store) {
+    LoggerFactory.getLogger(FlakyFateManager.class).info("Creating Flaky 
Fate");
+    return new FlakyFate<>(this, store, TraceRepo::toLogString);
+  }
+
+  /**
+   * Entry point: creates the manager with default {@link ServerOpts} and the given arguments,
+   * runs it, and closes it when {@code runServer()} returns or throws.
+   */
+  public static void main(String[] args) throws Exception {
+    try (FlakyFateManager manager = new FlakyFateManager(new ServerOpts(), 
args)) {
+      manager.runServer();
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
new file mode 100644
index 0000000000..79e8e4895d
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Runs all of the {@link DeleteRowsIT} tests against a mini cluster whose manager is
+ * {@link FlakyFateManager}, and therefore uses {@link org.apache.accumulo.test.fate.FlakyFate},
+ * to verify that the delete rows fate steps are idempotent. NOTE(review): configureMiniCluster
+ * does not call super - confirm the parent test has no mini cluster configuration to retain.
+ */
+public class DeleteRowsFlakyFateIT extends DeleteRowsIT {
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
new file mode 100644
index 0000000000..e6435f1266
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Runs all of the {@link MergeIT} tests against a mini cluster whose manager is
+ * {@link FlakyFateManager}, a flaky Fate impl that runs fate steps multiple times, to ensure the
+ * merge fate steps are idempotent. NOTE(review): configureMiniCluster does not call super -
+ * confirm the parent test has no mini cluster configuration to retain.
+ */
+public class MergeFlakyFateIT extends MergeIT {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+  }
+}

Reply via email to