This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 10f1eed  Remove two TODOs in ExternalCompactionExecutor
10f1eed is described below

commit 10f1eedafbd1569a8c232cd8d461d4d2a51d18d0
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon May 10 16:27:56 2021 +0000

    Remove two TODOs in ExternalCompactionExecutor
---
 .../compactions/ExternalCompactionExecutor.java    | 79 +++++++++++++---------
 1 file changed, 48 insertions(+), 31 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 894de4a..336978b 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -49,17 +49,19 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
   private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new 
HashSet<>());
 
   private class ExternalJob extends SubmittedJob {
-    private AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
-    private Compactable compactable;
-    private CompactionServiceId csid;
+    private final AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
+    private final Compactable compactable;
+    private final CompactionServiceId csid;
     private volatile ExternalCompactionId ecid;
-    private AtomicLong cancelCount = new AtomicLong();
+    private final AtomicLong cancelCount = new AtomicLong();
+    private final long timeCreated;
 
-    public ExternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid) {
+    ExternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid) {
       super(job);
       this.compactable = compactable;
       this.csid = csid;
       queuedTask.add(this);
+      this.timeCreated = System.currentTimeMillis();
     }
 
     @Override
@@ -97,6 +99,10 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
     public KeyExtent getExtent() {
       return compactable.getExtent();
     }
+
+    public long getTimeCreated() {
+      return timeCreated;
+    }
   }
 
   private PriorityBlockingQueue<ExternalJob> queue;
@@ -104,10 +110,13 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
 
   public ExternalCompactionExecutor(CompactionExecutorId ceid) {
     this.ceid = ceid;
-    Comparator<ExternalJob> comparator = Comparator.comparingLong(ej -> 
ej.getJob().getPriority());
-    comparator = comparator.reversed();
+    Comparator<ExternalJob> priorityComparator =
+        Comparator.comparingLong(ej -> ej.getJob().getPriority());
+    priorityComparator = priorityComparator.reversed();
+    Comparator<ExternalJob> timeComparator = 
Comparator.comparingLong(ExternalJob::getTimeCreated);
 
-    this.queue = new PriorityBlockingQueue<ExternalJob>(100, comparator);
+    this.queue = new PriorityBlockingQueue<ExternalJob>(100,
+        priorityComparator.thenComparing(timeComparator));
   }
 
   @Override
@@ -139,34 +148,42 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
   ExternalCompactionJob reserveExternalCompaction(long priority, String 
compactorId,
       ExternalCompactionId externalCompactionId) {
 
-    ExternalJob extJob = queue.poll();
-    while (extJob != null && extJob.getStatus() != Status.QUEUED) {
-      extJob = queue.poll();
-    }
+    ExternalCompactionJob found = null;
 
-    if (extJob == null) {
-      return null;
-    }
+    while (found == null) {
+      ExternalJob extJob = queue.poll();
+      while (extJob != null && extJob.getStatus() != Status.QUEUED) {
+        // Remove non-queued state from queue
+        extJob = queue.poll();
+      }
 
-    if (extJob.getJob().getPriority() >= priority) {
-      if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
-        queuedTask.remove(extJob);
-        var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, 
extJob.getJob(),
-            compactorId, externalCompactionId);
-        if (ecj == null)
-          return null;
-        extJob.ecid = ecj.getExternalCompactionId();
-        return ecj;
+      if (extJob == null) {
+        found = null;
+        // nothing left in queue
+        break;
+      }
+
+      if (extJob.getJob().getPriority() >= priority) {
+        if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
+          queuedTask.remove(extJob);
+          var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, 
extJob.getJob(),
+              compactorId, externalCompactionId);
+          if (ecj == null) {
+            found = null;
+            break;
+          } else {
+            extJob.ecid = ecj.getExternalCompactionId();
+            found = ecj;
+          }
+        } else {
+          // Something else modified this job, try again
+          continue;
+        }
       } else {
-        // TODO could this cause a stack overflow?
-        return reserveExternalCompaction(priority, compactorId, 
externalCompactionId);
+        queue.add(extJob);
       }
-    } else {
-      // TODO this messes with the ordering.. maybe make the comparator 
compare on time also
-      queue.add(extJob);
     }
-
-    return null;
+    return found;
   }
 
   // TODO maybe create non-thrift type to avoid thrift types all over the code

Reply via email to