This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 4721ad5eef Recreated compaction coordinator unit test (#4188)
4721ad5eef is described below

commit 4721ad5eefdcc05c4b7607dd3f380bab669dd492
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Jan 25 15:33:22 2024 -0500

    Recreated compaction coordinator unit test (#4188)
    
    Fixes #3473
---
 .../org/apache/accumulo/core/conf/Property.java    |   2 +
 .../java/org/apache/accumulo/manager/Manager.java  |   3 +-
 .../coordinator/CompactionCoordinator.java         |  62 ++--
 .../compaction/CompactionCoordinatorTest.java      | 351 ++++++++++++++++++++-
 4 files changed, 383 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6122eb0c07..8e5d9cd6d8 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -441,6 +441,8 @@ public enum Property {
       "The number of threads used to inspect tablets files to find split 
points.", "4.0.0"),
 
   
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+      // ELASTICITY_TODO: It might be good to note that there is a priority 
queue per compactor
+      // resource group
       "10000", PropertyType.COUNT, "The max size of the priority queue.", 
"4.0"),
   // properties that are specific to scan server behavior
   @Experimental
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 515ca9e1f2..8c1114df78 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -940,8 +940,7 @@ public class Manager extends AbstractServer
     // Start the Manager's Fate Service
     fateServiceHandler = new FateServiceHandler(this);
     managerClientHandler = new ManagerClientServiceHandler(this);
-    compactionCoordinator =
-        new CompactionCoordinator(context, tserverSet, security, nextEvent, 
fateRefs);
+    compactionCoordinator = new CompactionCoordinator(context, security, 
fateRefs);
     // Start the Manager's Client service
     // Ensure that calls before the manager gets the lock fail
     ManagerClientService.Iface haProxy =
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 43477c46b3..ec6ee9a42d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -87,6 +87,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
@@ -101,7 +102,6 @@ import 
org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.volume.Volume;
-import org.apache.accumulo.manager.EventCoordinator;
 import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 import 
org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
@@ -110,7 +110,6 @@ import 
org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.compaction.CompactionPluginUtils;
-import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.fs.FileStatus;
@@ -145,17 +144,16 @@ public class CompactionCoordinator
    * is the most authoritative source of what external compactions are 
currently running, but it
    * does not have the stats that this map has.
    */
-  protected static final Map<ExternalCompactionId,RunningCompaction> 
RUNNING_CACHE =
+  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
       new ConcurrentHashMap<>();
 
   /* Map of group name to last time compactor called to get a compaction job */
   // ELASTICITY_TODO need to clean out groups that are no longer configured..
-  private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
+  private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new 
ConcurrentHashMap<>();
 
   private final ServerContext ctx;
   private final SecurityOperation security;
   private final CompactionJobQueues jobQueues;
-  private final EventCoordinator eventCoordinator;
   private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> 
fateInstances;
   // Exposed for tests
   protected volatile Boolean shutdown = false;
@@ -163,19 +161,17 @@ public class CompactionCoordinator
   private final ScheduledThreadPoolExecutor schedExecutor;
 
   private final Cache<ExternalCompactionId,RunningCompaction> completed;
-  private LoadingCache<Long,CompactionConfig> compactionConfigCache;
-  private final Cache<Path,Integer> checked_tablet_dir_cache;
+  private final LoadingCache<Long,CompactionConfig> compactionConfigCache;
+  private final Cache<Path,Integer> tabletDirCache;
   private final DeadCompactionDetector deadCompactionDetector;
 
   private final QueueMetrics queueMetrics;
 
-  public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
-      SecurityOperation security, EventCoordinator eventCoordinator,
+  public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
     this.ctx = ctx;
     this.schedExecutor = this.ctx.getScheduledExecutor();
     this.security = security;
-    this.eventCoordinator = eventCoordinator;
 
     this.jobQueues = new CompactionJobQueues(
         
ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
@@ -201,9 +197,8 @@ public class CompactionCoordinator
       return path.toUri().toString().length();
     };
 
-    checked_tablet_dir_cache =
-        ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
-            .maximumWeight(10485760L).weigher(weigher).build();
+    tabletDirCache = 
ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
+        .maximumWeight(10485760L).weigher(weigher).build();
 
     deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, 
schedExecutor);
     // At this point the manager does not have its lock so no actions should 
be taken yet
@@ -271,7 +266,7 @@ public class CompactionCoordinator
     // tservers. It's no longer doing that. It may be best to remove the loop and 
make the remaining
     // task a scheduled one.
 
-    LOG.info("Starting loop to check tservers for compaction summaries");
+    LOG.info("Starting loop to check for compactors not checking in");
     while (!shutdown) {
       long start = System.currentTimeMillis();
 
@@ -331,13 +326,13 @@ public class CompactionCoordinator
       throw new AccumuloSecurityException(credentials.getPrincipal(),
           SecurityErrorCode.PERMISSION_DENIED).asThriftException();
     }
-    final String group = groupName.intern();
-    LOG.trace("getCompactionJob called for group {} by compactor {}", group, 
compactorAddress);
-    TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis());
+    CompactorGroupId groupId = CompactorGroupIdImpl.groupId(groupName);
+    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, 
compactorAddress);
+    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
 
     TExternalCompactionJob result = null;
 
-    CompactionJobQueues.MetaJob metaJob = 
jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
+    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);
 
     while (metaJob != null) {
 
@@ -362,23 +357,24 @@ public class CompactionCoordinator
         // It is possible that by the time this is added, the compactor 
that made this request
         // is dead. In this case the compaction is not actually running.
         
RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
-            new RunningCompaction(result, compactorAddress, group));
+            new RunningCompaction(result, compactorAddress, groupName));
         LOG.debug("Returning external job {} to {} with {} files", 
result.externalCompactionId,
             compactorAddress, ecm.getJobFiles().size());
         break;
       } else {
-        LOG.debug("Unable to reserve compaction job for {}, pulling another 
off the queue ",
-            metaJob.getTabletMetadata().getExtent());
+        LOG.debug(
+            "Unable to reserve compaction job for {}, pulling another off the 
queue for group {}",
+            metaJob.getTabletMetadata().getExtent(), groupName);
         metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
       }
     }
 
     if (metaJob == null) {
-      LOG.debug("No jobs found in group {} ", group);
+      LOG.debug("No jobs found in group {} ", groupName);
     }
 
     if (result == null) {
-      LOG.trace("No jobs found for group {}, returning empty job to compactor 
{}", group,
+      LOG.trace("No jobs found for group {}, returning empty job to compactor 
{}", groupName,
           compactorAddress);
       result = new TExternalCompactionJob();
     }
@@ -434,7 +430,7 @@ public class CompactionCoordinator
 
   private void checkTabletDir(KeyExtent extent, Path path) {
     try {
-      if (checked_tablet_dir_cache.getIfPresent(path) == null) {
+      if (tabletDirCache.getIfPresent(path) == null) {
         FileStatus[] files = null;
         try {
           files = ctx.getVolumeManager().listStatus(path);
@@ -447,14 +443,14 @@ public class CompactionCoordinator
 
           ctx.getVolumeManager().mkdirs(path);
         }
-        checked_tablet_dir_cache.put(path, 1);
+        tabletDirCache.put(path, 1);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
   }
 
-  private CompactionMetadata createExternalCompactionMetadata(CompactionJob 
job,
+  protected CompactionMetadata createExternalCompactionMetadata(CompactionJob 
job,
       Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String 
compactorAddress,
       ExternalCompactionId externalCompactionId) {
     boolean propDels;
@@ -488,7 +484,7 @@ public class CompactionCoordinator
 
   }
 
-  private CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob 
metaJob,
+  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob 
metaJob,
       String compactorAddress, ExternalCompactionId externalCompactionId) {
 
     Preconditions.checkArgument(metaJob.getJob().getKind() == 
CompactionKind.SYSTEM
@@ -543,8 +539,9 @@ public class CompactionCoordinator
     return null;
   }
 
-  TExternalCompactionJob createThriftJob(String externalCompactionId, 
CompactionMetadata ecm,
-      CompactionJobQueues.MetaJob metaJob, Optional<CompactionConfig> 
compactionConfig) {
+  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
+      CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
+      Optional<CompactionConfig> compactionConfig) {
 
     Set<CompactableFile> selectedFiles;
     if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
@@ -854,7 +851,7 @@ public class CompactionCoordinator
    * The RUNNING_CACHE set may contain external compactions that are not 
actually running. This
    * method periodically cleans those up.
    */
-  protected void cleanUpRunning() {
+  public void cleanUpRunning() {
 
     // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
     // avoid removing things that are added while reading the metadata.
@@ -950,6 +947,11 @@ public class CompactionCoordinator
     cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), 
externalCompactionId);
   }
 
+  /* Method exists to be called from test */
+  public CompactionJobQueues getJobQueues() {
+    return jobQueues;
+  }
+
   /* Method exists to be overridden in test to hide static method */
   protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
     return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 6ef9402886..973a369c96 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -18,8 +18,353 @@
  */
 package org.apache.accumulo.manager.compaction;
 
+import static org.easymock.EasyMock.expect;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.thrift.TInfo;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.cache.Caches;
+import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
+import org.apache.accumulo.core.util.compaction.RunningCompaction;
+import org.apache.accumulo.manager.Manager;
+import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
+import 
org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.net.HostAndPort;
+
 public class CompactionCoordinatorTest {
-  // ELASTICITY_TODO this test was no longer compiling with all the changes to
-  // CompactionCoordinator. Its contents were deleted to get things compiling, 
however need to go
-  // and look at the test and determine what to carry forward with 
CompactionCoordinator.
+
+  // Need a non-null fateInstances reference for 
CompactionCoordinator.compactionCompleted
+  private static final AtomicReference<Map<FateInstanceType,Fate<Manager>>> 
fateInstances =
+      new AtomicReference<>(Map.of());
+
+  private static final CompactorGroupId GROUP_ID = 
CompactorGroupIdImpl.groupId("R2DQ");
+
+  public class TestCoordinator extends CompactionCoordinator {
+
+    private final List<RunningCompaction> runningCompactions;
+
+    private Set<ExternalCompactionId> metadataCompactionIds = null;
+
+    public TestCoordinator(ServerContext ctx, SecurityOperation security,
+        List<RunningCompaction> runningCompactions) {
+      super(ctx, security, fateInstances);
+      this.runningCompactions = runningCompactions;
+    }
+
+    @Override
+    protected void startDeadCompactionDetector() {}
+
+    @Override
+    protected long getTServerCheckInterval() {
+      // This is called from CompactionCoordinator.run(). Setting shutdown to 
true
+      // here will exit the loop in run()
+      this.shutdown = true;
+      return 0L;
+    }
+
+    @Override
+    protected void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
+
+    @Override
+    protected void startRunningCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {}
+
+    @Override
+    public void compactionCompleted(TInfo tinfo, TCredentials credentials,
+        String externalCompactionId, TKeyExtent textent, TCompactionStats 
stats)
+        throws ThriftSecurityException {}
+
+    @Override
+    public void compactionFailed(TInfo tinfo, TCredentials credentials, String 
externalCompactionId,
+        TKeyExtent extent) throws ThriftSecurityException {}
+
+    void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
+      metadataCompactionIds = mci;
+    }
+
+    @Override
+    protected Set<ExternalCompactionId> readExternalCompactionIds() {
+      if (metadataCompactionIds == null) {
+        return RUNNING_CACHE.keySet();
+      } else {
+        return metadataCompactionIds;
+      }
+    }
+
+    public Map<ExternalCompactionId,RunningCompaction> getRunning() {
+      return RUNNING_CACHE;
+    }
+
+    public void resetInternals() {
+      getRunning().clear();
+      metadataCompactionIds = null;
+    }
+
+    @Override
+    protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
+      return runningCompactions;
+    }
+
+    @Override
+    protected CompactionMetadata reserveCompaction(MetaJob metaJob, String 
compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return createExternalCompactionMetadata(metaJob.getJob(),
+          
metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
+              .collect(Collectors.toSet()),
+          metaJob.getTabletMetadata(), compactorAddress, externalCompactionId);
+    }
+
+    @Override
+    protected CompactionMetadata 
createExternalCompactionMetadata(CompactionJob job,
+        Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String 
compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      return new CompactionMetadata(jobFiles,
+          new ReferencedTabletFile(new 
Path("file:///accumulo/tables/1/default_tablet/F00001.rf")),
+          compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), 
true, 1L);
+    }
+
+    @Override
+    protected TExternalCompactionJob createThriftJob(String 
externalCompactionId,
+        CompactionMetadata ecm, MetaJob metaJob, Optional<CompactionConfig> 
compactionConfig) {
+      return new TExternalCompactionJob(externalCompactionId,
+          metaJob.getTabletMetadata().getExtent().toThrift(), List.of(),
+          SystemIteratorUtil.toIteratorConfig(List.of()),
+          ecm.getCompactTmpName().getNormalizedPathStr(), 
ecm.getPropagateDeletes(),
+          TCompactionKind.valueOf(ecm.getKind().name()), 1L, Map.of());
+    }
+
+    @Override
+    protected void cancelCompactionOnCompactor(String address, String 
externalCompactionId) {}
+
+  }
+
+  @Test
+  public void testCoordinatorColdStart() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, security);
+
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    EasyMock.verify(context, security);
+  }
+
+  @Test
+  public void testCoordinatorRestartOneRunningCompaction() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    HostAndPort tserverAddress = HostAndPort.fromString("localhost:9997");
+
+    List<RunningCompaction> runningCompactions = new ArrayList<>();
+    ExternalCompactionId eci = 
ExternalCompactionId.generate(UUID.randomUUID());
+    TExternalCompactionJob job = 
EasyMock.createNiceMock(TExternalCompactionJob.class);
+    expect(job.getExternalCompactionId()).andReturn(eci.toString()).anyTimes();
+    TKeyExtent extent = new TKeyExtent();
+    extent.setTable("1".getBytes());
+    runningCompactions
+        .add(new RunningCompaction(job, tserverAddress.toString(), 
GROUP_ID.toString()));
+
+    AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, job, security);
+
+    var coordinator = new TestCoordinator(context, security, 
runningCompactions);
+    coordinator.resetInternals();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    coordinator.run();
+    coordinator.shutdown();
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+
+    Map<ExternalCompactionId,RunningCompaction> running = 
coordinator.getRunning();
+    Entry<ExternalCompactionId,RunningCompaction> ecomp = 
running.entrySet().iterator().next();
+    assertEquals(eci, ecomp.getKey());
+    RunningCompaction rc = ecomp.getValue();
+    assertEquals(GROUP_ID.toString(), rc.getGroupName());
+    assertEquals(tserverAddress.toString(), rc.getCompactorAddress());
+
+    EasyMock.verify(context, job, security);
+  }
+
+  @Test
+  public void testGetCompactionJob() throws Exception {
+
+    TableConfiguration tconf = 
EasyMock.createNiceMock(TableConfiguration.class);
+    expect(tconf.get(Property.TABLE_COMPACTION_CONFIGURER))
+        
.andReturn(Property.TABLE_COMPACTION_CONFIGURER.getDefaultValue()).anyTimes();
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    
expect(context.getTableConfiguration(TableId.of("2a"))).andReturn(tconf).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+    expect(context.rpcCreds()).andReturn(creds).anyTimes();
+
+    AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true).anyTimes();
+
+    KeyExtent ke = new KeyExtent(TableId.of("2a"), new Text("z"), new 
Text("b"));
+    TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
+    expect(tm.getExtent()).andReturn(ke).anyTimes();
+    expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+
+    EasyMock.replay(tconf, context, creds, tm, security);
+
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+    // Use coordinator.run() to populate the internal data structures. This is 
tested in a different
+    // test.
+    coordinator.run();
+    coordinator.shutdown();
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(0, coordinator.getRunning().size());
+
+    // Add a job to the job queue
+    CompactionJob job = new CompactionJobImpl((short) 1, GROUP_ID, 
Collections.emptyList(),
+        CompactionKind.SYSTEM, Optional.of(true));
+    coordinator.addJobs(tm, Collections.singleton(job));
+    CompactionJobPriorityQueue queue = 
coordinator.getJobQueues().getQueue(GROUP_ID);
+    assertEquals(1, queue.getQueuedJobs());
+
+    // Get the next job
+    ExternalCompactionId eci = 
ExternalCompactionId.generate(UUID.randomUUID());
+    TExternalCompactionJob createdJob = coordinator.getCompactionJob(new 
TInfo(), creds,
+        GROUP_ID.toString(), "localhost:10241", eci.toString());
+    assertEquals(eci.toString(), createdJob.getExternalCompactionId());
+    assertEquals(ke, KeyExtent.fromThrift(createdJob.getExtent()));
+
+    assertEquals(0, coordinator.getJobQueues().getQueuedJobCount());
+    assertEquals(1, coordinator.getRunning().size());
+    Entry<ExternalCompactionId,RunningCompaction> entry =
+        coordinator.getRunning().entrySet().iterator().next();
+    assertEquals(eci.toString(), entry.getKey().toString());
+    assertEquals("localhost:10241", entry.getValue().getCompactorAddress());
+    assertEquals(eci.toString(), 
entry.getValue().getJob().getExternalCompactionId());
+
+    EasyMock.verify(tconf, context, creds, tm, security);
+  }
+
+  @Test
+  public void testGetCompactionJobNoJobs() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+    expect(security.canPerformSystemActions(creds)).andReturn(true);
+
+    EasyMock.replay(context, creds, security);
+
+    var coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+    TExternalCompactionJob job = 
coordinator.getCompactionJob(TraceUtil.traceInfo(), creds,
+        GROUP_ID.toString(), "localhost:10240", UUID.randomUUID().toString());
+    assertNull(job.getExternalCompactionId());
+
+    EasyMock.verify(context, creds, security);
+  }
+
+  @Test
+  public void testCleanUpRunning() throws Exception {
+
+    ServerContext context = EasyMock.createNiceMock(ServerContext.class);
+    expect(context.getCaches()).andReturn(Caches.getInstance()).anyTimes();
+    
expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+
+    TCredentials creds = EasyMock.createNiceMock(TCredentials.class);
+
+    AuditedSecurityOperation security = 
EasyMock.createNiceMock(AuditedSecurityOperation.class);
+
+    EasyMock.replay(context, creds, security);
+
+    TestCoordinator coordinator = new TestCoordinator(context, security, new 
ArrayList<>());
+
+    var ecid1 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid2 = ExternalCompactionId.generate(UUID.randomUUID());
+    var ecid3 = ExternalCompactionId.generate(UUID.randomUUID());
+
+    coordinator.getRunning().put(ecid1, new RunningCompaction(new 
TExternalCompaction()));
+    coordinator.getRunning().put(ecid2, new RunningCompaction(new 
TExternalCompaction()));
+    coordinator.getRunning().put(ecid3, new RunningCompaction(new 
TExternalCompaction()));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2, ecid3), 
coordinator.getRunning().keySet());
+
+    coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2));
+
+    coordinator.cleanUpRunning();
+
+    assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet());
+
+    EasyMock.verify(context, creds, security);
+
+  }
 }

Reply via email to