This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 03d1198  Lots of little changes to get compactor and coordinator to run from command line
03d1198 is described below

commit 03d11985cbc9bc8acca83cb2c98766144710532a
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Mar 11 10:32:18 2021 -0500

    Lots of little changes to get compactor and coordinator to run from command line
---
 assemble/conf/accumulo-env.sh                      |  2 +-
 assemble/pom.xml                                   | 10 ++++++
 .../java/org/apache/accumulo/core/Constants.java   |  2 +-
 .../core/spi/compaction/CompactionExecutorId.java  | 19 +++++++++--
 .../apache/accumulo/fate/zookeeper/ZooLock.java    |  4 ++-
 .../compaction/DefaultCompactionPlannerTest.java   | 22 ++++++-------
 .../util/compaction/CompactionPrioritizerTest.java |  2 +-
 pom.xml                                            | 10 ++++++
 .../server/compaction/ExternalCompactionUtil.java  | 14 +++++---
 .../apache/accumulo/server/init/Initialize.java    |  8 +++++
 server/compaction-coordinator/pom.xml              |  7 +++-
 .../coordinator/CompactionCoordinator.java         | 15 +++++----
 .../coordinator/CoordinatorExecutable.java         | 38 +++++++++++++---------
 server/compactor/pom.xml                           |  7 +++-
 .../org/apache/accumulo/compactor/Compactor.java   | 11 ++++---
 .../accumulo/compactor/CompactorExecutable.java    | 38 +++++++++++++---------
 .../tserver/compactions/CompactionManager.java     |  3 +-
 .../tserver/compactions/CompactionService.java     |  4 +--
 .../compactions/ExternalCompactionExecutor.java    | 26 ++++++++++-----
 19 files changed, 166 insertions(+), 76 deletions(-)

diff --git a/assemble/conf/accumulo-env.sh b/assemble/conf/accumulo-env.sh
index 3161b8b..d5b6012 100644
--- a/assemble/conf/accumulo-env.sh
+++ b/assemble/conf/accumulo-env.sh
@@ -95,7 +95,7 @@ JAVA_OPTS=("${JAVA_OPTS[@]}"
 )
 
 case "$cmd" in
-  monitor|gc|manager|master|tserver|tracer)
+  monitor|gc|manager|master|tserver|tracer|compaction-coordinator|compactor)
     JAVA_OPTS=("${JAVA_OPTS[@]}" 
"-Dlog4j.configurationFile=log4j2-service.properties")
     ;;
   *)
diff --git a/assemble/pom.xml b/assemble/pom.xml
index ee4fcda..3b6ea0d 100644
--- a/assemble/pom.xml
+++ b/assemble/pom.xml
@@ -171,6 +171,16 @@
     </dependency>
     <dependency>
       <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-compaction-coordinator</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-compactor</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-core</artifactId>
       <optional>true</optional>
     </dependency>
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java 
b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 3276eb2..e2fac29 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -62,7 +62,7 @@ public class Constants {
   public static final String ZCOMPACTORS = "/compactors";
 
   public static final String ZCOORDINATOR = "/coordinators";
-  public static final String ZCOORDINATOR_LOCK = "/coordinators/lock";
+  public static final String ZCOORDINATOR_LOCK = ZCOORDINATOR + "/lock";
 
   public static final String ZDEAD = "/dead";
   public static final String ZDEADTSERVERS = ZDEAD + "/tservers";
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
index 91e8bb0..498c8b4 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.core.spi.compaction;
 
 import org.apache.accumulo.core.data.AbstractId;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A unique identifier for a a compaction executor that a {@link 
CompactionPlanner} can schedule
  * compactions on using a {@link CompactionJob}.
@@ -34,7 +36,20 @@ public class CompactionExecutorId extends 
AbstractId<CompactionExecutorId> {
     super(canonical);
   }
 
-  public static CompactionExecutorId of(String canonical) {
-    return new CompactionExecutorId(canonical);
+  public boolean isExernalId() {
+    return canonical().startsWith("e.");
+  }
+
+  public String getExernalName() {
+    Preconditions.checkState(isExernalId());
+    return canonical().substring("e.".length());
+  }
+
+  public static CompactionExecutorId internalId(CompactionServiceId csid, 
String executorName) {
+    return new CompactionExecutorId("i." + csid + "." + executorName);
+  }
+
+  public static CompactionExecutorId externalId(String executorName) {
+    return new CompactionExecutorId("e." + executorName);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java 
b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index 15ff591..4511f03 100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -134,7 +134,9 @@ public class ZooLock implements Watcher {
     }
 
     @Override
-    public void failedToAcquireLock(Exception e) {}
+    public void failedToAcquireLock(Exception e) {
+      LOG.debug("Failed to acquire lock", e);
+    }
 
     @Override
     public void lostLock(LockLossReason reason) {
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index c494f4c..f01e476 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -146,7 +146,7 @@ public class DefaultCompactionPlannerTest {
     // planner should compact.
     var job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorId.of("medium"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor());
   }
 
   @Test
@@ -162,7 +162,7 @@ public class DefaultCompactionPlannerTest {
     // a running non-user compaction should not prevent a user compaction
     var job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(candidates, job.getFiles());
-    assertEquals(CompactionExecutorId.of("medium"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor());
 
     // should only run one user compaction at a time
     compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", 
"3M", "F2", "3M")));
@@ -180,7 +180,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), 
job.getFiles());
-    assertEquals(CompactionExecutorId.of("small"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("small"), job.getExecutor());
 
     // should compact all 15
     all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", 
"64M", "F8", "128M",
@@ -190,7 +190,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorId.of("huge"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("huge"), job.getExecutor());
 
     // For user compaction, can compact a subset that meets the compaction 
ratio if there is also a
     // larger set of files the meets the compaction ratio
@@ -200,7 +200,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), 
job.getFiles());
-    assertEquals(CompactionExecutorId.of("small"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("small"), job.getExecutor());
 
     // There is a subset of small files that meets the compaction ratio, but 
the larger set does not
     // so compact everything to avoid doing more than logarithmic work
@@ -209,7 +209,7 @@ public class DefaultCompactionPlannerTest {
     plan = planner.makePlan(params);
     job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorId.of("medium"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor());
 
   }
 
@@ -223,21 +223,21 @@ public class DefaultCompactionPlannerTest {
     // should only compact files less than max size
     var job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), 
job.getFiles());
-    assertEquals(CompactionExecutorId.of("large"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("large"), job.getExecutor());
 
     // user compaction can exceed the max size
     params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER);
     plan = planner.makePlan(params);
     job = Iterables.getOnlyElement(plan.getJobs());
     assertEquals(all, job.getFiles());
-    assertEquals(CompactionExecutorId.of("large"), job.getExecutor());
+    assertEquals(CompactionExecutorId.externalId("large"), job.getExecutor());
   }
 
   private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> 
all,
       Set<CompactableFile> files) {
     return new CompactionPlanImpl.BuilderImpl(kind, all, all)
-        .addJob(all.size(), CompactionExecutorId.of("small"), 
files).build().getJobs().iterator()
-        .next();
+        .addJob(all.size(), CompactionExecutorId.externalId("small"), 
files).build().getJobs()
+        .iterator().next();
   }
 
   private static Set<CompactableFile> createCFs(String... namesSizePairs) {
@@ -395,7 +395,7 @@ public class DefaultCompactionPlannerTest {
                 fail("Unexpected name " + name);
                 break;
             }
-            return CompactionExecutorId.of(name);
+            return CompactionExecutorId.externalId(name);
           }
 
           @Override
diff --git 
a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java
index fcc3d41..58d676e 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java
@@ -43,7 +43,7 @@ public class CompactionPrioritizerTest {
           .create(URI.create("hdfs://foonn/accumulo/tables/5/" + tablet + "/" 
+ i + ".rf"), 4, 4));
     }
     return new CompactionJobImpl(CompactionJobPrioritizer.createPriority(kind, 
totalFiles),
-        CompactionExecutorId.of("test"), files, kind, false);
+        CompactionExecutorId.externalId("test"), files, kind, false);
   }
 
   @Test
diff --git a/pom.xml b/pom.xml
index 1d92c34..7705225 100644
--- a/pom.xml
+++ b/pom.xml
@@ -269,6 +269,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-compaction-coordinator</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
+        <artifactId>accumulo-compactor</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.accumulo</groupId>
         <artifactId>accumulo-core</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
index e346baa..07b2eef 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java
@@ -20,7 +20,9 @@ package org.apache.accumulo.server.compaction;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.zookeeper.KeeperException;
 
 public class ExternalCompactionUtil {
 
@@ -39,14 +41,18 @@ public class ExternalCompactionUtil {
   }
 
   /**
-   * 
+   *
    * @param context
    * @return
    */
   public static HostAndPort findCompactionCoordinator(ServerContext context) {
     final String lockPath = context.getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
-    byte[] address = context.getZooCache().get(lockPath);
-    String coordinatorAddress = new String(address);
-    return HostAndPort.fromString(coordinatorAddress);
+    try {
+      byte[] address = 
ZooLock.getLockData(context.getZooReaderWriter().getZooKeeper(), lockPath);
+      String coordinatorAddress = new String(address);
+      return HostAndPort.fromString(coordinatorAddress);
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java 
b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 226532a..388b1e0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -660,6 +660,14 @@ public class Initialize implements KeywordExecutable {
         NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + WalStateManager.ZWALS, 
EMPTY_BYTE_ARRAY,
         NodeExistsPolicy.FAIL);
+    // TODO upgrade code should create following paths in ZK
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZCOORDINATOR, 
EMPTY_BYTE_ARRAY,
+        NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZCOORDINATOR_LOCK, 
EMPTY_BYTE_ARRAY,
+        NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZCOMPACTORS, 
EMPTY_BYTE_ARRAY,
+        NodeExistsPolicy.FAIL);
+
   }
 
   private String getInstanceNamePrefix() {
diff --git a/server/compaction-coordinator/pom.xml 
b/server/compaction-coordinator/pom.xml
index caebb50..32a36ad 100644
--- a/server/compaction-coordinator/pom.xml
+++ b/server/compaction-coordinator/pom.xml
@@ -31,8 +31,13 @@
   <name>Apache Accumulo Compaction Coordinator</name>
   <dependencies>
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index db6777f..b7f54b4 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
+import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface;
 import org.apache.accumulo.core.compaction.thrift.CompactionState;
 import org.apache.accumulo.core.compaction.thrift.Compactor;
 import org.apache.accumulo.core.compaction.thrift.Status;
@@ -123,9 +124,11 @@ public class CompactionCoordinator extends AbstractServer
     final String lockPath = getContext().getZooKeeperRoot() + 
Constants.ZCOORDINATOR_LOCK;
     final UUID zooLockUUID = UUID.randomUUID();
 
-    CoordinatorLockWatcher managerLockWatcher = new CoordinatorLockWatcher();
+    CoordinatorLockWatcher coordinatorLockWatcher = new 
CoordinatorLockWatcher();
     coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), 
lockPath, zooLockUUID);
-    return coordinatorLock.tryLock(managerLockWatcher, 
coordinatorClientAddress.getBytes());
+    // TODO may want to wait like manager code when lock not acquired, this 
allows starting multiple
+    // coordinators.
+    return coordinatorLock.tryLock(coordinatorLockWatcher, 
coordinatorClientAddress.getBytes());
   }
 
   /**
@@ -135,12 +138,12 @@ public class CompactionCoordinator extends AbstractServer
    * @throws UnknownHostException
    */
   protected ServerAddress startCoordinatorClientService() throws 
UnknownHostException {
-    CompactionCoordinator rpcProxy = TraceUtil.wrapService(this);
+    Iface rpcProxy = TraceUtil.wrapService(this);
     final 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
-        CompactionCoordinator> processor;
+        Iface> processor;
     if (getContext().getThriftServerType() == ThriftServerType.SASL) {
-      CompactionCoordinator tcredProxy = 
TCredentialsUpdatingWrapper.service(rpcProxy,
-          CompactionCoordinator.class, getConfiguration());
+      Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, 
CompactionCoordinator.class,
+          getConfiguration());
       processor = new 
org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>(
           tcredProxy);
     } else {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java
similarity index 56%
copy from 
core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
copy to 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java
index 91e8bb0..dff0995 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java
@@ -16,25 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.spi.compaction;
+package org.apache.accumulo.coordinator;
 
-import org.apache.accumulo.core.data.AbstractId;
+import org.apache.accumulo.start.spi.KeywordExecutable;
 
-/**
- * A unique identifier for a a compaction executor that a {@link 
CompactionPlanner} can schedule
- * compactions on using a {@link CompactionJob}.
- *
- * @since 2.1.0
- * @see org.apache.accumulo.core.spi.compaction
- */
-public class CompactionExecutorId extends AbstractId<CompactionExecutorId> {
-  private static final long serialVersionUID = 1L;
+import com.google.auto.service.AutoService;
+
+@AutoService(KeywordExecutable.class)
+public class CoordinatorExecutable implements KeywordExecutable {
+
+  @Override
+  public String keyword() {
+    return "compaction-coordinator";
+  }
+
+  @Override
+  public UsageGroup usageGroup() {
+    return UsageGroup.PROCESS;
+  }
 
-  private CompactionExecutorId(String canonical) {
-    super(canonical);
+  @Override
+  public String description() {
+    return "Starts Accumulo Compaction Coordinator";
   }
 
-  public static CompactionExecutorId of(String canonical) {
-    return new CompactionExecutorId(canonical);
+  @Override
+  public void execute(final String[] args) throws Exception {
+    CompactionCoordinator.main(args);
   }
+
 }
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index 1ed866f..2466190 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -31,8 +31,13 @@
   <name>Apache Accumulo Compactor</name>
   <dependencies>
     <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.apache.accumulo</groupId>
       <artifactId>accumulo-server-base</artifactId>
     </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 992c383..44a397b 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator;
 import org.apache.accumulo.core.compaction.thrift.CompactionState;
+import org.apache.accumulo.core.compaction.thrift.Compactor.Iface;
 import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -200,11 +201,11 @@ public class Compactor extends AbstractServer
    * @throws UnknownHostException
    */
   protected ServerAddress startCompactorClientService() throws 
UnknownHostException {
-    Compactor rpcProxy = TraceUtil.wrapService(this);
-    final 
org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> 
processor;
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    final 
org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Iface> processor;
     if (getContext().getThriftServerType() == ThriftServerType.SASL) {
-      Compactor tcredProxy =
-          TCredentialsUpdatingWrapper.service(rpcProxy, Compactor.class, 
getConfiguration());
+      Iface tcredProxy =
+          TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), 
getConfiguration());
       processor = new 
org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(tcredProxy);
     } else {
       processor = new 
org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(rpcProxy);
@@ -349,7 +350,7 @@ public class Compactor extends AbstractServer
 
   /**
    * Create compaction runnable
-   * 
+   *
    * @param job
    *          compaction job
    * @param totalInputEntries
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java
similarity index 56%
copy from 
core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
copy to 
server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java
index 91e8bb0..35672d1 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java
@@ -16,25 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.spi.compaction;
+package org.apache.accumulo.compactor;
 
-import org.apache.accumulo.core.data.AbstractId;
+import org.apache.accumulo.start.spi.KeywordExecutable;
 
-/**
- * A unique identifier for a a compaction executor that a {@link 
CompactionPlanner} can schedule
- * compactions on using a {@link CompactionJob}.
- *
- * @since 2.1.0
- * @see org.apache.accumulo.core.spi.compaction
- */
-public class CompactionExecutorId extends AbstractId<CompactionExecutorId> {
-  private static final long serialVersionUID = 1L;
+import com.google.auto.service.AutoService;
+
+@AutoService(KeywordExecutable.class)
+public class CompactorExecutable implements KeywordExecutable {
+
+  @Override
+  public String keyword() {
+    return "compactor";
+  }
+
+  @Override
+  public UsageGroup usageGroup() {
+    return UsageGroup.PROCESS;
+  }
 
-  private CompactionExecutorId(String canonical) {
-    super(canonical);
+  @Override
+  public String description() {
+    return "Starts Accumulo Compactor";
   }
 
-  public static CompactionExecutorId of(String canonical) {
-    return new CompactionExecutorId(canonical);
+  @Override
+  public void execute(final String[] args) throws Exception {
+    Compactor.main(args);
   }
+
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 63ddf75..8626e14 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -420,8 +420,7 @@ public class CompactionManager {
   }
 
   ExternalCompactionExecutor getExternalExecutor(String queueName) {
-    // TODO put prefix for external executor in one place
-    return getExternalExecutor(CompactionExecutorId.of("e." + queueName));
+    return getExternalExecutor(CompactionExecutorId.externalId(queueName));
   }
 
   public void commitExternalCompaction(UUID extCompactionId, 
Map<KeyExtent,Tablet> currentTablets,
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 0a7246f..a1febbe 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -119,7 +119,7 @@ public class CompactionService {
         public CompactionExecutorId createExecutor(String executorName, int 
threads) {
           Preconditions.checkArgument(threads > 0, "Positive number of threads 
required : %s",
               threads);
-          var ceid = CompactionExecutorId.of("i." + myId + "." + executorName);
+          var ceid = CompactionExecutorId.internalId(myId, executorName);
           Preconditions.checkState(!requestedExecutors.containsKey(ceid));
           requestedExecutors.put(ceid, threads);
           return ceid;
@@ -127,7 +127,7 @@ public class CompactionService {
 
         @Override
         public CompactionExecutorId getExternalExecutor(String name) {
-          var ceid = CompactionExecutorId.of("e." + name);
+          var ceid = CompactionExecutorId.externalId(name);
           
Preconditions.checkArgument(!requestedExternalExecutors.contains(ceid));
           requestedExternalExecutors.add(ceid);
           return ceid;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 2b0c77c..46268a7 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -111,16 +111,21 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
   ExternalCompactionJob reserveExternalCompaction(long priority, String 
compactorId) {
 
     ExternalJob extJob = queue.poll();
-    while (extJob.getStatus() != Status.QUEUED) {
+    while (extJob != null && extJob.getStatus() != Status.QUEUED) {
       extJob = queue.poll();
     }
 
+    if (extJob == null) {
+      return null;
+    }
+
     if (extJob.getJob().getPriority() >= priority) {
       if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
         return extJob.compactable.reserveExternalCompaction(extJob.csid, 
extJob.getJob(),
             compactorId);
       } else {
-        // TODO try again
+        // TODO could this cause a stack overflow?
+        return reserveExternalCompaction(priority, compactorId);
       }
     } else {
       // TODO this messes with the ordering.. maybe make the comparator 
compare on time also
@@ -138,13 +143,18 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
     // TODO cast to int is problematic
     int count = (int) queue.stream().filter(extJob -> extJob.status.get() == 
Status.QUEUED).count();
 
-    // TODO is there a better way to get prio w/o looping over everything? 
seems a stream over the
-    // queue is not in particular order
-    long priority = queue.stream().filter(extJob -> extJob.status.get() == 
Status.QUEUED)
-        .mapToLong(extJob -> extJob.getJob().getPriority()).max().orElse(0);
+    long priority = 0;
+    ExternalJob topJob = queue.peek();
+    while (topJob != null && topJob.getStatus() != Status.QUEUED) {
+      queue.removeIf(extJob -> extJob.getStatus() != Status.QUEUED);
+      topJob = queue.peek();
+    }
+
+    if (topJob != null) {
+      priority = topJob.getJob().getPriority();
+    }
 
-    // TODO put extraction of queue name in one place
-    return new TCompactionQueueSummary(ceid.canonical().substring(2), 
priority, count);
+    return new TCompactionQueueSummary(ceid.getExernalName(), priority, count);
   }
 
 }

Reply via email to