This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 10f45c4112 Backport FlakyFate and idempotent tests for Merge and DeleteRows (#4656) 10f45c4112 is described below commit 10f45c4112458786df137d4125f0e1fd32ca8262 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat Jun 8 12:32:23 2024 -0400 Backport FlakyFate and idempotent tests for Merge and DeleteRows (#4656) This backports the FlakyFate and FlakyFateManager impl from elasticity that was added in #4643 so that fate operations can be easily tested to check if they are idempotent. DeleteRowsFlakyFateIT and MergeFlakyFateIT were also backported and pass verifying the operations are idempotent. --- .../java/org/apache/accumulo/core/fate/Fate.java | 7 +++ .../MiniAccumuloClusterControl.java | 31 ++++++------ .../miniclusterImpl/MiniAccumuloConfigImpl.java | 40 +++++++++++++++ .../java/org/apache/accumulo/manager/Manager.java | 9 +++- .../org/apache/accumulo/test/fate/FlakyFate.java | 57 ++++++++++++++++++++++ .../accumulo/test/fate/FlakyFateManager.java | 47 ++++++++++++++++++ .../test/functional/DeleteRowsFlakyFateIT.java | 35 +++++++++++++ .../accumulo/test/functional/MergeFlakyFateIT.java | 36 ++++++++++++++ 8 files changed, 243 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 8dadac916e..4eb690e5b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -215,7 +215,14 @@ public class Fate<T> { log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e); } } + } + + protected long executeIsReady(Long tid, Repo<T> op) throws Exception { + return op.isReady(tid, environment); + } + protected Repo<T> executeCall(Long tid, Repo<T> op) throws Exception { + return op.call(tid, environment); } /** diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 69f6f64297..e40679ebb0 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -44,17 +44,12 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; -import org.apache.accumulo.gc.SimpleGarbageCollector; -import org.apache.accumulo.manager.Manager; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; -import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.tserver.ScanServer; -import org.apache.accumulo.tserver.TabletServer; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; -import org.apache.zookeeper.server.ZooKeeperServerMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -200,13 +195,15 @@ public class MiniAccumuloClusterControl implements ClusterControl { start(server, Collections.emptyMap(), Integer.MAX_VALUE); } - @SuppressWarnings("removal") + @SuppressWarnings(value = {"removal", "unchecked"}) public synchronized void start(ServerType server, Map<String,String> configOverrides, int limit) throws IOException { if (limit <= 0) { return; } + Class<?> classToUse = cluster.getConfig().getServerClass(server); + switch (server) { case TABLET_SERVER: synchronized (tabletServerProcesses) { @@ -214,31 +211,31 @@ public class MiniAccumuloClusterControl implements ClusterControl { for (int i = tabletServerProcesses.size(); count < limit && i < 
cluster.getConfig().getNumTservers(); i++, ++count) { tabletServerProcesses - .add(cluster._exec(TabletServer.class, server, configOverrides).getProcess()); + .add(cluster._exec(classToUse, server, configOverrides).getProcess()); } } break; case MASTER: case MANAGER: if (managerProcess == null) { - managerProcess = cluster._exec(Manager.class, server, configOverrides).getProcess(); + managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess(); } break; case ZOOKEEPER: if (zooKeeperProcess == null) { - zooKeeperProcess = cluster._exec(ZooKeeperServerMain.class, server, configOverrides, - cluster.getZooCfgFile().getAbsolutePath()).getProcess(); + zooKeeperProcess = cluster + ._exec(classToUse, server, configOverrides, cluster.getZooCfgFile().getAbsolutePath()) + .getProcess(); } break; case GARBAGE_COLLECTOR: if (gcProcess == null) { - gcProcess = - cluster._exec(SimpleGarbageCollector.class, server, configOverrides).getProcess(); + gcProcess = cluster._exec(classToUse, server, configOverrides).getProcess(); } break; case MONITOR: if (monitor == null) { - monitor = cluster._exec(Monitor.class, server, configOverrides).getProcess(); + monitor = cluster._exec(classToUse, server, configOverrides).getProcess(); } break; case SCAN_SERVER: @@ -247,16 +244,16 @@ public class MiniAccumuloClusterControl implements ClusterControl { for (int i = scanServerProcesses.size(); count < limit && i < cluster.getConfig().getNumScanServers(); i++, ++count) { scanServerProcesses - .add(cluster._exec(ScanServer.class, server, configOverrides).getProcess()); + .add(cluster._exec(classToUse, server, configOverrides).getProcess()); } } break; case COMPACTION_COORDINATOR: - startCoordinator(CompactionCoordinator.class); + startCoordinator((Class<? extends CompactionCoordinator>) classToUse); break; case COMPACTOR: - startCompactors(Compactor.class, cluster.getConfig().getNumCompactors(), - configOverrides.get("QUEUE_NAME")); + startCompactors((Class<? 
extends Compactor>) classToUse, + cluster.getConfig().getNumCompactors(), configOverrides.get("QUEUE_NAME")); break; default: throw new UnsupportedOperationException("Cannot start process for " + server); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 025b320f74..f457908a61 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -18,26 +18,43 @@ */ package org.apache.accumulo.miniclusterImpl; +import static org.apache.accumulo.minicluster.ServerType.COMPACTOR; +import static org.apache.accumulo.minicluster.ServerType.GARBAGE_COLLECTOR; +import static org.apache.accumulo.minicluster.ServerType.MANAGER; +import static org.apache.accumulo.minicluster.ServerType.MONITOR; +import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; +import static org.apache.accumulo.minicluster.ServerType.ZOOKEEPER; + import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.util.EnumMap; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.function.Consumer; +import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.HadoopCredentialProvider; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.manager.Manager; import org.apache.accumulo.minicluster.MemoryUnit; import 
org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.zookeeper.server.ZooKeeperServerMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +79,10 @@ public class MiniAccumuloConfigImpl { private int numScanServers = 0; private int numCompactors = 1; private Map<ServerType,Long> memoryConfig = new HashMap<>(); + private final EnumMap<ServerType,Class<?>> serverTypeClasses = + new EnumMap<>(Map.of(MANAGER, Manager.class, GARBAGE_COLLECTOR, SimpleGarbageCollector.class, + MONITOR, Monitor.class, ZOOKEEPER, ZooKeeperServerMain.class, TABLET_SERVER, + TabletServer.class, SCAN_SERVER, ScanServer.class, COMPACTOR, Compactor.class)); private boolean jdwpEnabled = false; private Map<String,String> systemProperties = new HashMap<>(); @@ -414,6 +435,25 @@ public class MiniAccumuloConfigImpl { return this; } + /** + * Sets the class that will be used to instantiate this server type. + */ + public MiniAccumuloConfigImpl setServerClass(ServerType type, Class<?> serverClass) { + serverTypeClasses.put(type, Objects.requireNonNull(serverClass)); + return this; + } + + /** + * @return the class to use to instantiate this server type. 
+ */ + public Class<?> getServerClass(ServerType type) { + var clazz = serverTypeClasses.get(type); + if (clazz == null) { + throw new IllegalStateException("Server type " + type + " has no class"); + } + return clazz; + } + /** * @return a copy of the site config */ diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 84e8e68519..92c3453e45 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.TStore; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; @@ -415,7 +416,7 @@ public class Manager extends AbstractServer } } - Manager(ServerOpts opts, String[] args) throws IOException { + protected Manager(ServerOpts opts, String[] args) throws IOException { super("manager", opts, args); ServerContext context = super.getContext(); balancerEnvironment = new BalancerEnvironmentImpl(context); @@ -1239,7 +1240,7 @@ public class Manager extends AbstractServer context.getZooReaderWriter()), HOURS.toMillis(8), System::currentTimeMillis); - Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString); + Fate<Manager> f = initializeFateInstance(store); f.startTransactionRunners(getConfiguration()); fateRef.set(f); fateReadyLatch.countDown(); @@ -1372,6 +1373,10 @@ public class Manager extends AbstractServer } } + protected Fate<Manager> initializeFateInstance(TStore<Manager> store) { + return new Fate<>(this, store, TraceRepo::toLogString); + } + /** * Allows property 
configuration to block manager start-up waiting for a minimum number of * tservers to register in zookeeper. It also accepts a maximum time to wait - if the time diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java new file mode 100644 index 0000000000..424248b590 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import java.util.function.Function; + +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.TStore; + +import com.google.common.base.Preconditions; + +/** + * An implementation of fate that runs fate steps multiple times to ensure they are idempotent. + */ +public class FlakyFate<T> extends Fate<T> { + + public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFunc) { + super(environment, store, toLogStrFunc); + } + + @Override + protected Repo<T> executeCall(Long tid, Repo<T> repo) throws Exception { + /* + * This function call assumes that isReady was already called once.
So it runs + * call(),isReady(),call() to simulate a situation like isReady(), call(), fault, isReady() + * again, call() again. + */ + var next1 = super.executeCall(tid, repo); + Preconditions.checkState(super.executeIsReady(tid, repo) == 0); + var next2 = super.executeCall(tid, repo); + // do some basic checks to ensure similar things were returned + if (next1 == null) { + Preconditions.checkState(next2 == null); + } else { + Preconditions.checkState(next2 != null); + Preconditions.checkState(next1.getClass().equals(next2.getClass())); + } + return next2; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java new file mode 100644 index 0000000000..090d4769d9 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.fate; + +import java.io.IOException; + +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.TStore; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.TraceRepo; +import org.apache.accumulo.server.ServerOpts; +import org.slf4j.LoggerFactory; + +public class FlakyFateManager extends Manager { + + protected FlakyFateManager(ServerOpts opts, String[] args) throws IOException { + super(opts, args); + } + + @Override + protected Fate<Manager> initializeFateInstance(TStore<Manager> store) { + LoggerFactory.getLogger(FlakyFateManager.class).info("Creating Flaky Fate"); + return new FlakyFate<>(this, store, TraceRepo::toLogString); + } + + public static void main(String[] args) throws Exception { + try (FlakyFateManager manager = new FlakyFateManager(new ServerOpts(), args)) { + manager.runServer(); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java new file mode 100644 index 0000000000..79e8e4895d --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/DeleteRowsFlakyFateIT.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.fate.FlakyFateManager; +import org.apache.hadoop.conf.Configuration; + +/** + * Run all delete rows tests using {@link org.apache.accumulo.test.fate.FlakyFate} to verify delete + * rows fate steps are idempotent. + */ +public class DeleteRowsFlakyFateIT extends DeleteRowsIT { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java new file mode 100644 index 0000000000..e6435f1266 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeFlakyFateIT.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.test.functional; + +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.fate.FlakyFateManager; +import org.apache.hadoop.conf.Configuration; + +/** + * Run all of the merge tests using a flaky Fate impl that will run merge fate steps multiple + * times to ensure they are idempotent. + */ +public class MergeFlakyFateIT extends MergeIT { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setServerClass(ServerType.MANAGER, FlakyFateManager.class); + } +}