This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 03d1198 Lots of little changes to get compactor and coordinator to run from command line 03d1198 is described below commit 03d11985cbc9bc8acca83cb2c98766144710532a Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Mar 11 10:32:18 2021 -0500 Lots of little changes to get compactor and coordinator to run from command line --- assemble/conf/accumulo-env.sh | 2 +- assemble/pom.xml | 10 ++++++ .../java/org/apache/accumulo/core/Constants.java | 2 +- .../core/spi/compaction/CompactionExecutorId.java | 19 +++++++++-- .../apache/accumulo/fate/zookeeper/ZooLock.java | 4 ++- .../compaction/DefaultCompactionPlannerTest.java | 22 ++++++------- .../util/compaction/CompactionPrioritizerTest.java | 2 +- pom.xml | 10 ++++++ .../server/compaction/ExternalCompactionUtil.java | 14 +++++--- .../apache/accumulo/server/init/Initialize.java | 8 +++++ server/compaction-coordinator/pom.xml | 7 +++- .../coordinator/CompactionCoordinator.java | 15 +++++---- .../coordinator/CoordinatorExecutable.java | 38 +++++++++++++--------- server/compactor/pom.xml | 7 +++- .../org/apache/accumulo/compactor/Compactor.java | 11 ++++--- .../accumulo/compactor/CompactorExecutable.java | 38 +++++++++++++--------- .../tserver/compactions/CompactionManager.java | 3 +- .../tserver/compactions/CompactionService.java | 4 +-- .../compactions/ExternalCompactionExecutor.java | 26 ++++++++++----- 19 files changed, 166 insertions(+), 76 deletions(-) diff --git a/assemble/conf/accumulo-env.sh b/assemble/conf/accumulo-env.sh index 3161b8b..d5b6012 100644 --- a/assemble/conf/accumulo-env.sh +++ b/assemble/conf/accumulo-env.sh @@ -95,7 +95,7 @@ JAVA_OPTS=("${JAVA_OPTS[@]}" ) case "$cmd" in - monitor|gc|manager|master|tserver|tracer) + monitor|gc|manager|master|tserver|tracer|compaction-coordinator|compactor) JAVA_OPTS=("${JAVA_OPTS[@]}" "-Dlog4j.configurationFile=log4j2-service.properties") ;; *) diff --git a/assemble/pom.xml b/assemble/pom.xml index ee4fcda..3b6ea0d 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -171,6 +171,16 @@ </dependency> <dependency> <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compaction-coordinator</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compactor</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> <optional>true</optional> </dependency> diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 3276eb2..e2fac29 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -62,7 +62,7 @@ public class Constants { public static final String ZCOMPACTORS = "/compactors"; public static final String ZCOORDINATOR = "/coordinators"; - public static final String ZCOORDINATOR_LOCK = "/coordinators/lock"; + public static final String ZCOORDINATOR_LOCK = ZCOORDINATOR + "/lock"; public static final String ZDEAD = "/dead"; public static final String ZDEADTSERVERS = ZDEAD + "/tservers"; diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java index 91e8bb0..498c8b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java @@ -20,6 +20,8 @@ package org.apache.accumulo.core.spi.compaction; import org.apache.accumulo.core.data.AbstractId; +import com.google.common.base.Preconditions; + /** * A unique identifier for a a compaction executor that a {@link CompactionPlanner} can schedule * compactions on using a {@link CompactionJob}. @@ -34,7 +36,20 @@ public class CompactionExecutorId extends AbstractId<CompactionExecutorId> { super(canonical); } - public static CompactionExecutorId of(String canonical) { - return new CompactionExecutorId(canonical); + public boolean isExernalId() { + return canonical().startsWith("e."); + } + + public String getExernalName() { + Preconditions.checkState(isExernalId()); + return canonical().substring("e.".length()); + } + + public static CompactionExecutorId internalId(CompactionServiceId csid, String executorName) { + return new CompactionExecutorId("i." + csid + "." + executorName); + } + + public static CompactionExecutorId externalId(String executorName) { + return new CompactionExecutorId("e." + executorName); } } diff --git a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java index 15ff591..4511f03 100644 --- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java +++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java @@ -134,7 +134,9 @@ public class ZooLock implements Watcher { } @Override - public void failedToAcquireLock(Exception e) {} + public void failedToAcquireLock(Exception e) { + LOG.debug("Failed to acquire lock", e); + } @Override public void lostLock(LockLossReason reason) { diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index c494f4c..f01e476 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@ -146,7 +146,7 @@ public class DefaultCompactionPlannerTest { // planner should compact. var job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorId.of("medium"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor()); } @Test @@ -162,7 +162,7 @@ public class DefaultCompactionPlannerTest { // a running non-user compaction should not prevent a user compaction var job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(candidates, job.getFiles()); - assertEquals(CompactionExecutorId.of("medium"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor()); // should only run one user compaction at a time compacting = Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "3M", "F2", "3M"))); @@ -180,7 +180,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "1M", "F2", "2M", "F3", "4M"), job.getFiles()); - assertEquals(CompactionExecutorId.of("small"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("small"), job.getExecutor()); // should compact all 15 all = createCFs("FI", "7M", "F4", "8M", "F5", "16M", "F6", "32M", "F7", "64M", "F8", "128M", @@ -190,7 +190,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorId.of("huge"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("huge"), job.getExecutor()); // For user compaction, can compact a subset that meets the compaction ratio if there is also a // larger set of files the meets the compaction ratio @@ -200,7 +200,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "3M", "F2", "4M", "F3", "5M", "F4", "6M"), job.getFiles()); - assertEquals(CompactionExecutorId.of("small"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("small"), job.getExecutor()); // There is a subset of small files that meets the compaction ratio, but the larger set does not // so compact everything to avoid doing more than logarithmic work @@ -209,7 +209,7 @@ public class DefaultCompactionPlannerTest { plan = planner.makePlan(params); job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorId.of("medium"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("medium"), job.getExecutor()); } @@ -223,21 +223,21 @@ public class DefaultCompactionPlannerTest { // should only compact files less than max size var job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(createCFs("F1", "128M", "F2", "129M", "F3", "130M"), job.getFiles()); - assertEquals(CompactionExecutorId.of("large"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("large"), job.getExecutor()); // user compaction can exceed the max size params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.USER); plan = planner.makePlan(params); job = Iterables.getOnlyElement(plan.getJobs()); assertEquals(all, job.getFiles()); - assertEquals(CompactionExecutorId.of("large"), job.getExecutor()); + assertEquals(CompactionExecutorId.externalId("large"), job.getExecutor()); } private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all, Set<CompactableFile> files) { return new CompactionPlanImpl.BuilderImpl(kind, all, all) - .addJob(all.size(), CompactionExecutorId.of("small"), files).build().getJobs().iterator() - .next(); + .addJob(all.size(), CompactionExecutorId.externalId("small"), files).build().getJobs() + .iterator().next(); } private static Set<CompactableFile> createCFs(String... namesSizePairs) { @@ -395,7 +395,7 @@ public class DefaultCompactionPlannerTest { fail("Unexpected name " + name); break; } - return CompactionExecutorId.of(name); + return CompactionExecutorId.externalId(name); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java index fcc3d41..58d676e 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionPrioritizerTest.java @@ -43,7 +43,7 @@ public class CompactionPrioritizerTest { .create(URI.create("hdfs://foonn/accumulo/tables/5/" + tablet + "/" + i + ".rf"), 4, 4)); } return new CompactionJobImpl(CompactionJobPrioritizer.createPriority(kind, totalFiles), - CompactionExecutorId.of("test"), files, kind, false); + CompactionExecutorId.externalId("test"), files, kind, false); } @Test diff --git a/pom.xml b/pom.xml index 1d92c34..7705225 100644 --- a/pom.xml +++ b/pom.xml @@ -269,6 +269,16 @@ </dependency> <dependency> <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compaction-coordinator</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-compactor</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-core</artifactId> <version>${project.version}</version> </dependency> diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java index e346baa..07b2eef 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionUtil.java @@ -20,7 +20,9 @@ package org.apache.accumulo.server.compaction; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; public class ExternalCompactionUtil { @@ -39,14 +41,18 @@ public class ExternalCompactionUtil { } /** - * + * * @param context * @return */ public static HostAndPort findCompactionCoordinator(ServerContext context) { final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; - byte[] address = context.getZooCache().get(lockPath); - String coordinatorAddress = new String(address); - return HostAndPort.fromString(coordinatorAddress); + try { + byte[] address = ZooLock.getLockData(context.getZooReaderWriter().getZooKeeper(), lockPath); + String coordinatorAddress = new String(address); + return HostAndPort.fromString(coordinatorAddress); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 226532a..388b1e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -660,6 +660,14 @@ public class Initialize implements KeywordExecutable { NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + WalStateManager.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + // TODO upgrade code should create following paths in ZK + zoo.putPersistentData(zkInstanceRoot + Constants.ZCOORDINATOR, EMPTY_BYTE_ARRAY, + NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZCOORDINATOR_LOCK, EMPTY_BYTE_ARRAY, + NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZCOMPACTORS, EMPTY_BYTE_ARRAY, + NodeExistsPolicy.FAIL); + } private String getInstanceNamePrefix() { diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml index caebb50..32a36ad 100644 --- a/server/compaction-coordinator/pom.xml +++ b/server/compaction-coordinator/pom.xml @@ -31,8 +31,13 @@ <name>Apache Accumulo Compaction Coordinator</name> <dependencies> <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-server-base</artifactId> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index db6777f..b7f54b4 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ThriftTransportPool; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface; import org.apache.accumulo.core.compaction.thrift.CompactionState; import org.apache.accumulo.core.compaction.thrift.Compactor; import org.apache.accumulo.core.compaction.thrift.Status; @@ -123,9 +124,11 @@ public class CompactionCoordinator extends AbstractServer final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; final UUID zooLockUUID = UUID.randomUUID(); - CoordinatorLockWatcher managerLockWatcher = new CoordinatorLockWatcher(); + CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher(); coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID); - return coordinatorLock.tryLock(managerLockWatcher, coordinatorClientAddress.getBytes()); + // TODO may want to wait like manager code when lock not acquired, this allows starting multiple + // coordinators. + return coordinatorLock.tryLock(coordinatorLockWatcher, coordinatorClientAddress.getBytes()); } /** @@ -135,12 +138,12 @@ public class CompactionCoordinator extends AbstractServer * @throws UnknownHostException */ protected ServerAddress startCoordinatorClientService() throws UnknownHostException { - CompactionCoordinator rpcProxy = TraceUtil.wrapService(this); + Iface rpcProxy = TraceUtil.wrapService(this); final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor< - CompactionCoordinator> processor; + Iface> processor; if (getContext().getThriftServerType() == ThriftServerType.SASL) { - CompactionCoordinator tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, - CompactionCoordinator.class, getConfiguration()); + Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, CompactionCoordinator.class, + getConfiguration()); processor = new org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>( tcredProxy); } else { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java similarity index 56% copy from core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java copy to server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java index 91e8bb0..dff0995 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CoordinatorExecutable.java @@ -16,25 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.spi.compaction; +package org.apache.accumulo.coordinator; -import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.start.spi.KeywordExecutable; -/** - * A unique identifier for a a compaction executor that a {@link CompactionPlanner} can schedule - * compactions on using a {@link CompactionJob}. - * - * @since 2.1.0 - * @see org.apache.accumulo.core.spi.compaction - */ -public class CompactionExecutorId extends AbstractId<CompactionExecutorId> { - private static final long serialVersionUID = 1L; +import com.google.auto.service.AutoService; + +@AutoService(KeywordExecutable.class) +public class CoordinatorExecutable implements KeywordExecutable { + + @Override + public String keyword() { + return "compaction-coordinator"; + } + + @Override + public UsageGroup usageGroup() { + return UsageGroup.PROCESS; + } - private CompactionExecutorId(String canonical) { - super(canonical); + @Override + public String description() { + return "Starts Accumulo Compaction Coordinator"; } - public static CompactionExecutorId of(String canonical) { - return new CompactionExecutorId(canonical); + @Override + public void execute(final String[] args) throws Exception { + CompactionCoordinator.main(args); } + } diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml index 1ed866f..2466190 100644 --- a/server/compactor/pom.xml +++ b/server/compactor/pom.xml @@ -31,8 +31,13 @@ <name>Apache Accumulo Compactor</name> <dependencies> <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-server-base</artifactId> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 992c383..44a397b 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinator; import org.apache.accumulo.core.compaction.thrift.CompactionState; +import org.apache.accumulo.core.compaction.thrift.Compactor.Iface; import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -200,11 +201,11 @@ public class Compactor extends AbstractServer * @throws UnknownHostException */ protected ServerAddress startCompactorClientService() throws UnknownHostException { - Compactor rpcProxy = TraceUtil.wrapService(this); - final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Compactor> processor; + Iface rpcProxy = TraceUtil.wrapService(this); + final org.apache.accumulo.core.compaction.thrift.Compactor.Processor<Iface> processor; if (getContext().getThriftServerType() == ThriftServerType.SASL) { - Compactor tcredProxy = - TCredentialsUpdatingWrapper.service(rpcProxy, Compactor.class, getConfiguration()); + Iface tcredProxy = + TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration()); processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(tcredProxy); } else { processor = new org.apache.accumulo.core.compaction.thrift.Compactor.Processor<>(rpcProxy); @@ -349,7 +350,7 @@ public class Compactor extends AbstractServer /** * Create compaction runnable - * + * * @param job * compaction job * @param totalInputEntries diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java similarity index 56% copy from core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java copy to server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java index 91e8bb0..35672d1 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactorExecutable.java @@ -16,25 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.spi.compaction; +package org.apache.accumulo.compactor; -import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.start.spi.KeywordExecutable; -/** - * A unique identifier for a a compaction executor that a {@link CompactionPlanner} can schedule - * compactions on using a {@link CompactionJob}. - * - * @since 2.1.0 - * @see org.apache.accumulo.core.spi.compaction - */ -public class CompactionExecutorId extends AbstractId<CompactionExecutorId> { - private static final long serialVersionUID = 1L; +import com.google.auto.service.AutoService; + +@AutoService(KeywordExecutable.class) +public class CompactorExecutable implements KeywordExecutable { + + @Override + public String keyword() { + return "compactor"; + } + + @Override + public UsageGroup usageGroup() { + return UsageGroup.PROCESS; + } - private CompactionExecutorId(String canonical) { - super(canonical); + @Override + public String description() { + return "Starts Accumulo Compactor"; } - public static CompactionExecutorId of(String canonical) { - return new CompactionExecutorId(canonical); + @Override + public void execute(final String[] args) throws Exception { + Compactor.main(args); } + } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 63ddf75..8626e14 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -420,8 +420,7 @@ public class CompactionManager { } ExternalCompactionExecutor getExternalExecutor(String queueName) { - // TODO put prefix for external executor in one place - return getExternalExecutor(CompactionExecutorId.of("e." + queueName)); + return getExternalExecutor(CompactionExecutorId.externalId(queueName)); } public void commitExternalCompaction(UUID extCompactionId, Map<KeyExtent,Tablet> currentTablets, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 0a7246f..a1febbe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -119,7 +119,7 @@ public class CompactionService { public CompactionExecutorId createExecutor(String executorName, int threads) { Preconditions.checkArgument(threads > 0, "Positive number of threads required : %s", threads); - var ceid = CompactionExecutorId.of("i." + myId + "." + executorName); + var ceid = CompactionExecutorId.internalId(myId, executorName); Preconditions.checkState(!requestedExecutors.containsKey(ceid)); requestedExecutors.put(ceid, threads); return ceid; @@ -127,7 +127,7 @@ public class CompactionService { @Override public CompactionExecutorId getExternalExecutor(String name) { - var ceid = CompactionExecutorId.of("e." + name); + var ceid = CompactionExecutorId.externalId(name); Preconditions.checkArgument(!requestedExternalExecutors.contains(ceid)); requestedExternalExecutors.add(ceid); return ceid; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 2b0c77c..46268a7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -111,16 +111,21 @@ public class ExternalCompactionExecutor implements CompactionExecutor { ExternalCompactionJob reserveExternalCompaction(long priority, String compactorId) { ExternalJob extJob = queue.poll(); - while (extJob.getStatus() != Status.QUEUED) { + while (extJob != null && extJob.getStatus() != Status.QUEUED) { extJob = queue.poll(); } + if (extJob == null) { + return null; + } + if (extJob.getJob().getPriority() >= priority) { if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) { return extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), compactorId); } else { - // TODO try again + // TODO could this cause a stack overflow? + return reserveExternalCompaction(priority, compactorId); } } else { // TODO this messes with the ordering.. maybe make the comparator compare on time also @@ -138,13 +143,18 @@ public class ExternalCompactionExecutor implements CompactionExecutor { // TODO cast to int is problematic int count = (int) queue.stream().filter(extJob -> extJob.status.get() == Status.QUEUED).count(); - // TODO is there a better way to get prio w/o looping over everything? seems a stream over the - // queue is not in particular order - long priority = queue.stream().filter(extJob -> extJob.status.get() == Status.QUEUED) - .mapToLong(extJob -> extJob.getJob().getPriority()).max().orElse(0); + long priority = 0; + ExternalJob topJob = queue.peek(); + while (topJob != null && topJob.getStatus() != Status.QUEUED) { + queue.removeIf(extJob -> extJob.getStatus() != Status.QUEUED); + topJob = queue.peek(); + } + + if (topJob != null) { + priority = topJob.getJob().getPriority(); + } - // TODO put extraction of queue name in one place - return new TCompactionQueueSummary(ceid.canonical().substring(2), priority, count); + return new TCompactionQueueSummary(ceid.getExernalName(), priority, count); } }