This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 10f1eed Remove two TODOs in ExternalCompactionExecutor 10f1eed is described below commit 10f1eedafbd1569a8c232cd8d461d4d2a51d18d0 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon May 10 16:27:56 2021 +0000 Remove two TODOs in ExternalCompactionExecutor --- .../compactions/ExternalCompactionExecutor.java | 79 +++++++++++++--------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 894de4a..336978b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -49,17 +49,19 @@ public class ExternalCompactionExecutor implements CompactionExecutor { private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new HashSet<>()); private class ExternalJob extends SubmittedJob { - private AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); - private Compactable compactable; - private CompactionServiceId csid; + private final AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); + private final Compactable compactable; + private final CompactionServiceId csid; private volatile ExternalCompactionId ecid; - private AtomicLong cancelCount = new AtomicLong(); + private final AtomicLong cancelCount = new AtomicLong(); + private final long timeCreated; - public ExternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid) { + ExternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid) { super(job); this.compactable = compactable; this.csid = csid; queuedTask.add(this); + this.timeCreated = System.currentTimeMillis(); } @Override @@ -97,6 +99,10 @@ public class ExternalCompactionExecutor implements CompactionExecutor { public KeyExtent getExtent() { return compactable.getExtent(); } + + public long getTimeCreated() { + return timeCreated; + } } private PriorityBlockingQueue<ExternalJob> queue; @@ -104,10 +110,13 @@ public class ExternalCompactionExecutor implements CompactionExecutor { public ExternalCompactionExecutor(CompactionExecutorId ceid) { this.ceid = ceid; - Comparator<ExternalJob> comparator = Comparator.comparingLong(ej -> ej.getJob().getPriority()); - comparator = comparator.reversed(); + Comparator<ExternalJob> priorityComparator = + Comparator.comparingLong(ej -> ej.getJob().getPriority()); + priorityComparator = priorityComparator.reversed(); + Comparator<ExternalJob> timeComparator = Comparator.comparingLong(ExternalJob::getTimeCreated); - this.queue = new PriorityBlockingQueue<ExternalJob>(100, comparator); + this.queue = new PriorityBlockingQueue<ExternalJob>(100, + priorityComparator.thenComparing(timeComparator)); } @Override @@ -139,34 +148,42 @@ public class ExternalCompactionExecutor implements CompactionExecutor { ExternalCompactionJob reserveExternalCompaction(long priority, String compactorId, ExternalCompactionId externalCompactionId) { - ExternalJob extJob = queue.poll(); - while (extJob != null && extJob.getStatus() != Status.QUEUED) { - extJob = queue.poll(); - } + ExternalCompactionJob found = null; - if (extJob == null) { - return null; - } + while (found == null) { + ExternalJob extJob = queue.poll(); + while (extJob != null && extJob.getStatus() != Status.QUEUED) { + // Remove non-queued state from queue + extJob = queue.poll(); + } - if (extJob.getJob().getPriority() >= priority) { - if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) { - queuedTask.remove(extJob); - var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), - compactorId, externalCompactionId); - if (ecj == null) - return null; - extJob.ecid = ecj.getExternalCompactionId(); - return ecj; + if (extJob == null) { + found = null; + // nothing left in queue + break; + } + + if (extJob.getJob().getPriority() >= priority) { + if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) { + queuedTask.remove(extJob); + var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), + compactorId, externalCompactionId); + if (ecj == null) { + found = null; + break; + } else { + extJob.ecid = ecj.getExternalCompactionId(); + found = ecj; + } + } else { + // Something else modified this job, try again + continue; + } } else { - // TODO could this cause a stack overflow? - return reserveExternalCompaction(priority, compactorId, externalCompactionId); + queue.add(extJob); } - } else { - // TODO this messes with the ordering.. maybe make the comparator compare on time also - queue.add(extJob); } - - return null; + return found; } // TODO maybe create non-thrift type to avoid thrift types all over the code