This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f9d8afebba Several misc Fate changes (#4912) f9d8afebba is described below commit f9d8afebbad00d2db476e23b8bedff839441b2c1 Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Thu Oct 3 10:04:41 2024 -0400 Several misc Fate changes (#4912) - Add a toString() to FateKey - Move MetaFateStore to org.apache.accumulo.core.fate.zookeeper - Periodic clean up of dead reservations increased from every 30 seconds to every few minutes - New fate test case added to FateIT that ensures no write ops can be performed on a transaction after it has been deleted - Added new check to verifyReserved() that checks whether the transaction is deleted - Fixed UserFateStoreIT to work with new change and misc cleanup to the class - created new class FastFate which performs the dead reservation cleanup more often (used in testing) --- .../accumulo/core/fate/AbstractFateStore.java | 14 +++---- .../java/org/apache/accumulo/core/fate/Fate.java | 8 +++- .../org/apache/accumulo/core/fate/FateKey.java | 11 +++++ .../accumulo/core/fate/user/UserFateStore.java | 18 ++++----- .../core/fate/{ => zookeeper}/MetaFateStore.java | 30 ++++++++------ .../org/apache/accumulo/server/util/Admin.java | 2 +- .../java/org/apache/accumulo/manager/Manager.java | 2 +- .../manager/metrics/fate/meta/MetaFateMetrics.java | 2 +- .../test/compaction/ExternalCompaction_1_IT.java | 2 +- .../org/apache/accumulo/test/fate/FastFate.java | 43 ++++++++++++++++++++ .../java/org/apache/accumulo/test/fate/FateIT.java | 32 +++++++++++++++ .../accumulo/test/fate/FateOpsCommandsIT.java | 2 +- .../accumulo/test/fate/MultipleStoresIT.java | 24 +++++------ .../apache/accumulo/test/fate/meta/MetaFateIT.java | 2 +- .../test/fate/meta/MetaFateInterleavingIT.java | 2 +- .../test/fate/meta/MetaFateOpsCommandsIT.java | 2 +- .../test/fate/meta/MetaFateStoreFateIT.java | 2 +- .../accumulo/test/fate/user/UserFateStoreIT.java | 47 +++++++--------------- .../test/functional/FateConcurrencyIT.java | 2 +- .../test/functional/FunctionalTestUtils.java | 2 +- 20 files changed, 162 insertions(+), 87 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 97b391e218..ff5e45d310 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -310,7 +310,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { public TStatus waitForStatusChange(EnumSet<TStatus> expected) { Preconditions.checkState(!isReserved(), "Attempted to wait for status change while reserved: " + fateId); - verifyReserved(false); + verifyReservedAndNotDeleted(false); int currNumCallers = concurrentStatusChangeCallers.incrementAndGet(); @@ -375,16 +375,14 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { protected abstract void unreserve(); - protected void verifyReserved(boolean isWrite) { - if (!isReserved() && isWrite) { - throw new IllegalStateException( - "Attempted write on unreserved FATE transaction: " + fateId); - } + protected void verifyReservedAndNotDeleted(boolean isWrite) { + Preconditions.checkState(!isWrite || (isReserved() && !deleted), + "Attempted write on unreserved or deleted FATE transaction: " + fateId); } @Override public TStatus getStatus() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); var status = _getStatus(fateId); observedStatus = status; return status; @@ -392,7 +390,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { @Override public Optional<FateKey> getKey() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return AbstractFateStore.this.getKey(fateId); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index e2d4e7cbe5..b6860c557d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -383,8 +383,8 @@ public class Fate<T> { // reservations held by dead processes, if they exist. deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool"); - ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor - .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS); + ScheduledFuture<?> deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay( + new DeadReservationCleaner(), 3, getDeadResCleanupDelay().toSeconds(), SECONDS); ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); } this.deadResCleanerExecutor = deadResCleanerExecutor; @@ -393,6 +393,10 @@ public class Fate<T> { this.workFinder.start(); } + public Duration getDeadResCleanupDelay() { + return Duration.ofMinutes(3); + } + // get a transaction id back to the requester before doing any work public FateId startTransaction() { return store.create(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 6c1663627c..8942149a6f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -28,6 +28,8 @@ import java.util.Optional; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.hadoop.io.DataInputBuffer; public class FateKey { @@ -168,4 +170,13 @@ public class FateKey { throw new IllegalStateException("Unexpected FateInstanceType found " + type); } } + + @Override + public String toString() { + var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); + buf.append("FateKeyType", type); + keyExtent.ifPresentOrElse(keyExtent -> buf.append("KeyExtent", keyExtent), + () -> buf.append("ExternalCompactionID", compactionId.orElseThrow())); + return buf.toString(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index f1f82758ff..e1cb4d6405 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -420,7 +420,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public Repo<T> top() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -436,7 +436,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public List<ReadOnlyRepo<T>> getStack() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -451,7 +451,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public Serializable getTransactionInfo(TxInfo txInfo) { - verifyReserved(false); + verifyReservedAndNotDeleted(false); try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); @@ -487,7 +487,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public long timeCreated() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -499,7 +499,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public void push(Repo<T> repo) throws StackOverflowException { - verifyReserved(true); + verifyReservedAndNotDeleted(true); Optional<Integer> top = findTop(); @@ -514,7 +514,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public void pop() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); Optional<Integer> top = findTop(); top.ifPresent(t -> newMutator(fateId) @@ -523,7 +523,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public void setStatus(TStatus status) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); newMutator(fateId).putStatus(status).mutate(); observedStatus = status; @@ -531,7 +531,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public void setTransactionInfo(TxInfo txInfo, Serializable so) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); final byte[] serialized = serializeTxInfo(so); @@ -540,7 +540,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> { @Override public void delete() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); var mutator = newMutator(fateId); mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java similarity index 96% rename from core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java rename to core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 08247e7441..d19db17004 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.fate; +package org.apache.accumulo.core.fate.zookeeper; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -39,9 +39,15 @@ import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.Fate.TxInfo; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.hadoop.io.DataInputBuffer; @@ -239,7 +245,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public Repo<T> top() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); for (int i = 0; i < RETRIES; i++) { String txpath = getTXPath(fateId); @@ -291,7 +297,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public void push(Repo<T> repo) throws StackOverflowException { - verifyReserved(true); + verifyReservedAndNotDeleted(true); String txpath = getTXPath(fateId); try { @@ -310,7 +316,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public void pop() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { String txpath = getTXPath(fateId); @@ -326,7 +332,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public void setStatus(TStatus status) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.mutateExisting(getTXPath(fateId), currSerializedData -> { @@ -353,7 +359,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public void delete() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); @@ -365,7 +371,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, serializeTxInfo(so), @@ -377,14 +383,14 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public Serializable getTransactionInfo(Fate.TxInfo txInfo) { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return MetaFateStore.this.getTransactionInfo(txInfo, fateId); } @Override public long timeCreated() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); try { Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false); @@ -396,7 +402,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> { @Override public List<ReadOnlyRepo<T>> getStack() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); String txpath = getTXPath(fateId); outer: while (true) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index f2aa438661..5e26567ac4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -71,9 +71,9 @@ import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.lock.ServiceLock; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 0af05eb5b7..ae75437225 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -76,8 +76,8 @@ import org.apache.accumulo.core.fate.FateCleaner; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index 1087cf1b9b..02aa3a28f4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -26,8 +26,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 314212693a..e8955e465a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -78,8 +78,8 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java new file mode 100644 index 0000000000..71b198c0ac --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import java.time.Duration; +import java.util.function.Function; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; + +/** + * A FATE which performs the dead reservation cleanup with a much shorter delay between + */ +public class FastFate<T> extends Fate<T> { + + public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner, + Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { + super(environment, store, runDeadResCleaner, toLogStrFunc, conf); + } + + @Override + public Duration getDeadResCleanupDelay() { + return Duration.ofSeconds(15); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index bd7c4a2395..d36e98bdec 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -24,8 +24,10 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRES import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -479,6 +482,35 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu } } + @Test + @Timeout(30) + public void testNoWriteAfterDelete() throws Exception { + executeTest(this::testNoWriteAfterDelete); + } + + protected void testNoWriteAfterDelete(FateStore<TestEnv> store, ServerContext sctx) + throws Exception { + final String tableName = getUniqueNames(1)[0]; + final FateId fateId = store.create(); + final Repo<TestEnv> repo = new TestRepo("testNoWriteAfterDelete"); + + var txStore = store.reserve(fateId); + + // all write ops should be ok after reservation + assertDoesNotThrow(() -> txStore.push(repo)); + assertDoesNotThrow(() -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertDoesNotThrow(txStore::pop); + assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name")); + assertDoesNotThrow(txStore::delete); + + // test that all write ops result in an exception since the tx has been deleted + assertThrows(Exception.class, () -> txStore.push(repo)); + assertThrows(Exception.class, () -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertThrows(Exception.class, txStore::pop); + assertThrows(Exception.class, () -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name")); + assertThrows(Exception.class, txStore::delete); + } + private void submitDeferred(Fate<TestEnv> fate, ServerContext sctx, Set<FateId> transactions) { FateId fateId = fate.startTransaction(); transactions.add(fateId); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 8b52f88f97..0db83c044a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -59,9 +59,9 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 57070bacde..f5e537394d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -46,10 +46,10 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -111,7 +111,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { final int numFateIds = 500; final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set<FateId> allIds = new HashSet<>(); final FateStore<SleepingTestEnv> store1, store2; final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); @@ -182,7 +182,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // Tests that reserve() doesn't hang indefinitely and instead throws an error // on reserve() a non-existent transaction. final FateStore<SleepingTestEnv> store; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final String tableName = getUniqueNames(1)[0]; final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); @@ -208,7 +208,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set<FateId> allIds = new HashSet<>(); final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); final FateStore<SleepingTestEnv> store; @@ -256,7 +256,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set<FateId> allIds = new HashSet<>(); final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>(); final FateStore<SleepingTestEnv> store; @@ -312,7 +312,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { private void testMultipleFateInstances(FateInstanceType storeType) throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set<FateId> allIds = new HashSet<>(); final FateStore<SleepingTestEnv> store1, store2; final SleepingTestEnv testEnv1 = new SleepingTestEnv(50); @@ -380,7 +380,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // One transaction for each FATE worker thread final int numFateIds = Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue()); - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set<FateId> allIds = new HashSet<>(); final FateStore<LatchTestEnv> store1, store2; final LatchTestEnv testEnv1 = new LatchTestEnv(); @@ -399,8 +399,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase { } liveLocks.add(lock1); - Fate<LatchTestEnv> fate1 = - new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance()); + FastFate<LatchTestEnv> fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, + DefaultConfiguration.getInstance()); // Ensure nothing is reserved yet assertTrue(store1.getActiveReservations().isEmpty()); @@ -445,8 +445,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase { // Create the new Fate/start the Fate threads (the work finder and the workers). // Don't run another dead reservation cleaner since we already have one running from fate1. - Fate<LatchTestEnv> fate2 = - new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); + FastFate<LatchTestEnv> fate2 = new FastFate<>(testEnv2, store2, false, Object::toString, + DefaultConfiguration.getInstance()); // Wait for the "dead" reservations to be deleted and picked up again (reserved using // fate2/store2/lock2 now). @@ -458,7 +458,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase { boolean allReservedWithLock2 = store2Reservations.values().stream() .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; - }, 60_000); + }, fate1.getDeadResCleanupDelay().toMillis() * 2); // Finish work and shutdown testEnv1.workersLatch.countDown(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index a23dde0644..c5f541b5e9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -32,8 +32,8 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index bfd267630f..d306e0bfef 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -24,7 +24,7 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateInterleavingIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 994c7af2eb..c4c1e5b24a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -22,7 +22,7 @@ import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateOpsCommandsIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index beb48a5304..af8b98db0f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -36,8 +36,8 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.ServerContext; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index 55f89cd605..c82662182c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -24,9 +24,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.EnumSet; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; @@ -40,6 +37,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; @@ -76,29 +74,6 @@ public class UserFateStoreIT extends SharedMiniClusterBase { SharedMiniClusterBase.stopMiniCluster(); } - private static class TestUserFateStore extends UserFateStore<TestEnv> { - private final Iterator<FateId> fateIdIterator; - - // use the list of fateIds to simulate collisions on fateIds - public TestUserFateStore(ClientContext context, String tableName, List<FateId> fateIds) { - super(context, tableName, createDummyLockID(), null); - this.fateIdIterator = fateIds.iterator(); - } - - @Override - public FateId getFateId() { - if (fateIdIterator.hasNext()) { - return fateIdIterator.next(); - } else { - return FateId.from(fateInstanceType, UUID.randomUUID()); - } - } - - public TStatus getStatus(FateId fateId) { - return _getStatus(fateId); - } - } - // Test that configs related to the correctness of the FATE instance user table // are initialized correctly @Test @@ -151,7 +126,7 @@ public class UserFateStoreIT extends SharedMiniClusterBase { String tableName; ClientContext client; FateId fateId; - TestUserFateStore store; + UserFateStore<TestEnv> store; FateStore.FateTxStore<FateIT.TestEnv> txStore; @BeforeEach @@ -159,9 +134,8 @@ public class UserFateStoreIT extends SharedMiniClusterBase { client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); tableName = getUniqueNames(1)[0]; createFateTable(client, tableName); - fateId = FateId.from(fateInstanceType, UUID.randomUUID()); - store = new TestUserFateStore(client, tableName, List.of(fateId)); - store.create(); + store = new UserFateStore<>(client, tableName, AbstractFateStore.createDummyLockID(), null); + fateId = store.create(); txStore = store.reserve(fateId); } @@ -177,7 +151,10 @@ public class UserFateStoreIT extends SharedMiniClusterBase { beforeOperation.run(); injectStatus(client, tableName, fateId, status); - assertEquals(status, store.getStatus(fateId)); + var fateIdStatus = + store.list().filter(statusEntry -> statusEntry.getFateId().equals(fateId)).findFirst() + .orElseThrow(); + assertEquals(status, fateIdStatus.getStatus()); if (!acceptableStatuses.contains(status)) { assertThrows(IllegalStateException.class, operation, "Expected operation to fail with status " + status + " but it did not"); @@ -210,8 +187,12 @@ public class UserFateStoreIT extends SharedMiniClusterBase { @Test public void delete() throws Exception { - testOperationWithStatuses(() -> {}, // No special setup needed for delete - txStore::delete, + testOperationWithStatuses(() -> { + // Setup for delete: Create a new txStore before each delete since delete cannot be called + // on the same txStore more than once + fateId = store.create(); + txStore = store.reserve(fateId); + }, () -> txStore.delete(), EnumSet.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index c5e6e5eea1..5e5775110f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -51,9 +51,9 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.state.tables.TableState; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 9172a2d7b4..28b08dbbf0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -62,9 +62,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.AdminUtil.FateStatus; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.StoredTabletFile;