This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to
refs/heads/1451-external-compactions-feature by this push:
new 10f1eed Remove two TODOs in ExternalCompactionExecutor
10f1eed is described below
commit 10f1eedafbd1569a8c232cd8d461d4d2a51d18d0
Author: Dave Marion <[email protected]>
AuthorDate: Mon May 10 16:27:56 2021 +0000
Remove two TODOs in ExternalCompactionExecutor
---
.../compactions/ExternalCompactionExecutor.java | 79 +++++++++++++---------
1 file changed, 48 insertions(+), 31 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 894de4a..336978b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -49,17 +49,19 @@ public class ExternalCompactionExecutor implements
CompactionExecutor {
private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new
HashSet<>());
private class ExternalJob extends SubmittedJob {
- private AtomicReference<Status> status = new
AtomicReference<>(Status.QUEUED);
- private Compactable compactable;
- private CompactionServiceId csid;
+ private final AtomicReference<Status> status = new
AtomicReference<>(Status.QUEUED);
+ private final Compactable compactable;
+ private final CompactionServiceId csid;
private volatile ExternalCompactionId ecid;
- private AtomicLong cancelCount = new AtomicLong();
+ private final AtomicLong cancelCount = new AtomicLong();
+ private final long timeCreated;
- public ExternalJob(CompactionJob job, Compactable compactable,
CompactionServiceId csid) {
+ ExternalJob(CompactionJob job, Compactable compactable,
CompactionServiceId csid) {
super(job);
this.compactable = compactable;
this.csid = csid;
queuedTask.add(this);
+ this.timeCreated = System.currentTimeMillis();
}
@Override
@@ -97,6 +99,10 @@ public class ExternalCompactionExecutor implements
CompactionExecutor {
public KeyExtent getExtent() {
return compactable.getExtent();
}
+
+ public long getTimeCreated() {
+ return timeCreated;
+ }
}
private PriorityBlockingQueue<ExternalJob> queue;
@@ -104,10 +110,13 @@ public class ExternalCompactionExecutor implements
CompactionExecutor {
public ExternalCompactionExecutor(CompactionExecutorId ceid) {
this.ceid = ceid;
- Comparator<ExternalJob> comparator = Comparator.comparingLong(ej ->
ej.getJob().getPriority());
- comparator = comparator.reversed();
+ Comparator<ExternalJob> priorityComparator =
+ Comparator.comparingLong(ej -> ej.getJob().getPriority());
+ priorityComparator = priorityComparator.reversed();
+ Comparator<ExternalJob> timeComparator =
Comparator.comparingLong(ExternalJob::getTimeCreated);
- this.queue = new PriorityBlockingQueue<ExternalJob>(100, comparator);
+ this.queue = new PriorityBlockingQueue<ExternalJob>(100,
+ priorityComparator.thenComparing(timeComparator));
}
@Override
@@ -139,34 +148,42 @@ public class ExternalCompactionExecutor implements
CompactionExecutor {
ExternalCompactionJob reserveExternalCompaction(long priority, String
compactorId,
ExternalCompactionId externalCompactionId) {
- ExternalJob extJob = queue.poll();
- while (extJob != null && extJob.getStatus() != Status.QUEUED) {
- extJob = queue.poll();
- }
+ ExternalCompactionJob found = null;
- if (extJob == null) {
- return null;
- }
+ while (found == null) {
+ ExternalJob extJob = queue.poll();
+ while (extJob != null && extJob.getStatus() != Status.QUEUED) {
+ // Remove non-queued state from queue
+ extJob = queue.poll();
+ }
- if (extJob.getJob().getPriority() >= priority) {
- if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
- queuedTask.remove(extJob);
- var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid,
extJob.getJob(),
- compactorId, externalCompactionId);
- if (ecj == null)
- return null;
- extJob.ecid = ecj.getExternalCompactionId();
- return ecj;
+ if (extJob == null) {
+ found = null;
+ // nothing left in queue
+ break;
+ }
+
+ if (extJob.getJob().getPriority() >= priority) {
+ if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
+ queuedTask.remove(extJob);
+ var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid,
extJob.getJob(),
+ compactorId, externalCompactionId);
+ if (ecj == null) {
+ found = null;
+ break;
+ } else {
+ extJob.ecid = ecj.getExternalCompactionId();
+ found = ecj;
+ }
+ } else {
+ // Something else modified this job, try again
+ continue;
+ }
} else {
- // TODO could this cause a stack overflow?
- return reserveExternalCompaction(priority, compactorId,
externalCompactionId);
+ queue.add(extJob);
}
- } else {
- // TODO this messes with the ordering.. maybe make the comparator
compare on time also
- queue.add(extJob);
}
-
- return null;
+ return found;
}
// TODO maybe create non-thrift type to avoid thrift types all over the code