This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 10f45c4112 Backport FlakyFate and idempotent tests for Merge and
DeleteRows (#4656)
10f45c4112 is described below
commit 10f45c4112458786df137d4125f0e1fd32ca8262
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Sat Jun 8 12:32:23 2024 -0400
Backport FlakyFate and idempotent tests for Merge and DeleteRows (#4656)
This backports the FlakyFate and FlakyFateManager implementations from elasticity
that were added in #4643 so that fate operations can be easily tested to
check whether they are idempotent. DeleteRowsFlakyFateIT and MergeFlakyFateIT
were also backported and pass, verifying that the operations are idempotent.
---
.../java/org/apache/accumulo/core/fate/Fate.java | 7 +++
.../MiniAccumuloClusterControl.java | 31 ++++++------
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 40 +++++++++++++++
.../java/org/apache/accumulo/manager/Manager.java | 9 +++-
.../org/apache/accumulo/test/fate/FlakyFate.java | 57 ++++++++++++++++++++++
.../accumulo/test/fate/FlakyFateManager.java | 47 ++++++++++++++++++
.../test/functional/DeleteRowsFlakyFateIT.java | 35 +++++++++++++
.../accumulo/test/functional/MergeFlakyFateIT.java | 36 ++++++++++++++
8 files changed, 243 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 8dadac916e..4eb690e5b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -215,7 +215,14 @@ public class Fate<T> {
log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e);
}
}
+ }
+
+ protected long executeIsReady(Long tid, Repo<T> op) throws Exception {
+ return op.isReady(tid, environment);
+ }
+ protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception {
+ return op.call(tid, environment);
}
/**
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index 69f6f64297..e40679ebb0 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -44,17 +44,12 @@ import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
-import org.apache.accumulo.gc.SimpleGarbageCollector;
-import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
-import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.tserver.ScanServer;
-import org.apache.accumulo.tserver.TabletServer;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -200,13 +195,15 @@ public class MiniAccumuloClusterControl implements
ClusterControl {
start(server, Collections.emptyMap(), Integer.MAX_VALUE);
}
- @SuppressWarnings("removal")
+ @SuppressWarnings(value = {"removal", "unchecked"})
public synchronized void start(ServerType server, Map<String,String>
configOverrides, int limit)
throws IOException {
if (limit <= 0) {
return;
}
+ Class<?> classToUse = cluster.getConfig().getServerClass(server);
+
switch (server) {
case TABLET_SERVER:
synchronized (tabletServerProcesses) {
@@ -214,31 +211,31 @@ public class MiniAccumuloClusterControl implements
ClusterControl {
for (int i = tabletServerProcesses.size();
count < limit && i < cluster.getConfig().getNumTservers(); i++,
++count) {
tabletServerProcesses
- .add(cluster._exec(TabletServer.class, server,
configOverrides).getProcess());
+ .add(cluster._exec(classToUse, server,
configOverrides).getProcess());
}
}
break;
case MASTER:
case MANAGER:
if (managerProcess == null) {
- managerProcess = cluster._exec(Manager.class, server,
configOverrides).getProcess();
+ managerProcess = cluster._exec(classToUse, server,
configOverrides).getProcess();
}
break;
case ZOOKEEPER:
if (zooKeeperProcess == null) {
- zooKeeperProcess = cluster._exec(ZooKeeperServerMain.class, server,
configOverrides,
- cluster.getZooCfgFile().getAbsolutePath()).getProcess();
+ zooKeeperProcess = cluster
+ ._exec(classToUse, server, configOverrides,
cluster.getZooCfgFile().getAbsolutePath())
+ .getProcess();
}
break;
case GARBAGE_COLLECTOR:
if (gcProcess == null) {
- gcProcess =
- cluster._exec(SimpleGarbageCollector.class, server,
configOverrides).getProcess();
+ gcProcess = cluster._exec(classToUse, server,
configOverrides).getProcess();
}
break;
case MONITOR:
if (monitor == null) {
- monitor = cluster._exec(Monitor.class, server,
configOverrides).getProcess();
+ monitor = cluster._exec(classToUse, server,
configOverrides).getProcess();
}
break;
case SCAN_SERVER:
@@ -247,16 +244,16 @@ public class MiniAccumuloClusterControl implements
ClusterControl {
for (int i = scanServerProcesses.size();
count < limit && i < cluster.getConfig().getNumScanServers();
i++, ++count) {
scanServerProcesses
- .add(cluster._exec(ScanServer.class, server,
configOverrides).getProcess());
+ .add(cluster._exec(classToUse, server,
configOverrides).getProcess());
}
}
break;
case COMPACTION_COORDINATOR:
- startCoordinator(CompactionCoordinator.class);
+ startCoordinator((Class<? extends CompactionCoordinator>) classToUse);
break;
case COMPACTOR:
- startCompactors(Compactor.class,
cluster.getConfig().getNumCompactors(),
- configOverrides.get("QUEUE_NAME"));
+ startCompactors((Class<? extends Compactor>) classToUse,
+ cluster.getConfig().getNumCompactors(),
configOverrides.get("QUEUE_NAME"));
break;
default:
throw new UnsupportedOperationException("Cannot start process for " +
server);
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 025b320f74..f457908a61 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -18,26 +18,43 @@
*/
package org.apache.accumulo.miniclusterImpl;
+import static org.apache.accumulo.minicluster.ServerType.COMPACTOR;
+import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR;
+import static org.apache.accumulo.minicluster.ServerType.MANAGER;
+import static org.apache.accumulo.minicluster.ServerType.MONITOR;
+import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.apache.accumulo.minicluster.ServerType.ZOOKEEPER;
+
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.function.Consumer;
+import org.apache.accumulo.compactor.Compactor;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.HadoopCredentialProvider;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +79,10 @@ public class MiniAccumuloConfigImpl {
private int numScanServers = 0;
private int numCompactors = 1;
private Map<ServerType,Long> memoryConfig = new HashMap<>();
+ private final EnumMap<ServerType,Class<?>> serverTypeClasses =
+ new EnumMap<>(Map.of(MANAGER, Manager.class, GARBAGE_COLLECTOR,
SimpleGarbageCollector.class,
+ MONITOR, Monitor.class, ZOOKEEPER, ZooKeeperServerMain.class,
TABLET_SERVER,
+ TabletServer.class, SCAN_SERVER, ScanServer.class, COMPACTOR,
Compactor.class));
private boolean jdwpEnabled = false;
private Map<String,String> systemProperties = new HashMap<>();
@@ -414,6 +435,25 @@ public class MiniAccumuloConfigImpl {
return this;
}
+ /**
+ * Sets the class that will be used to instantiate this server type.
+ */
+ public MiniAccumuloConfigImpl setServerClass(ServerType type, Class<?>
serverClass) {
+ serverTypeClasses.put(type, Objects.requireNonNull(serverClass));
+ return this;
+ }
+
+ /**
+ * @return the class to use to instantiate this server type.
+ */
+ public Class<?> getServerClass(ServerType type) {
+ var clazz = serverTypeClasses.get(type);
+ if (clazz == null) {
+ throw new IllegalStateException("Server type " + type + " has no class");
+ }
+ return clazz;
+ }
+
/**
* @return a copy of the site config
*/
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 84e8e68519..92c3453e45 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -69,6 +69,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AgeOffStore;
import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.TStore;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
@@ -415,7 +416,7 @@ public class Manager extends AbstractServer
}
}
- Manager(ServerOpts opts, String[] args) throws IOException {
+ protected Manager(ServerOpts opts, String[] args) throws IOException {
super("manager", opts, args);
ServerContext context = super.getContext();
balancerEnvironment = new BalancerEnvironmentImpl(context);
@@ -1239,7 +1240,7 @@ public class Manager extends AbstractServer
context.getZooReaderWriter()),
HOURS.toMillis(8), System::currentTimeMillis);
- Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString);
+ Fate<Manager> f = initializeFateInstance(store);
f.startTransactionRunners(getConfiguration());
fateRef.set(f);
fateReadyLatch.countDown();
@@ -1372,6 +1373,10 @@ public class Manager extends AbstractServer
}
}
+ protected Fate<Manager> initializeFateInstance(TStore<Manager> store) {
+ return new Fate<>(this, store, TraceRepo::toLogString);
+ }
+
/**
* Allows property configuration to block manager start-up waiting for a
minimum number of
* tservers to register in zookeeper. It also accepts a maximum time to wait
- if the time
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
new file mode 100644
index 0000000000..424248b590
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.function.Function;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.TStore;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of fate that runs fate steps multiple times to ensure
they are idempotent.
+ */
+public class FlakyFate<T> extends Fate<T> {
+
+ public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String>
toLogStrFunc) {
+ super(environment, store, toLogStrFunc);
+ }
+
+ @Override
+ protected Repo<T> executeCall(Long tid, Repo<T> repo) throws Exception {
+ /*
+ * This function call assumes that isReady was already called once. So it
runs
+ * call(),isReady(),call() to simulate a situation like isReady(), call(),
fault, isReady()
+ * again, call() again.
+ */
+ var next1 = super.executeCall(tid, repo);
+ Preconditions.checkState(super.executeIsReady(tid, repo) == 0);
+ var next2 = super.executeCall(tid, repo);
+ // do some basic checks to ensure similar things were returned
+ if (next1 == null) {
+ Preconditions.checkState(next2 == null);
+ } else {
+ Preconditions.checkState(next2 != null);
+ Preconditions.checkState(next1.getClass().equals(next2.getClass()));
+ }
+ return next2;
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
new file mode 100644
index 0000000000..090d4769d9
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.TStore;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.TraceRepo;
+import org.apache.accumulo.server.ServerOpts;
+import org.slf4j.LoggerFactory;
+
+public class FlakyFateManager extends Manager {
+
+ protected FlakyFateManager(ServerOpts opts, String[] args) throws
IOException {
+ super(opts, args);
+ }
+
+ @Override
+ protected Fate<Manager> initializeFateInstance(TStore<Manager> store) {
+ LoggerFactory.getLogger(FlakyFateManager.class).info("Creating Flaky
Fate");
+ return new FlakyFate<>(this, store, TraceRepo::toLogString);
+ }
+
+ public static void main(String[] args) throws Exception {
+ try (FlakyFateManager manager = new FlakyFateManager(new ServerOpts(),
args)) {
+ manager.runServer();
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
new file mode 100644
index 0000000000..79e8e4895d
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run all delete rows tests using {@link org.apache.accumulo.test.fate.FlakyFate}
to verify delete rows
+ * fate steps are idempotent.
+ */
+public class DeleteRowsFlakyFateIT extends DeleteRowsIT {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
new file mode 100644
index 0000000000..e6435f1266
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.fate.FlakyFateManager;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run all of the merge tests using a flaky Fate impl that will run merge fate
steps multiple times
+ * to ensure they are idempotent.
+ */
+public class MergeFlakyFateIT extends MergeIT {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class);
+ }
+}