This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8e78562a1d modifies FATE to use a single thread to find work (#4042) 8e78562a1d is described below commit 8e78562a1d82419858622e9874ed7e1f84d3c497 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Dec 8 16:12:21 2023 -0500 modifies FATE to use a single thread to find work (#4042) This change modifies FATE to use singe thread to find work. It also cleans up some of the signaling between threads in FATE and fixes a synchronization bug in FATE that was introduced in #4017. The bug introduced in #4017 is that somethings are syncronizing on the wrong object because a new inner class was introduced. These changes were pulled from #3964 and cleaned up and improved. --- .../org/apache/accumulo/core/fate/AgeOffStore.java | 12 +- .../java/org/apache/accumulo/core/fate/Fate.java | 84 ++++++- .../org/apache/accumulo/core/fate/FateStore.java | 17 +- .../accumulo/core/fate/ReadOnlyFateStore.java | 9 + .../org/apache/accumulo/core/fate/SignalCount.java | 70 ++++++ .../org/apache/accumulo/core/fate/ZooStore.java | 266 +++++++++------------ .../apache/accumulo/core/logging/FateLogger.java | 12 +- .../org/apache/accumulo/core/fate/TestStore.java | 12 +- 8 files changed, 305 insertions(+), 177 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index c8be589aef..f61c06028c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -20,10 +20,12 @@ package org.apache.accumulo.core.fate; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,11 +150,6 @@ public class AgeOffStore<T> implements FateStore<T> { return txid; } - @Override - public FateTxStore<T> reserve() { - return new AgeOffFateTxStore(store.reserve()); - } - @Override public FateTxStore<T> reserve(long tid) { return new AgeOffFateTxStore(store.reserve(tid)); @@ -204,4 +201,9 @@ public class AgeOffStore<T> implements FateStore<T> { public List<Long> list() { return store.list(); } + + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + return store.runnable(keepWaiting); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a7ad8ce243..a54ad734ee 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; @@ -33,9 +34,11 @@ import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -48,6 +51,7 @@ import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,25 +72,91 @@ public class Fate<T> { private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final TransferQueue<Long> workQueue; + private final Thread workFinder; public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE } + /** + * A single thread that finds transactions to work on and queues them up. Do not want each worker + * thread going to the store and looking for work as it would place more load on the store. + */ + private class WorkFinder implements Runnable { + + @Override + public void run() { + while (keepRunning.get()) { + try { + var iter = store.runnable(keepRunning); + + while (iter.hasNext() && keepRunning.get()) { + Long txid = iter.next(); + try { + while (keepRunning.get()) { + // The reason for calling transfer instead of queueing is avoid rescanning the + // storage layer and adding the same thing over and over. For example if all threads + // were busy, the queue size was 100, and there are three runnable things in the + // store. Do not want to keep scanning the store adding those same 3 runnable things + // until the queue is full. + if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { + break; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + } catch (Exception e) { + if (keepRunning.get()) { + log.warn("Failure while attempting to find work for fate", e); + } else { + log.debug("Failure while attempting to find work for fate", e); + } + + workQueue.clear(); + } + } + } + } + private class TransactionRunner implements Runnable { + private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException { + while (keepRunning.get()) { + var unreservedTid = workQueue.poll(100, MILLISECONDS); + + if (unreservedTid == null) { + continue; + } + var optionalopStore = store.tryReserve(unreservedTid); + if (optionalopStore.isPresent()) { + return optionalopStore; + } + } + + return Optional.empty(); + } + @Override public void run() { while (keepRunning.get()) { long deferTime = 0; FateTxStore<T> txStore = null; try { - txStore = store.reserve(); + var optionalopStore = reserveFateTx(); + if (optionalopStore.isPresent()) { + txStore = optionalopStore.orElseThrow(); + } else { + continue; + } TStatus status = txStore.getStatus(); Repo<T> op = txStore.top(); if (status == FAILED_IN_PROGRESS) { processFailed(txStore, op); - } else { + } else if (status == SUBMITTED || status == IN_PROGRESS) { Repo<T> prevOp = null; try { deferTime = op.isReady(txStore.getID(), environment); @@ -231,6 +301,7 @@ public class Fate<T> { this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); + this.workQueue = new LinkedTransferQueue<>(); this.fatePoolWatcher = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> { @@ -257,6 +328,9 @@ public class Fate<T> { } }, 3, SECONDS)); this.executor = pool; + + this.workFinder = Threads.createThread("Fate work finder", new WorkFinder()); + this.workFinder.start(); } // get a transaction id back to the requester before doing any work @@ -399,6 +473,12 @@ public class Fate<T> { if (executor != null) { executor.shutdown(); } + workFinder.interrupt(); + try { + workFinder.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 834a2fa6e5..7db5766e81 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -38,6 +38,9 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { */ long create(); + /** + * An interface that allows read/write access to the data related to a single fate operation. + */ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> { @Override Repo<T> top(); @@ -81,8 +84,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { * upon successful return the store now controls the referenced transaction id. caller should no * longer interact with it. * - * @param deferTime time in millis to keep this transaction out of the pool used in the - * {@link #reserve() reserve} method. must be non-negative. + * @param deferTime time in millis to keep this transaction from being returned by + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative. */ void unreserve(long deferTime); } @@ -104,14 +107,4 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> { */ FateTxStore<T> reserve(long tid); - /** - * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS. - * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. - * - * @return a transaction id that is safe to interact with, chosen by the store. - */ - FateTxStore<T> reserve(); - } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 4e06ab0f9e..f0140de367 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -20,7 +20,9 @@ package org.apache.accumulo.core.fate; import java.io.Serializable; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * Read only access to a Transaction Store. @@ -121,4 +123,11 @@ public interface ReadOnlyFateStore<T> { * @return all outstanding transactions, including those reserved by others. */ List<Long> list(); + + /** + * @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and + * unreserved. This method will block until it finds something that is runnable or until + * the keepWaiting parameter is false. + */ + Iterator<Long> runnable(AtomicBoolean keepWaiting); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java new file mode 100644 index 0000000000..4bad48a6af --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.function.LongPredicate; + +import com.google.common.base.Preconditions; + +class SignalCount { + private long count = 0; + + synchronized void increment() { + count++; + this.notifyAll(); + } + + synchronized void decrement() { + Preconditions.checkState(count > 0); + count--; + this.notifyAll(); + } + + synchronized long getCount() { + return count; + } + + synchronized boolean waitFor(LongPredicate predicate, BooleanSupplier keepWaiting) { + return waitFor(predicate, Long.MAX_VALUE, keepWaiting); + } + + synchronized boolean waitFor(LongPredicate predicate, long maxWait, BooleanSupplier keepWaiting) { + Preconditions.checkArgument(maxWait >= 0); + + if (maxWait == 0) { + return predicate.test(count); + } + + long start = System.nanoTime(); + + while (!predicate.test(count) && keepWaiting.getAsBoolean() + && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < maxWait) { + try { + wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + return predicate.test(count); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 683f17d958..38071ef182 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -35,10 +35,12 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -51,6 +53,8 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; //TODO use zoocache? - ACCUMULO-1297 @@ -61,11 +65,14 @@ public class ZooStore<T> implements FateStore<T> { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); private String path; private ZooReaderWriter zk; - private String lastReserved = ""; private Set<Long> reserved; private Map<Long,Long> defered; - private long statusChangeEvents = 0; - private int reservationsWaiting = 0; + + // This is incremented each time a transaction was unreserved that was non new + private final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a transaction is unreserved that was runnable + private final SignalCount unreservedRunnableCount = new SignalCount(); private byte[] serialize(Object o) { @@ -136,108 +143,20 @@ public class ZooStore<T> implements FateStore<T> { } } - @Override - public FateTxStore<T> reserve() { - try { - while (true) { - - long events; - synchronized (this) { - events = statusChangeEvents; - } - - List<String> txdirs = new ArrayList<>(zk.getChildren(path)); - Collections.sort(txdirs); - - synchronized (this) { - if (!txdirs.isEmpty() && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0) { - lastReserved = ""; - } - } - - for (String txdir : txdirs) { - long tid = parseTid(txdir); - - synchronized (this) { - // this check makes reserve pick up where it left off, so that it cycles through all as - // it is repeatedly called.... failing to do so can lead to - // starvation where fate ops that sort higher and hold a lock are never reserved. - if (txdir.compareTo(lastReserved) <= 0) { - continue; - } - - if (defered.containsKey(tid)) { - if (defered.get(tid) < System.currentTimeMillis()) { - defered.remove(tid); - } else { - continue; - } - } - if (reserved.contains(tid)) { - continue; - } else { - reserved.add(tid); - lastReserved = txdir; - } - } - - // have reserved id, status should not change - - try { - TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir), UTF_8)); - if (status == TStatus.SUBMITTED || status == TStatus.IN_PROGRESS - || status == TStatus.FAILED_IN_PROGRESS) { - return new FateTxStoreImpl(tid, true); - } else { - unreserve(tid); - } - } catch (NoNodeException nne) { - // node deleted after we got the list of children, its ok - unreserve(tid); - } catch (KeeperException | InterruptedException | RuntimeException e) { - unreserve(tid); - throw e; - } - } - - synchronized (this) { - // suppress lgtm alert - synchronized variable is not always true - if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - if (defered.isEmpty()) { - this.wait(5000); - } else { - Long minTime = Collections.min(defered.values()); - long waitTime = minTime - System.currentTimeMillis(); - if (waitTime > 0) { - this.wait(Math.min(waitTime, 5000)); - } - } - } - } - } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - @Override public FateTxStore<T> reserve(long tid) { - synchronized (this) { - reservationsWaiting++; - try { - while (reserved.contains(tid)) { - try { - this.wait(1000); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } + synchronized (ZooStore.this) { + while (reserved.contains(tid)) { + try { + ZooStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); } - - reserved.add(tid); - return new FateTxStoreImpl(tid, true); - } finally { - reservationsWaiting--; } + + reserved.add(tid); + return new FateTxStoreImpl(tid, true); } } @@ -257,27 +176,13 @@ public class ZooStore<T> implements FateStore<T> { } } - private void unreserve(long tid) { - synchronized (this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); - } - - // do not want this unreserve to unesc wake up threads in reserve()... this leads to infinite - // loop when tx is stuck in NEW... - // only do this when something external has called reserve(tid)... - if (reservationsWaiting > 0) { - this.notifyAll(); - } - } - } - private class FateTxStoreImpl implements FateTxStore<T> { private final long tid; private final boolean isReserved; + private TStatus observedStatus = null; + private FateTxStoreImpl(long tid, boolean isReserved) { this.tid = tid; this.isReserved = isReserved; @@ -290,19 +195,27 @@ public class ZooStore<T> implements FateStore<T> { throw new IllegalArgumentException("deferTime < 0 : " + deferTime); } - synchronized (this) { + synchronized (ZooStore.this) { if (!reserved.remove(tid)) { throw new IllegalStateException( "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); } + // notify any threads waiting to reserve + ZooStore.this.notifyAll(); + if (deferTime > 0) { defered.put(tid, System.currentTimeMillis() + deferTime); } + } - this.notifyAll(); + if (observedStatus != null && isRunnable(observedStatus)) { + unreservedRunnableCount.increment(); } + if (observedStatus != TStatus.NEW) { + unreservedNonNewCount.increment(); + } } private void verifyReserved(boolean isWrite) { @@ -311,7 +224,7 @@ public class ZooStore<T> implements FateStore<T> { } if (isReserved) { - synchronized (this) { + synchronized (ZooStore.this) { if (!reserved.contains(tid)) { throw new IllegalStateException( "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); @@ -409,45 +322,28 @@ public class ZooStore<T> implements FateStore<T> { } } - private TStatus _getStatus(long tid) { - try { - return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); - } catch (NoNodeException nne) { - return TStatus.UNKNOWN; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - @Override public TStatus getStatus() { verifyReserved(false); + var status = _getStatus(tid); + observedStatus = status; return _getStatus(tid); } @Override public TStatus waitForStatusChange(EnumSet<TStatus> expected) { + Preconditions.checkState(!isReserved, + "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); while (true) { - long events; - synchronized (this) { - events = statusChangeEvents; - } + + long countBefore = unreservedNonNewCount.getCount(); TStatus status = _getStatus(tid); if (expected.contains(status)) { return status; } - synchronized (this) { - // suppress lgtm alert - synchronized variable is not always true - if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - try { - this.wait(5000); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - } + unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); } } @@ -462,10 +358,7 @@ public class ZooStore<T> implements FateStore<T> { throw new IllegalStateException(e); } - synchronized (this) { - statusChangeEvents++; - } - + observedStatus = status; } @Override @@ -582,6 +475,16 @@ public class ZooStore<T> implements FateStore<T> { } } + private TStatus _getStatus(long tid) { + try { + return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); + } catch (NoNodeException nne) { + return TStatus.UNKNOWN; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + @Override public ReadOnlyFateTxStore<T> read(long tid) { return new FateTxStoreImpl(tid, false); @@ -600,4 +503,71 @@ public class ZooStore<T> implements FateStore<T> { throw new IllegalStateException(e); } } + + private boolean isRunnable(TStatus status) { + return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS + || status == TStatus.SUBMITTED; + } + + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + + while (keepWaiting.get()) { + ArrayList<Long> runnableTids = new ArrayList<>(); + + final long beforeCount = unreservedRunnableCount.getCount(); + + try { + + List<String> transactions = zk.getChildren(path); + for (String txidStr : transactions) { + long txid = parseTid(txidStr); + if (isRunnable(_getStatus(txid))) { + runnableTids.add(txid); + } + } + + synchronized (this) { + runnableTids.removeIf(txid -> { + var deferedTime = defered.get(txid); + if (deferedTime != null) { + if (deferedTime < System.currentTimeMillis()) { + return true; + } else { + defered.remove(txid); + } + } + + if (reserved.contains(txid)) { + return true; + } + + return false; + }); + } + + if (runnableTids.isEmpty()) { + if (beforeCount == unreservedRunnableCount.getCount()) { + long waitTime = 5000; + if (!defered.isEmpty()) { + Long minTime = Collections.min(defered.values()); + waitTime = minTime - System.currentTimeMillis(); + } + + if (waitTime > 0) { + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, + keepWaiting::get); + } + } + } else { + return runnableTids.iterator(); + } + + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + return List.<Long>of().iterator(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index ce8dda313b..d85e417650 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -21,8 +21,10 @@ package org.apache.accumulo.core.logging; import static org.apache.accumulo.core.fate.FateTxId.formatTid; import java.io.Serializable; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.accumulo.core.fate.Fate; @@ -97,11 +99,6 @@ public class FateLogger { // only logging operations that change the persisted data, not operations that only read data return new FateStore<>() { - @Override - public FateTxStore<T> reserve() { - return new LoggingFateTxStore<>(store.reserve(), toLogString); - } - @Override public FateTxStore<T> reserve(long tid) { return new LoggingFateTxStore<>(store.reserve(tid), toLogString); @@ -122,6 +119,11 @@ public class FateLogger { return store.list(); } + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + return store.runnable(keepWaiting); + } + @Override public long create() { long tid = store.create(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 5bfd60d2bd..058b0c50a4 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -23,10 +23,12 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Transient in memory store for transactions. @@ -53,11 +55,6 @@ public class TestStore implements FateStore<String> { return new TestFateTxStore(tid); } - @Override - public FateTxStore<String> reserve() { - throw new UnsupportedOperationException(); - } - @Override public Optional<FateTxStore<String>> tryReserve(long tid) { synchronized (this) { @@ -172,4 +169,9 @@ public class TestStore implements FateStore<String> { return new ArrayList<>(statuses.keySet()); } + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + throw new UnsupportedOperationException(); + } + }