This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new 4721ad5eef Recreated compaction coordinator unit test (#4188)
4721ad5eef is described below

commit 4721ad5eefdcc05c4b7607dd3f380bab669dd492
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Jan 25 15:33:22 2024 -0500

    Recreated compaction coordinator unit test (#4188)

    Fixes #3473
---
 .../org/apache/accumulo/core/conf/Property.java    |   2 +
 .../java/org/apache/accumulo/manager/Manager.java  |   3 +-
 .../coordinator/CompactionCoordinator.java         |  62 ++--
 .../compaction/CompactionCoordinatorTest.java      | 351 ++++++++++++++++++++-
 4 files changed, 383 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6122eb0c07..8e5d9cd6d8 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -441,6 +441,8 @@ public enum Property {
       "The number of threads used to inspect tablets files to find split points.", "4.0.0"),
   MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+      // ELASTICITY_TODO: It might be good to note that there is a priority queue per compactor
+      // resource group
       "10000", PropertyType.COUNT, "The max size of the priority queue.", "4.0"),
   // properties that are specific to scan server behavior
   @Experimental
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 515ca9e1f2..8c1114df78 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -940,8 +940,7 @@ public class Manager extends AbstractServer
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator =
-        new CompactionCoordinator(context, tserverSet, security, nextEvent, fateRefs);
+    compactionCoordinator = new CompactionCoordinator(context, security, fateRefs);
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
     ManagerClientService.Iface haProxy =
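A note on the queue-size property in the first hunk: as the new ELASTICITY_TODO hints, the manager keeps one priority queue per compactor resource group, and manager.compaction.major.service.queue.size bounds each queue individually. A minimal standalone sketch of that shape (GroupedJobQueues, Job, and the eviction policy here are illustrative assumptions, not Accumulo's CompactionJobQueues):

    import java.util.Map;
    import java.util.PriorityQueue;
    import java.util.concurrent.ConcurrentHashMap;

    class GroupedJobQueues {
      record Job(String group, short priority) {}

      private final int maxPerQueue; // manager.compaction.major.service.queue.size
      private final Map<String,PriorityQueue<Job>> queues = new ConcurrentHashMap<>();

      GroupedJobQueues(int maxPerQueue) {
        this.maxPerQueue = maxPerQueue;
      }

      void add(Job job) {
        // one queue per compactor resource group, highest priority at the head
        var queue = queues.computeIfAbsent(job.group(),
            g -> new PriorityQueue<Job>((a, b) -> Short.compare(b.priority(), a.priority())));
        synchronized (queue) {
          queue.add(job);
          while (queue.size() > maxPerQueue) {
            // drop the lowest-priority job once this group's queue is full
            // (linear scan is fine for a sketch, too slow for the real thing)
            Job lowest = queue.stream()
                .min((a, b) -> Short.compare(a.priority(), b.priority())).orElseThrow();
            queue.remove(lowest);
          }
        }
      }
    }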
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 43477c46b3..ec6ee9a42d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -87,6 +87,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
@@ -101,7 +102,6 @@ import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.volume.Volume;
-import org.apache.accumulo.manager.EventCoordinator;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
@@ -110,7 +110,6 @@ import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.compaction.CompactionPluginUtils;
-import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.fs.FileStatus;
@@ -145,17 +144,16 @@ public class CompactionCoordinator
    * is the most authoritative source of what external compactions are currently running, but it
    * does not have the stats that this map has.
    */
-  protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
+  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
       new ConcurrentHashMap<>();

   /* Map of group name to last time compactor called to get a compaction job */
   // ELASTICITY_TODO need to clean out groups that are no longer configured..
-  private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
+  private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();

   private final ServerContext ctx;
   private final SecurityOperation security;
   private final CompactionJobQueues jobQueues;
-  private final EventCoordinator eventCoordinator;
   private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
   // Exposed for tests
   protected volatile Boolean shutdown = false;
@@ -163,19 +161,17 @@ public class CompactionCoordinator
   private final ScheduledThreadPoolExecutor schedExecutor;

   private final Cache<ExternalCompactionId,RunningCompaction> completed;
-  private LoadingCache<Long,CompactionConfig> compactionConfigCache;
-  private final Cache<Path,Integer> checked_tablet_dir_cache;
+  private final LoadingCache<Long,CompactionConfig> compactionConfigCache;
+  private final Cache<Path,Integer> tabletDirCache;

   private final DeadCompactionDetector deadCompactionDetector;

   private final QueueMetrics queueMetrics;

-  public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
-      SecurityOperation security, EventCoordinator eventCoordinator,
+  public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
     this.ctx = ctx;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
-    this.eventCoordinator = eventCoordinator;

     this.jobQueues = new CompactionJobQueues(
         ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
@@ -201,9 +197,8 @@ public class CompactionCoordinator
       return path.toUri().toString().length();
     };

-    checked_tablet_dir_cache =
-        ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
-            .maximumWeight(10485760L).weigher(weigher).build();
+    tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
+        .maximumWeight(10485760L).weigher(weigher).build();

     deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
     // At this point the manager does not have its lock so no actions should be taken yet
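The tabletDirCache construction in the hunk above is a weight-bounded cache: entries are weighed by the length of the path string and the cache is capped by total weight (about 10 MiB of path characters), not by entry count. A standalone sketch of the same pattern using Caffeine directly (Accumulo reaches it through ctx.getCaches(); the String keys here are an illustrative simplification of Hadoop's Path):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    class DirCacheSketch {
      // Weigh entries by key length and cap total weight at ~10 MiB, mirroring
      // the tabletDirCache construction above.
      final Cache<String,Integer> tabletDirCache = Caffeine.newBuilder()
          .maximumWeight(10485760L)
          .weigher((String path, Integer value) -> path.length())
          .build();

      void markChecked(String path) {
        tabletDirCache.put(path, 1); // presence in the cache means "already checked"
      }

      boolean alreadyChecked(String path) {
        return tabletDirCache.getIfPresent(path) != null;
      }
    }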
@@ -271,7 +266,7 @@ public class CompactionCoordinator
     // tservers. Its no longer doing that. May be best to remove the loop and make the remaining
     // task a scheduled one.

-    LOG.info("Starting loop to check tservers for compaction summaries");
+    LOG.info("Starting loop to check for compactors not checking in");

     while (!shutdown) {
       long start = System.currentTimeMillis();
@@ -331,13 +326,13 @@ public class CompactionCoordinator
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
-    final String group = groupName.intern();
-    LOG.trace("getCompactionJob called for group {} by compactor {}", group, compactorAddress);
-    TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis());
+    CompactorGroupId groupId = CompactorGroupIdImpl.groupId(groupName);
+    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
+    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());

     TExternalCompactionJob result = null;

-    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
+    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);

     while (metaJob != null) {
@@ -362,23 +357,24 @@ public class CompactionCoordinator
         // It is possible that by the time this added that the the compactor that made this request
         // is dead. In this cases the compaction is not actually running.
         RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
-            new RunningCompaction(result, compactorAddress, group));
+            new RunningCompaction(result, compactorAddress, groupName));
         LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
             compactorAddress, ecm.getJobFiles().size());
         break;
       } else {
-        LOG.debug("Unable to reserve compaction job for {}, pulling another off the queue ",
-            metaJob.getTabletMetadata().getExtent());
+        LOG.debug(
+            "Unable to reserve compaction job for {}, pulling another off the queue for group {}",
+            metaJob.getTabletMetadata().getExtent(), groupName);
         metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
       }
     }

     if (metaJob == null) {
-      LOG.debug("No jobs found in group {} ", group);
+      LOG.debug("No jobs found in group {} ", groupName);
     }

     if (result == null) {
-      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", group,
+      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
           compactorAddress);
       result = new TExternalCompactionJob();
     }
@@ -434,7 +430,7 @@ public class CompactionCoordinator

   private void checkTabletDir(KeyExtent extent, Path path) {
     try {
-      if (checked_tablet_dir_cache.getIfPresent(path) == null) {
+      if (tabletDirCache.getIfPresent(path) == null) {
         FileStatus[] files = null;
         try {
           files = ctx.getVolumeManager().listStatus(path);
@@ -447,14 +443,14 @@ public class CompactionCoordinator
           ctx.getVolumeManager().mkdirs(path);
         }

-        checked_tablet_dir_cache.put(path, 1);
+        tabletDirCache.put(path, 1);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
   }

-  private CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
+  protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
       Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
       ExternalCompactionId externalCompactionId) {
     boolean propDels;
@@ -488,7 +484,7 @@ public class CompactionCoordinator

   }

-  private CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
+  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
       String compactorAddress, ExternalCompactionId externalCompactionId) {

     Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
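The getCompactionJob changes above keep the poll-and-reserve loop: if a polled job cannot be reserved (for example, the tablet's files changed since the job was queued), the coordinator polls the next job for the group rather than giving up, so one stale job does not starve a compactor. A self-contained sketch of that loop shape (Job and tryReserve are stand-ins for Accumulo's MetaJob and reserveCompaction):

    import java.util.Optional;
    import java.util.Queue;

    class ReserveLoop {
      record Job(String extent) {}

      static Optional<Job> nextReservedJob(Queue<Job> queue) {
        Job job = queue.poll();
        while (job != null) {
          if (tryReserve(job)) {
            return Optional.of(job); // reserved: hand it to the requesting compactor
          }
          job = queue.poll();        // reservation failed (e.g. files changed); try the next job
        }
        return Optional.empty();     // queue drained: caller returns an empty job
      }

      static boolean tryReserve(Job job) {
        // stand-in: the real reserveCompaction does a conditional metadata update
        return true;
      }
    }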
@@ -543,8 +539,9 @@ public class CompactionCoordinator
     return null;
   }

-  TExternalCompactionJob createThriftJob(String externalCompactionId, CompactionMetadata ecm,
-      CompactionJobQueues.MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
+  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+      CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
+      Optional<CompactionConfig> compactionConfig) {

     Set<CompactableFile> selectedFiles;
     if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
@@ -854,7 +851,7 @@ public class CompactionCoordinator
    * The RUNNING_CACHE set may contain external compactions that are not actually running. This
    * method periodically cleans those up.
    */
-  protected void cleanUpRunning() {
+  public void cleanUpRunning() {

     // grab a snapshot of the ids in the set before reading the metadata table. This is done to
     // avoid removing things that are added while reading the metadata.
@@ -950,6 +947,11 @@ public class CompactionCoordinator
     cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
   }

+  /* Method exists to be called from test */
+  public CompactionJobQueues getJobQueues() {
+    return jobQueues;
+  }
+
   /* Method exists to be overridden in test to hide static method */
   protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
     return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
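cleanUpRunning, made public above so the new test can drive it directly, relies on a snapshot-then-diff pattern: snapshot the running ids first, then read the authoritative set (the metadata table in the real code), and remove only ids that were in the snapshot but absent from metadata. Ids added after the snapshot is taken are left alone, which avoids racing against newly started compactions. A minimal sketch of that idea (RunningCleaner is illustrative, not the Accumulo class):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class RunningCleaner {
      final Map<String,Object> running = new ConcurrentHashMap<>();

      void cleanUp(Set<String> idsInMetadata) {
        Set<String> snapshot = new HashSet<>(running.keySet()); // snapshot before reading metadata
        snapshot.removeAll(idsInMetadata);                      // ids with no metadata entry
        snapshot.forEach(running::remove);                      // safe to drop; new ids untouched
      }
    }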
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 6ef9402886..973a369c96 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -18,8 +18,353 @@
  */
 package org.apache.accumulo.manager.compaction;

+import static org.easymock.EasyMock.expect;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.cache.Caches;
+import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
+import org.apache.accumulo.core.util.compaction.RunningCompaction;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.net.HostAndPort;
+
 public class CompactionCoordinatorTest {
-  // ELASTICITY_TODO this test was no longer compiling with all the changes to
-  // CompactionCoordinator. Its contents were deleted to get things compiling, however need to go
-  // and look at the test and determine what to carry forward with CompactionCoordinator.
+
+  // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted
+  private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances =
+      new AtomicReference<>(Map.of());
+
+  private static final CompactorGroupId GROUP_ID = CompactorGroupIdImpl.groupId("R2DQ");
+
+  public class TestCoordinator extends CompactionCoordinator {
+
+    private final List<RunningCompaction> runningCompactions;
+
+    private Set<ExternalCompactionId> metadataCompactionIds = null;
+
+    public TestCoordinator(ServerContext ctx, SecurityOperation security,
+        List<RunningCompaction> runningCompactions) {
+      super(ctx, security, fateInstances);
+      this.runningCompactions = runningCompactions;
+    }
+
+    @Override
+    protected void startDeadCompactionDetector() {}
+    @Override
+    protected long getTServerCheckInterval() {
+      // This is called from CompactionCoordinator.run(). Setting shutdown to true
+      // here will exit the loop in run()
+      this.shutdown = true;
+      return 0L;
+    }
+
+    @Override
+    protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
+
+    @Override
+    protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {}
+
+    @Override
+    public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
+        throws ThriftSecurityException {}
+
+    @Override
+    public void compactionFailed(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent extent) throws ThriftSecurityException {}
+
+    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
+      metadataCompactionIds = mci;
+    }
+
+    @Override
+    protected Set<ExternalCompactionId> readExternalCompactionIds() {
+      if (metadataCompactionIds == null) {
+        return RUNNING_CACHE.keySet();
+      } else {
+        return metadataCompactionIds;
+      }
+    }
+
+    public Map<ExternalCompactionId,RunningCompaction> getRunning() {
+      return RUNNING_CACHE;
+    }
+
+    public void resetInternals() {
+      getRunning().clear();
+      metadataCompactionIds = null;
+    }
+
+    @Override
+    protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+      return runningCompactions;
+    }
+
+    @Override
+    protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return createExternalCompactionMetadata(metaJob.getJob(),
+          metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
+              .collect(Collectors.toSet()),
+          metaJob.getTabletMetadata(), compactorAddress, externalCompactionId);
+    }
+
+    @Override
+    protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
+        Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return new CompactionMetadata(jobFiles,
+          new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")),
+          compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, 1L);
+    }
+
+    @Override
+    protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+        CompactionMetadata ecm, MetaJob metaJob, Optional<CompactionConfig> compactionConfig) {
+      return new TExternalCompactionJob(externalCompactionId,
+          metaJob.getTabletMetadata().getExtent().toThrift(), List.of(),
+          SystemIteratorUtil.toIteratorConfig(List.of()),
+          ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
+          TCompactionKind.valueOf(ecm.getKind().name()), 1L, Map.of());
+    }
+
+    @Override
+    protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {}
+
+  }
+
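All of the tests that follow use the same EasyMock record/replay/verify lifecycle, with nice mocks so unstubbed calls return defaults instead of failing. A self-contained illustration of that cycle (Config is a stand-in interface, not an Accumulo type):

    import static org.easymock.EasyMock.createNiceMock;
    import static org.easymock.EasyMock.expect;
    import static org.easymock.EasyMock.replay;
    import static org.easymock.EasyMock.verify;

    class EasyMockCycle {
      interface Config {
        int queueSize();
      }

      void demo() {
        Config config = createNiceMock(Config.class);           // unstubbed calls return defaults
        expect(config.queueSize()).andReturn(10000).anyTimes(); // record an expectation
        replay(config);                                         // switch to replay mode

        assert config.queueSize() == 10000;                     // exercise the code under test

        verify(config);                                         // fail if expectations were unmet
      }
    }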
+  @Test
+  public void testCoordinatorColdStart() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, security);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>());
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    EasyMock.verify(context, security);
+  }
+
+  @Test
+  public void testCoordinatorRestartOneRunningCompaction() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997");
+
+    List<RunningCompaction> runningCompactions = new ArrayList<>();
+    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
+    TExternalCompactionJob job = EasyMock.createNiceMock(TExternalCompactionJob.class);
+    expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
+    TKeyExtent extent = new TKeyExtent();
+    extent.setTable("1".getBytes());
+    runningCompactions
+        .add(new RunningCompaction(job, tserverAddress.toString(), GROUP_ID.toString()));
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, job, security);
+
+    var coordinator = new TestCoordinator(context, security, runningCompactions);
+    coordinator.resetInternals();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+
+    Map<ExternalCompactionId,RunningCompaction> running = coordinator.getRunning();
+    Entry<ExternalCompactionId,RunningCompaction> ecomp = running.entrySet().iterator().next();
+    assertEquals(eci, ecomp.getKey());
+    RunningCompaction rc = ecomp.getValue();
+    assertEquals(GROUP_ID.toString(), rc.getGroupName());
+    assertEquals(tserverAddress.toString(), rc.getCompactorAddress());
+
+    EasyMock.verify(context, job, security);
+  }
+
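The restart test above exercises the startup seeding path: the coordinator asks each compactor what it is running (via the overridden getCompactionsRunningOnCompactors) and repopulates RUNNING_CACHE from the answers, so a manager restart does not lose track of in-flight external compactions. A simplified, self-contained sketch of that seeding (Running and RestartSeeder are illustrative stand-ins, not Accumulo classes):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class RestartSeeder {
      record Running(String ecid, String compactorAddress, String group) {}

      final Map<String,Running> runningCache = new ConcurrentHashMap<>();

      // seed the cache from what the compactors report after a restart
      void seed(List<Running> reportedByCompactors) {
        for (Running rc : reportedByCompactors) {
          runningCache.put(rc.ecid(), rc);
        }
      }
    }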
+  @Test
+  public void testGetCompactionJob() throws Exception {
+
+    TableConfiguration tconf = EasyMock.createNiceMock(TableConfiguration.class);
+    expect(tconf.get(Property.TABLE_COMPACTION_CONFIGURER))
+        .andReturn(Property.TABLE_COMPACTION_CONFIGURER.getDefaultValue()).anyTimes();
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    expect(context.getTableConfiguration(TableId.of("2a"))).andReturn(tconf).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+    expect(context.rpcCreds()).andReturn(creds).anyTimes();
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true).anyTimes();
+
+    KeyExtent ke = new KeyExtent(TableId.of("2a"), new Text("z"), new Text("b"));
+    TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
+    expect(tm.getExtent()).andReturn(ke).anyTimes();
+    expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+
+    EasyMock.replay(tconf, context, creds, tm, security);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>());
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    // Use coordinator.run() to populate the internal data structures. This is
+    // tested in a different test.
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+
+    // Add a job to the job queue
+    CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, Collections.emptyList(),
+        CompactionKind.SYSTEM, Optional.of(true));
+    coordinator.addJobs(tm, Collections.singleton(job));
+    CompactionJobPriorityQueue queue = coordinator.getJobQueues().getQueue(GROUP_ID);
+    assertEquals(1, queue.getQueuedJobs());
+
+    // Get the next job
+    ExternalCompactionId eci = ExternalCompactionId.generate(UUID.randomUUID());
+    TExternalCompactionJob createdJob = coordinator.getCompactionJob(new TInfo(), creds,
+        GROUP_ID.toString(), "localhost:10241", eci.toString());
+    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+    assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent()));
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+    Entry<ExternalCompactionId,RunningCompaction> entry =
+        coordinator.getRunning().entrySet().iterator().next();
+    assertEquals(eci.toString(), entry.getKey().toString());
+    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+    assertEquals(eci.toString(), entry.getValue().getJob().getExternalCompactionId());
+
+    EasyMock.verify(tconf, context, creds, tm, security);
+  }
+
+  @Test
+  public void testGetCompactionJobNoJobs() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true);
+
+    EasyMock.replay(context, creds, security);
+
+    var coordinator = new TestCoordinator(context, security, new ArrayList<>());
+    TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
+        GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
+    assertNull(job.getExternalCompactionId());
+
+    EasyMock.verify(context, creds, security);
+  }
+
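testGetCompactionJobNoJobs above pins down the "no work" contract: rather than returning null through Thrift, getCompactionJob hands back an empty TExternalCompactionJob whose id is unset. A hypothetical compactor-side check, reusing the names from the test above (not Accumulo's actual compactor code):

    TExternalCompactionJob job = coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
        GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
    if (job.getExternalCompactionId() == null) {
      // no job available for this group; back off before polling again
    }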
+  @Test
+  public void testCleanUpRunning() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, creds, security);
+
+    TestCoordinator coordinator = new TestCoordinator(context, security, new ArrayList<>());
+
+    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
+
+    coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction()));
+    coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction()));
+    coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction()));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet());
+
+    coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
+
+    EasyMock.verify(context, creds, security);
+
+  }
 }