This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new ee81a3c30d3 Adds some missing FATE functionality (#5263) ee81a3c30d3 is described below commit ee81a3c30d3a34604c33c042abb631db5952388b Author: Kevin Rathbun <krath...@apache.org> AuthorDate: Tue Jan 21 10:09:00 2025 -0500 Adds some missing FATE functionality (#5263) - Previously, if a user configured fewer FATE threads to work on transactions (via MANAGER_FATE_THREADPOOL_SIZE property), the pool size would not actually be decreased. These changes safely stop excess workers in the case where the property value is decreased. - Added test FatePoolResizeIT (tests both META and USER transactions: MetaFatePoolResizeIT and UserFatePoolResizeIT) to ensure the pool size is correctly increased and decreased with configuration changes. - Fate.shutdown() was not waiting on termination of the fate pool watcher when needed. Added the missing wait. --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../java/org/apache/accumulo/core/fate/Fate.java | 150 ++++++++++++------ .../org/apache/accumulo/test/fate/FastFate.java | 14 +- .../accumulo/test/fate/FatePoolResizeIT.java | 167 +++++++++++++++++++++ .../test/fate/meta/MetaFatePoolResizeIT.java | 66 ++++++++ .../test/fate/user/UserFatePoolResizeIT.java | 58 +++++++ 5 files changed, 402 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index ac579176293..566dc5e9eed 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -32,8 +32,11 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; import java.time.Duration; +import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; @@ -64,6 +67,9 @@ import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * Fault tolerant executor */ @@ -76,6 +82,7 @@ public class Fate<T> { private final T environment; private final ScheduledThreadPoolExecutor fatePoolWatcher; private final ExecutorService transactionExecutor; + private final Set<TransactionRunner> runningTxRunners; private final ExecutorService deadResCleanerExecutor; private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); @@ -179,9 +186,12 @@ public class Fate<T> { } private class TransactionRunner implements Runnable { + // used to signal a TransactionRunner to stop in the case where there are too many running + // i.e., the property for the pool size decreased and we have excess TransactionRunners + private final AtomicBoolean stop = new AtomicBoolean(false); private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException { - while (keepRunning.get()) { + while (keepRunning.get() && !stop.get()) { FateId unreservedFateId = workQueue.poll(100, MILLISECONDS); if (unreservedFateId == null) { @@ -198,55 +208,61 @@ public class Fate<T> { @Override public void run() { - while (keepRunning.get()) { - FateTxStore<T> txStore = null; - ExecutionState state = new ExecutionState(); - try { - var optionalopStore = reserveFateTx(); - if (optionalopStore.isPresent()) { - txStore = optionalopStore.orElseThrow(); - } else { - continue; - } - state.status = txStore.getStatus(); - state.op = txStore.top(); - if (state.status == FAILED_IN_PROGRESS) { - processFailed(txStore, state.op); - } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { - try { - execute(txStore, state); - if (state.op != null && state.deferTime != 0) { - // The current op is not ready to execute - continue; - } - } catch (StackOverflowException e) { - // the op that failed to push onto the stack was never executed, so no need to undo - // it just transition to failed and undo the ops that executed - transitionToFailed(txStore, e); - continue; - } catch (Exception e) { - blockIfHadoopShutdown(txStore.getID(), e); - transitionToFailed(txStore, e); + runningTxRunners.add(this); + try { + while (keepRunning.get() && !stop.get()) { + FateTxStore<T> txStore = null; + ExecutionState state = new ExecutionState(); + try { + var optionalopStore = reserveFateTx(); + if (optionalopStore.isPresent()) { + txStore = optionalopStore.orElseThrow(); + } else { continue; } + state.status = txStore.getStatus(); + state.op = txStore.top(); + if (state.status == FAILED_IN_PROGRESS) { + processFailed(txStore, state.op); + } else if (state.status == SUBMITTED || state.status == IN_PROGRESS) { + try { + execute(txStore, state); + if (state.op != null && state.deferTime != 0) { + // The current op is not ready to execute + continue; + } + } catch (StackOverflowException e) { + // the op that failed to push onto the stack was never executed, so no need to undo + // it just transition to failed and undo the ops that executed + transitionToFailed(txStore, e); + continue; + } catch (Exception e) { + blockIfHadoopShutdown(txStore.getID(), e); + transitionToFailed(txStore, e); + continue; + } - if (state.op == null) { - // transaction is finished - String ret = state.prevOp.getReturn(); - if (ret != null) { - txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret); + if (state.op == null) { + // transaction is finished + String ret = state.prevOp.getReturn(); + if (ret != null) { + txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret); + } + txStore.setStatus(SUCCESSFUL); + doCleanUp(txStore); } - txStore.setStatus(SUCCESSFUL); - doCleanUp(txStore); } - } - } catch (Exception e) { - runnerLog.error("Uncaught exception in FATE runner thread.", e); - } finally { - if (txStore != null) { - txStore.unreserve(Duration.ofMillis(state.deferTime)); + } catch (Exception e) { + runnerLog.error("Uncaught exception in FATE runner thread.", e); + } finally { + if (txStore != null) { + txStore.unreserve(Duration.ofMillis(state.deferTime)); + } } } + } finally { + log.trace("A TransactionRunner is exiting..."); + Preconditions.checkState(runningTxRunners.remove(this)); } } @@ -357,6 +373,14 @@ public class Fate<T> { } } + private boolean flagStop() { + return stop.compareAndSet(false, true); + } + + private boolean isFlaggedToStop() { + return stop.get(); + } + } protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception { @@ -400,15 +424,16 @@ public class Fate<T> { final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); this.workQueue = new LinkedTransferQueue<>(); + this.runningTxRunners = Collections.synchronizedSet(new HashSet<>()); this.fatePoolWatcher = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> { // resize the pool if the property changed ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE); - // If the pool grew, then ensure that there is a TransactionRunner for each thread final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE); - final int needed = configured - pool.getActiveCount(); + final int needed = configured - runningTxRunners.size(); if (needed > 0) { + // If the pool grew, then ensure that there is a TransactionRunner for each thread for (int i = 0; i < needed; i++) { try { pool.execute(new TransactionRunner()); @@ -425,11 +450,26 @@ public class Fate<T> { } } idleCountHistory.clear(); + } else if (needed < 0) { + // If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped. + // Flag the necessary number of TransactionRunners to safely stop when they are done work + // on a transaction. + int numFlagged = + (int) runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count(); + int numToStop = -1 * (numFlagged + needed); + for (var runner : runningTxRunners) { + if (numToStop <= 0) { + break; + } + if (runner.flagStop()) { + log.trace("Flagging a TransactionRunner to stop..."); + numToStop--; + } + } } else { // The property did not change, but should it based on idle Fate threads? Maintain // count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest - // that the - // MANAGER_FATE_THREADPOOL_SIZE be increased. + // that the MANAGER_FATE_THREADPOOL_SIZE be increased. final long interval = Math.min(60, TimeUnit.MILLISECONDS .toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL))); if (interval == 0) { @@ -488,6 +528,11 @@ public class Fate<T> { return POOL_WATCHER_DELAY; } + @VisibleForTesting + public int getTxRunnersActive() { + return runningTxRunners.size(); + } + // get a transaction id back to the requester before doing any work public FateId startTransaction() { return store.create(); @@ -622,10 +667,15 @@ public class Fate<T> { if (timeout > 0) { long start = System.nanoTime(); - while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) - && (workFinder.isAlive() || !transactionExecutor.isTerminated() - || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) { + while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive() + || !transactionExecutor.isTerminated() || !fatePoolWatcher.isTerminated() + || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) { try { + if (!fatePoolWatcher.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for pool watcher to terminate", store.type()); + continue; + } + if (!transactionExecutor.awaitTermination(1, SECONDS)) { log.debug("Fate {} is waiting for worker threads to terminate", store.type()); continue; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index f15fb6caaa1..e33906f29c1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -27,10 +27,13 @@ import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.Repo; /** - * A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for - * shortening test times for tests that are waiting for a cleanup to occur. + * A FATE which performs the dead reservation cleanup and the check on the pool size with a much + * shorter delay between. Useful for shortening test times for tests that are waiting for one of + * these actions to occur. */ public class FastFate<T> extends Fate<T> { + private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofSeconds(5); + private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5); public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner, Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) { @@ -39,6 +42,11 @@ public class FastFate<T> extends Fate<T> { @Override public Duration getDeadResCleanupDelay() { - return Duration.ofSeconds(5); + return DEAD_RES_CLEANUP_DELAY; + } + + @Override + public Duration getPoolWatcherDelay() { + return POOL_WATCHER_DELAY; } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java new file mode 100644 index 00000000000..7b429fdde02 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FatePoolResizeIT.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.util.Wait; +import org.junit.jupiter.api.Test; + +public abstract class FatePoolResizeIT extends SharedMiniClusterBase + implements FateTestRunner<FatePoolResizeIT.PoolResizeTestEnv> { + + @Test + public void testIncreaseSize() throws Exception { + executeTest(this::testIncreaseSize); + } + + protected void testIncreaseSize(FateStore<PoolResizeTestEnv> store, ServerContext sctx) + throws Exception { + final int numThreads = 10; + final int newNumThreads = 20; + final ConfigurationCopy config = initConfig(numThreads); + final var env = new PoolResizeTestEnv(); + final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r -> r + "", config); + try { + // create a tx for all future threads. For now, only some of the txns will be workable + for (int i = 0; i < newNumThreads; i++) { + var fateId = fate.startTransaction(); + fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(), true, "testing"); + } + // wait for all available threads to be working on a tx + Wait.waitFor(() -> env.numWorkers.get() == numThreads); + assertEquals(numThreads, fate.getTxRunnersActive()); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(newNumThreads)); + // wait for new config to be detected, new runners to be created, and for these runners to + // pick up the rest of the available txns + Wait.waitFor(() -> env.numWorkers.get() == newNumThreads); + assertEquals(newNumThreads, fate.getTxRunnersActive()); + // finish work + env.isReadyLatch.countDown(); + Wait.waitFor(() -> env.numWorkers.get() == 0); + // workers should still be running since we haven't shutdown, just not working on anything + assertEquals(newNumThreads, fate.getTxRunnersActive()); + } finally { + fate.shutdown(1, TimeUnit.MINUTES); + assertEquals(0, fate.getTxRunnersActive()); + } + } + + @Test + public void testDecreaseSize() throws Exception { + executeTest(this::testDecreaseSize); + } + + protected void testDecreaseSize(FateStore<PoolResizeTestEnv> store, ServerContext sctx) + throws Exception { + final int numThreads = 20; + final int newNumThreads = 10; + final ConfigurationCopy config = initConfig(numThreads); + final var env = new PoolResizeTestEnv(); + final Fate<PoolResizeTestEnv> fate = new FastFate<>(env, store, false, r -> r + "", config); + try { + // create a tx for each thread + for (int i = 0; i < numThreads; i++) { + var fateId = fate.startTransaction(); + fate.seedTransaction(TEST_FATE_OP, fateId, new PoolResizeTestRepo(), true, "testing"); + } + // wait for all threads to be working on a tx + Wait.waitFor(() -> env.numWorkers.get() == numThreads); + assertEquals(numThreads, fate.getTxRunnersActive()); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(newNumThreads)); + // ensure another execution of the pool watcher task occurs after we change the size + Thread.sleep(fate.getPoolWatcherDelay().toMillis() * 2); + // at this point, FATE should detect that there are more transaction runners running than + // configured. None can be safely stopped yet, (still in progress - haven't passed isReady). + // We ensure none have been unexpectedly stopped, then we allow isReady to pass and the txns + // to complete + assertEquals(numThreads, env.numWorkers.get()); + env.isReadyLatch.countDown(); + // wait for the pool size to be decreased to the expected value + Wait.waitFor(() -> fate.getTxRunnersActive() == newNumThreads); + // wait for all threads to have completed their tx + Wait.waitFor(() -> env.numWorkers.get() == 0); + // wait a bit longer to ensure no more workers than expected were stopped + Thread.sleep(5_000); + assertEquals(newNumThreads, fate.getTxRunnersActive()); + } finally { + fate.shutdown(1, TimeUnit.MINUTES); + assertEquals(0, fate.getTxRunnersActive()); + } + } + + private ConfigurationCopy initConfig(int numThreads) { + ConfigurationCopy config = new ConfigurationCopy(); + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, String.valueOf(numThreads)); + config.set(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL, "60m"); + return config; + } + + public static class PoolResizeTestRepo implements Repo<PoolResizeTestEnv> { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, PoolResizeTestEnv environment) throws Exception { + environment.numWorkers.incrementAndGet(); + environment.isReadyLatch.await(); + return 0; + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public Repo<PoolResizeTestEnv> call(FateId fateId, PoolResizeTestEnv environment) + throws Exception { + environment.numWorkers.decrementAndGet(); + return null; + } + + @Override + public void undo(FateId fateId, PoolResizeTestEnv environment) throws Exception { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class PoolResizeTestEnv extends TestEnv { + private final AtomicInteger numWorkers = new AtomicInteger(0); + private final CountDownLatch isReadyLatch = new CountDownLatch(1); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java new file mode 100644 index 00000000000..5ae5242a7c7 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolResizeIT.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.meta; + +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.io.File; + +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FatePoolResizeIT; +import org.apache.accumulo.test.fate.FateStoreUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; + +public class MetaFatePoolResizeIT extends FatePoolResizeIT { + @TempDir + private static File tempDir; + + @BeforeAll + public static void setup() throws Exception { + FateStoreUtil.MetaFateZKSetup.setup(tempDir); + } + + @AfterAll + public static void teardown() throws Exception { + FateStoreUtil.MetaFateZKSetup.teardown(); + } + + @Override + public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + String zkRoot = FateStoreUtil.MetaFateZKSetup.getZkRoot(); + var zk = FateStoreUtil.MetaFateZKSetup.getZk(); + String fatePath = FateStoreUtil.MetaFateZKSetup.getZkFatePath(); + ServerContext sctx = createMock(ServerContext.class); + expect(sctx.getZooKeeperRoot()).andReturn(zkRoot).anyTimes(); + expect(sctx.getZooSession()).andReturn(zk).anyTimes(); + replay(sctx); + + testMethod.execute( + new MetaFateStore<>(fatePath, zk, createDummyLockID(), null, maxDeferred, fateIdGenerator), + sctx); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java new file mode 100644 index 00000000000..77ae75cd63b --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolResizeIT.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.user; + +import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.fate.FatePoolResizeIT; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class UserFatePoolResizeIT extends FatePoolResizeIT { + + private String table; + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Override + public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int maxDeferred, + AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { + table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + createFateTable(client, table); + testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, + fateIdGenerator), getCluster().getServerContext()); + } + } +}