This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 7724f6ad7bb3a4cb00c8d4c8af042f8bc5efcf0c Merge: 69584e2835 10f45c4112 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat Jun 8 12:34:22 2024 -0400 Merge branch '2.1' .../java/org/apache/accumulo/core/fate/Fate.java | 7 +++ .../MiniAccumuloClusterControl.java | 30 +++++------ .../miniclusterImpl/MiniAccumuloConfigImpl.java | 40 +++++++++++++++ .../java/org/apache/accumulo/manager/Manager.java | 10 +++- .../org/apache/accumulo/test/fate/FlakyFate.java | 59 ++++++++++++++++++++++ .../accumulo/test/fate/FlakyFateManager.java | 49 ++++++++++++++++++ .../test/functional/DeleteRowsFlakyFateIT.java | 35 +++++++++++++ .../accumulo/test/functional/MergeFlakyFateIT.java | 36 +++++++++++++ 8 files changed, 248 insertions(+), 18 deletions(-) diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 9e395bb47c,e40679ebb0..68321cdf4b --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@@ -42,19 -41,15 +42,14 @@@ import org.apache.accumulo.core.conf.Pr import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; - import org.apache.accumulo.gc.SimpleGarbageCollector; - import org.apache.accumulo.manager.Manager; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; - import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.tserver.ScanServer; - import org.apache.accumulo.tserver.TabletServer; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; - import org.apache.zookeeper.server.ZooKeeperServerMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -205,6 -195,7 +200,7 @@@ public class MiniAccumuloClusterContro start(server, Collections.emptyMap(), Integer.MAX_VALUE); } - @SuppressWarnings(value = {"removal", "unchecked"}) ++ @SuppressWarnings("unchecked") public synchronized void start(ServerType server, Map<String,String> configOverrides, int limit) throws IOException { if (limit <= 0) { @@@ -222,9 -215,10 +220,9 @@@ } } break; - case MASTER: case MANAGER: if (managerProcess == null) { - managerProcess = cluster._exec(Manager.class, server, configOverrides).getProcess(); + managerProcess = cluster._exec(classToUse, server, configOverrides).getProcess(); } break; case ZOOKEEPER: diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a778744c0f,92c3453e45..82cf1ea91a --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -72,6 -69,10 +72,7 @@@ import org.apache.accumulo.core.data.Va import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; + import org.apache.accumulo.core.fate.TStore; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@@ -422,7 -416,7 +423,7 @@@ public class Manager extends AbstractSe } } - Manager(ConfigOpts opts, String[] args) throws IOException { - protected Manager(ServerOpts opts, String[] args) throws IOException { ++ protected Manager(ConfigOpts opts, String[] args) throws IOException { super("manager", opts, args); ServerContext context = super.getContext(); balancerEnvironment = new BalancerEnvironmentImpl(context); @@@ -1253,7 -1240,8 +1254,7 @@@ context.getZooReaderWriter()), HOURS.toMillis(8), System::currentTimeMillis); - Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); - Fate<Manager> f = initializeFateInstance(store); - f.startTransactionRunners(getConfiguration()); ++ Fate<Manager> f = initializeFateInstance(store, getConfiguration()); fateRef.set(f); fateReadyLatch.countDown(); @@@ -1348,6 -1363,20 +1349,11 @@@ log.info("exiting"); } - @Deprecated - private void initializeZkForReplication(ZooReaderWriter zReaderWriter, String zroot) { - try { - org.apache.accumulo.server.replication.ZooKeeperInitialization - .ensureZooKeeperInitialized(zReaderWriter, zroot); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException("Exception while ensuring ZooKeeper is initialized", e); - } - } - - protected Fate<Manager> initializeFateInstance(TStore<Manager> store) { - return new Fate<>(this, store, TraceRepo::toLogString); ++ protected Fate<Manager> initializeFateInstance(TStore<Manager> store, ++ AccumuloConfiguration conf) { ++ return new Fate<>(this, store, TraceRepo::toLogString, conf); + } + /** * Allows property configuration to block manager start-up waiting for a minimum number of * tservers to register in zookeeper. It also accepts a maximum time to wait - if the time diff --cc test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java index 0000000000,424248b590..8655601d0d mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@@ -1,0 -1,57 +1,59 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test.fate; + + import java.util.function.Function; + ++import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.fate.Fate; + import org.apache.accumulo.core.fate.Repo; + import org.apache.accumulo.core.fate.TStore; + + import com.google.common.base.Preconditions; + + /** + * An implementation of fate that runs fate steps multiple times to ensure they are idempotent. + */ + public class FlakyFate<T> extends Fate<T> { + - public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFunc) { - super(environment, store, toLogStrFunc); ++ public FlakyFate(T environment, TStore<T> store, Function<Repo<T>,String> toLogStrFunc, ++ AccumuloConfiguration conf) { ++ super(environment, store, toLogStrFunc, conf); + } + + @Override + protected Repo<T> executeCall(Long tid, Repo<T> repo) throws Exception { + /* + * This function call assumes that isRead was already called once. So it runs + * call(),isReady(),call() to simulate a situation like isReady(), call(), fault, isReady() + * again, call() again. + */ + var next1 = super.executeCall(tid, repo); + Preconditions.checkState(super.executeIsReady(tid, repo) == 0); + var next2 = super.executeCall(tid, repo); + // do some basic checks to ensure similar things were returned + if (next1 == null) { + Preconditions.checkState(next2 == null); + } else { + Preconditions.checkState(next2 != null); + Preconditions.checkState(next1.getClass().equals(next2.getClass())); + } + return next2; + } + } diff --cc test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java index 0000000000,090d4769d9..9b4cd325c2 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFateManager.java @@@ -1,0 -1,47 +1,49 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test.fate; + + import java.io.IOException; + ++import org.apache.accumulo.core.cli.ConfigOpts; ++import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.fate.Fate; + import org.apache.accumulo.core.fate.TStore; + import org.apache.accumulo.manager.Manager; + import org.apache.accumulo.manager.tableOps.TraceRepo; -import org.apache.accumulo.server.ServerOpts; + import org.slf4j.LoggerFactory; + + public class FlakyFateManager extends Manager { + - protected FlakyFateManager(ServerOpts opts, String[] args) throws IOException { ++ protected FlakyFateManager(ConfigOpts opts, String[] args) throws IOException { + super(opts, args); + } + + @Override - protected Fate<Manager> initializeFateInstance(TStore<Manager> store) { ++ protected Fate<Manager> initializeFateInstance(TStore<Manager> store, ++ AccumuloConfiguration conf) { + LoggerFactory.getLogger(FlakyFateManager.class).info("Creating Flaky Fate"); - return new FlakyFate<>(this, store, TraceRepo::toLogString); ++ return new FlakyFate<>(this, store, TraceRepo::toLogString, conf); + } + + public static void main(String[] args) throws Exception { - try (FlakyFateManager manager = new FlakyFateManager(new ServerOpts(), args)) { ++ try (FlakyFateManager manager = new FlakyFateManager(new ConfigOpts(), args)) { + manager.runServer(); + } + } + }