This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8e78562a1d modifies FATE to use a single thread to find work (#4042)
8e78562a1d is described below

commit 8e78562a1d82419858622e9874ed7e1f84d3c497
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Dec 8 16:12:21 2023 -0500

    modifies FATE to use a single thread to find work (#4042)
    
    This change modifies FATE to use a single thread to find work.  It also
    cleans up some of the signaling between threads in FATE and fixes a
    synchronization bug in FATE that was introduced in #4017.  The bug
    introduced in #4017 is that some code was synchronizing on the wrong
    object because a new inner class was introduced: `synchronized (this)`
    inside the inner FateTxStoreImpl class locked the inner object instead
    of the enclosing ZooStore.
    
    These changes were pulled from #3964 and cleaned up and improved.
---
 .../org/apache/accumulo/core/fate/AgeOffStore.java |  12 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  84 ++++++-
 .../org/apache/accumulo/core/fate/FateStore.java   |  17 +-
 .../accumulo/core/fate/ReadOnlyFateStore.java      |   9 +
 .../org/apache/accumulo/core/fate/SignalCount.java |  70 ++++++
 .../org/apache/accumulo/core/fate/ZooStore.java    | 266 +++++++++------------
 .../apache/accumulo/core/logging/FateLogger.java   |  12 +-
 .../org/apache/accumulo/core/fate/TestStore.java   |  12 +-
 8 files changed, 305 insertions(+), 177 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index c8be589aef..f61c06028c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@ -20,10 +20,12 @@ package org.apache.accumulo.core.fate;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -148,11 +150,6 @@ public class AgeOffStore<T> implements FateStore<T> {
     return txid;
   }
 
-  @Override
-  public FateTxStore<T> reserve() {
-    return new AgeOffFateTxStore(store.reserve());
-  }
-
   @Override
   public FateTxStore<T> reserve(long tid) {
     return new AgeOffFateTxStore(store.reserve(tid));
@@ -204,4 +201,9 @@ public class AgeOffStore<T> implements FateStore<T> {
   public List<Long> list() {
     return store.list();
   }
+
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+    return store.runnable(keepWaiting);
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index a7ad8ce243..a54ad734ee 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.fate;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
@@ -33,9 +34,11 @@ import static 
org.apache.accumulo.core.util.ShutdownUtil.isIOException;
 import java.util.EnumSet;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TransferQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -48,6 +51,7 @@ import org.apache.accumulo.core.logging.FateLogger;
 import org.apache.accumulo.core.util.ShutdownUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.thrift.TApplicationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,25 +72,91 @@ public class Fate<T> {
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
 
   private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+  private final TransferQueue<Long> workQueue;
+  private final Thread workFinder;
 
   public enum TxInfo {
     TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
   }
 
+  /**
+   * A single thread that finds transactions to work on and queues them up. Do 
not want each worker
+   * thread going to the store and looking for work as it would place more 
load on the store.
+   */
+  private class WorkFinder implements Runnable {
+
+    @Override
+    public void run() {
+      while (keepRunning.get()) {
+        try {
+          var iter = store.runnable(keepRunning);
+
+          while (iter.hasNext() && keepRunning.get()) {
+            Long txid = iter.next();
+            try {
+              while (keepRunning.get()) {
+                // The reason for calling transfer instead of queueing is 
avoid rescanning the
+                // storage layer and adding the same thing over and over. For 
example if all threads
+                // were busy, the queue size was 100, and there are three 
runnable things in the
+                // store. Do not want to keep scanning the store adding those 
same 3 runnable things
+                // until the queue is full.
+                if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) {
+                  break;
+                }
+              }
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new IllegalStateException(e);
+            }
+          }
+        } catch (Exception e) {
+          if (keepRunning.get()) {
+            log.warn("Failure while attempting to find work for fate", e);
+          } else {
+            log.debug("Failure while attempting to find work for fate", e);
+          }
+
+          workQueue.clear();
+        }
+      }
+    }
+  }
+
   private class TransactionRunner implements Runnable {
 
+    private Optional<FateTxStore<T>> reserveFateTx() throws 
InterruptedException {
+      while (keepRunning.get()) {
+        var unreservedTid = workQueue.poll(100, MILLISECONDS);
+
+        if (unreservedTid == null) {
+          continue;
+        }
+        var optionalopStore = store.tryReserve(unreservedTid);
+        if (optionalopStore.isPresent()) {
+          return optionalopStore;
+        }
+      }
+
+      return Optional.empty();
+    }
+
     @Override
     public void run() {
       while (keepRunning.get()) {
         long deferTime = 0;
         FateTxStore<T> txStore = null;
         try {
-          txStore = store.reserve();
+          var optionalopStore = reserveFateTx();
+          if (optionalopStore.isPresent()) {
+            txStore = optionalopStore.orElseThrow();
+          } else {
+            continue;
+          }
           TStatus status = txStore.getStatus();
           Repo<T> op = txStore.top();
           if (status == FAILED_IN_PROGRESS) {
             processFailed(txStore, op);
-          } else {
+          } else if (status == SUBMITTED || status == IN_PROGRESS) {
             Repo<T> prevOp = null;
             try {
               deferTime = op.isReady(txStore.getID(), environment);
@@ -231,6 +301,7 @@ public class Fate<T> {
     this.environment = environment;
     final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
+    this.workQueue = new LinkedTransferQueue<>();
     this.fatePoolWatcher =
         
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
@@ -257,6 +328,9 @@ public class Fate<T> {
       }
     }, 3, SECONDS));
     this.executor = pool;
+
+    this.workFinder = Threads.createThread("Fate work finder", new 
WorkFinder());
+    this.workFinder.start();
   }
 
   // get a transaction id back to the requester before doing any work
@@ -399,6 +473,12 @@ public class Fate<T> {
     if (executor != null) {
       executor.shutdown();
     }
+    workFinder.interrupt();
+    try {
+      workFinder.join();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 834a2fa6e5..7db5766e81 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -38,6 +38,9 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
    */
   long create();
 
+  /**
+   * An interface that allows read/write access to the data related to a 
single fate operation.
+   */
   interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
     @Override
     Repo<T> top();
@@ -81,8 +84,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
      * upon successful return the store now controls the referenced 
transaction id. caller should no
      * longer interact with it.
      *
-     * @param deferTime time in millis to keep this transaction out of the 
pool used in the
-     *        {@link #reserve() reserve} method. must be non-negative.
+     * @param deferTime time in millis to keep this transaction from being 
returned by
+     *        {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. 
Must be non-negative.
      */
     void unreserve(long deferTime);
   }
@@ -104,14 +107,4 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> 
{
    */
   FateTxStore<T> reserve(long tid);
 
-  /**
-   * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS.
-   *
-   * Reserving a transaction id ensures that nothing else in-process 
interacting via the same
-   * instance will be operating on that transaction id.
-   *
-   * @return a transaction id that is safe to interact with, chosen by the 
store.
-   */
-  FateTxStore<T> reserve();
-
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index 4e06ab0f9e..f0140de367 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.core.fate;
 
 import java.io.Serializable;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Read only access to a Transaction Store.
@@ -121,4 +123,12 @@ public interface ReadOnlyFateStore<T> {
    * @return all outstanding transactions, including those reserved by others.
    */
   List<Long> list();
+
+  /**
+   * @param keepWaiting the method keeps blocking for runnable transactions while this is true
+   * @return an iterator over fate op ids that are (SUBMITTED, IN_PROGRESS or FAILED_IN_PROGRESS)
+   *         and unreserved. This method will block until it finds something that is runnable or
+   *         until the keepWaiting parameter is false.
+   */
+  Iterator<Long> runnable(AtomicBoolean keepWaiting);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java 
b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java
new file mode 100644
index 0000000000..4bad48a6af
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongPredicate;
+
+import com.google.common.base.Preconditions;
+
+class SignalCount {
+  private long count = 0;
+
+  synchronized void increment() {
+    count++;
+    this.notifyAll();
+  }
+
+  synchronized void decrement() {
+    Preconditions.checkState(count > 0);
+    count--;
+    this.notifyAll();
+  }
+
+  synchronized long getCount() {
+    return count;
+  }
+
+  synchronized boolean waitFor(LongPredicate predicate, BooleanSupplier 
keepWaiting) {
+    return waitFor(predicate, Long.MAX_VALUE, keepWaiting);
+  }
+
+  synchronized boolean waitFor(LongPredicate predicate, long maxWait, 
BooleanSupplier keepWaiting) {
+    Preconditions.checkArgument(maxWait >= 0);
+
+    if (maxWait == 0) {
+      return predicate.test(count);
+    }
+
+    long start = System.nanoTime();
+
+    while (!predicate.test(count) && keepWaiting.getAsBoolean()
+        && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < maxWait) 
{
+      try {
+        wait(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+    }
+
+    return predicate.test(count);
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index 683f17d958..38071ef182 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -35,10 +35,12 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
@@ -51,6 +53,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 //TODO use zoocache? - ACCUMULO-1297
@@ -61,11 +65,14 @@ public class ZooStore<T> implements FateStore<T> {
   private static final Logger log = LoggerFactory.getLogger(ZooStore.class);
   private String path;
   private ZooReaderWriter zk;
-  private String lastReserved = "";
   private Set<Long> reserved;
   private Map<Long,Long> defered;
-  private long statusChangeEvents = 0;
-  private int reservationsWaiting = 0;
+
+  // This is incremented each time a transaction was unreserved that was non 
new
+  private final SignalCount unreservedNonNewCount = new SignalCount();
+
+  // This is incremented each time a transaction is unreserved that was 
runnable
+  private final SignalCount unreservedRunnableCount = new SignalCount();
 
   private byte[] serialize(Object o) {
 
@@ -136,108 +143,20 @@ public class ZooStore<T> implements FateStore<T> {
     }
   }
 
-  @Override
-  public FateTxStore<T> reserve() {
-    try {
-      while (true) {
-
-        long events;
-        synchronized (this) {
-          events = statusChangeEvents;
-        }
-
-        List<String> txdirs = new ArrayList<>(zk.getChildren(path));
-        Collections.sort(txdirs);
-
-        synchronized (this) {
-          if (!txdirs.isEmpty() && txdirs.get(txdirs.size() - 
1).compareTo(lastReserved) <= 0) {
-            lastReserved = "";
-          }
-        }
-
-        for (String txdir : txdirs) {
-          long tid = parseTid(txdir);
-
-          synchronized (this) {
-            // this check makes reserve pick up where it left off, so that it 
cycles through all as
-            // it is repeatedly called.... failing to do so can lead to
-            // starvation where fate ops that sort higher and hold a lock are 
never reserved.
-            if (txdir.compareTo(lastReserved) <= 0) {
-              continue;
-            }
-
-            if (defered.containsKey(tid)) {
-              if (defered.get(tid) < System.currentTimeMillis()) {
-                defered.remove(tid);
-              } else {
-                continue;
-              }
-            }
-            if (reserved.contains(tid)) {
-              continue;
-            } else {
-              reserved.add(tid);
-              lastReserved = txdir;
-            }
-          }
-
-          // have reserved id, status should not change
-
-          try {
-            TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" 
+ txdir), UTF_8));
-            if (status == TStatus.SUBMITTED || status == TStatus.IN_PROGRESS
-                || status == TStatus.FAILED_IN_PROGRESS) {
-              return new FateTxStoreImpl(tid, true);
-            } else {
-              unreserve(tid);
-            }
-          } catch (NoNodeException nne) {
-            // node deleted after we got the list of children, its ok
-            unreserve(tid);
-          } catch (KeeperException | InterruptedException | RuntimeException 
e) {
-            unreserve(tid);
-            throw e;
-          }
-        }
-
-        synchronized (this) {
-          // suppress lgtm alert - synchronized variable is not always true
-          if (events == statusChangeEvents) { // lgtm 
[java/constant-comparison]
-            if (defered.isEmpty()) {
-              this.wait(5000);
-            } else {
-              Long minTime = Collections.min(defered.values());
-              long waitTime = minTime - System.currentTimeMillis();
-              if (waitTime > 0) {
-                this.wait(Math.min(waitTime, 5000));
-              }
-            }
-          }
-        }
-      }
-    } catch (KeeperException | InterruptedException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
   @Override
   public FateTxStore<T> reserve(long tid) {
-    synchronized (this) {
-      reservationsWaiting++;
-      try {
-        while (reserved.contains(tid)) {
-          try {
-            this.wait(1000);
-          } catch (InterruptedException e) {
-            throw new IllegalStateException(e);
-          }
+    synchronized (ZooStore.this) {
+      while (reserved.contains(tid)) {
+        try {
+          ZooStore.this.wait(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
         }
-
-        reserved.add(tid);
-        return new FateTxStoreImpl(tid, true);
-      } finally {
-        reservationsWaiting--;
       }
+
+      reserved.add(tid);
+      return new FateTxStoreImpl(tid, true);
     }
   }
 
@@ -257,27 +176,13 @@ public class ZooStore<T> implements FateStore<T> {
     }
   }
 
-  private void unreserve(long tid) {
-    synchronized (this) {
-      if (!reserved.remove(tid)) {
-        throw new IllegalStateException(
-            "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
-      }
-
-      // do not want this unreserve to unesc wake up threads in reserve()... 
this leads to infinite
-      // loop when tx is stuck in NEW...
-      // only do this when something external has called reserve(tid)...
-      if (reservationsWaiting > 0) {
-        this.notifyAll();
-      }
-    }
-  }
-
   private class FateTxStoreImpl implements FateTxStore<T> {
 
     private final long tid;
     private final boolean isReserved;
 
+    private TStatus observedStatus = null;
+
     private FateTxStoreImpl(long tid, boolean isReserved) {
       this.tid = tid;
       this.isReserved = isReserved;
@@ -290,19 +195,27 @@ public class ZooStore<T> implements FateStore<T> {
         throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
       }
 
-      synchronized (this) {
+      synchronized (ZooStore.this) {
         if (!reserved.remove(tid)) {
           throw new IllegalStateException(
               "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
         }
 
+        // notify any threads waiting to reserve
+        ZooStore.this.notifyAll();
+
         if (deferTime > 0) {
           defered.put(tid, System.currentTimeMillis() + deferTime);
         }
+      }
 
-        this.notifyAll();
+      if (observedStatus != null && isRunnable(observedStatus)) {
+        unreservedRunnableCount.increment();
       }
 
+      if (observedStatus != TStatus.NEW) {
+        unreservedNonNewCount.increment();
+      }
     }
 
     private void verifyReserved(boolean isWrite) {
@@ -311,7 +224,7 @@ public class ZooStore<T> implements FateStore<T> {
       }
 
       if (isReserved) {
-        synchronized (this) {
+        synchronized (ZooStore.this) {
           if (!reserved.contains(tid)) {
             throw new IllegalStateException(
                 "Tried to operate on unreserved transaction " + 
FateTxId.formatTid(tid));
@@ -409,45 +322,28 @@ public class ZooStore<T> implements FateStore<T> {
       }
     }
 
-    private TStatus _getStatus(long tid) {
-      try {
-        return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8));
-      } catch (NoNodeException nne) {
-        return TStatus.UNKNOWN;
-      } catch (KeeperException | InterruptedException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-
     @Override
     public TStatus getStatus() {
       verifyReserved(false);
-      return _getStatus(tid);
+      var status = _getStatus(tid);
+      observedStatus = status; // unreserve() uses this to decide which waiters to signal
+      return status; // reuse the value read above instead of a second ZooKeeper read
     }
 
     @Override
     public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
+      Preconditions.checkState(!isReserved,
+          "Attempted to wait for status change while reserved " + 
FateTxId.formatTid(getID()));
       while (true) {
-        long events;
-        synchronized (this) {
-          events = statusChangeEvents;
-        }
+
+        long countBefore = unreservedNonNewCount.getCount();
 
         TStatus status = _getStatus(tid);
         if (expected.contains(status)) {
           return status;
         }
 
-        synchronized (this) {
-          // suppress lgtm alert - synchronized variable is not always true
-          if (events == statusChangeEvents) { // lgtm 
[java/constant-comparison]
-            try {
-              this.wait(5000);
-            } catch (InterruptedException e) {
-              throw new IllegalStateException(e);
-            }
-          }
-        }
+        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () 
-> true);
       }
     }
 
@@ -462,10 +358,7 @@ public class ZooStore<T> implements FateStore<T> {
         throw new IllegalStateException(e);
       }
 
-      synchronized (this) {
-        statusChangeEvents++;
-      }
-
+      observedStatus = status;
     }
 
     @Override
@@ -582,6 +475,16 @@ public class ZooStore<T> implements FateStore<T> {
     }
   }
 
+  private TStatus _getStatus(long tid) {
+    try {
+      return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8));
+    } catch (NoNodeException nne) {
+      return TStatus.UNKNOWN;
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
   @Override
   public ReadOnlyFateTxStore<T> read(long tid) {
     return new FateTxStoreImpl(tid, false);
@@ -600,4 +503,88 @@ public class ZooStore<T> implements FateStore<T> {
       throw new IllegalStateException(e);
     }
   }
+
+  private boolean isRunnable(TStatus status) {
+    return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
+        || status == TStatus.SUBMITTED;
+  }
+
+  /**
+   * Scans ZooKeeper for transactions that are runnable, unreserved and not currently deferred.
+   * When none are found, waits until a runnable transaction is unreserved, the earliest deferral
+   * expires, five seconds pass or {@code keepWaiting} becomes false, and then rescans.
+   *
+   * @return the runnable transaction ids found by one scan, or an empty iterator once
+   *         {@code keepWaiting} is false
+   */
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+
+    while (keepWaiting.get()) {
+      ArrayList<Long> runnableTids = new ArrayList<>();
+
+      final long beforeCount = unreservedRunnableCount.getCount();
+
+      try {
+
+        List<String> transactions = zk.getChildren(path);
+        for (String txidStr : transactions) {
+          long txid = parseTid(txidStr);
+          if (isRunnable(_getStatus(txid))) {
+            runnableTids.add(txid);
+          }
+        }
+
+        synchronized (this) {
+          runnableTids.removeIf(txid -> {
+            var deferedTime = defered.get(txid);
+            if (deferedTime != null) {
+              if (deferedTime >= System.currentTimeMillis()) {
+                // deferral has not expired, do not hand this transaction out yet
+                return true;
+              } else {
+                // deferral expired, the transaction is eligible to run again
+                defered.remove(txid);
+              }
+            }
+
+            return reserved.contains(txid);
+          });
+        }
+
+        if (runnableTids.isEmpty()) {
+          if (beforeCount == unreservedRunnableCount.getCount()) {
+            long waitTime = 5000;
+            // defered is guarded by the ZooStore monitor, so it must be read while holding it
+            synchronized (this) {
+              long now = System.currentTimeMillis();
+              // Drop expired deferrals. An expired entry for a transaction that is no longer
+              // runnable is never removed above and would keep waitTime <= 0, turning this
+              // loop into a busy rescan of ZooKeeper.
+              defered.values().removeIf(deferredUntil -> deferredUntil < now);
+              if (!defered.isEmpty()) {
+                // cap at 5s, as when nothing is deferred, so ZooKeeper is still rescanned
+                waitTime = Math.min(waitTime, Collections.min(defered.values()) - now);
+              }
+            }
+
+            if (waitTime > 0) {
+              unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
+                  keepWaiting::get);
+            }
+          }
+        } else {
+          return runnableTids.iterator();
+        }
+
+      } catch (KeeperException e) {
+        throw new IllegalStateException(e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      }
+    }
+
+    return List.<Long>of().iterator();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index ce8dda313b..d85e417650 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -21,8 +21,10 @@ package org.apache.accumulo.core.logging;
 import static org.apache.accumulo.core.fate.FateTxId.formatTid;
 
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import org.apache.accumulo.core.fate.Fate;
@@ -97,11 +99,6 @@ public class FateLogger {
     // only logging operations that change the persisted data, not operations 
that only read data
     return new FateStore<>() {
 
-      @Override
-      public FateTxStore<T> reserve() {
-        return new LoggingFateTxStore<>(store.reserve(), toLogString);
-      }
-
       @Override
       public FateTxStore<T> reserve(long tid) {
         return new LoggingFateTxStore<>(store.reserve(tid), toLogString);
@@ -122,6 +119,11 @@ public class FateLogger {
         return store.list();
       }
 
+      @Override
+      public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+        return store.runnable(keepWaiting);
+      }
+
       @Override
       public long create() {
         long tid = store.create();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 5bfd60d2bd..058b0c50a4 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -23,10 +23,12 @@ import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Transient in memory store for transactions.
@@ -53,11 +55,6 @@ public class TestStore implements FateStore<String> {
     return new TestFateTxStore(tid);
   }
 
-  @Override
-  public FateTxStore<String> reserve() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public Optional<FateTxStore<String>> tryReserve(long tid) {
     synchronized (this) {
@@ -172,4 +169,9 @@ public class TestStore implements FateStore<String> {
     return new ArrayList<>(statuses.keySet());
   }
 
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+    throw new UnsupportedOperationException();
+  }
+
 }

Reply via email to