This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new c5c6d5b588 Improvements and cleanup to ExternalCompaction ITs and related code (#3661)
c5c6d5b588 is described below

commit c5c6d5b588b98e1930fd21c3a66bbf2b2fe92f68
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Jul 27 08:38:52 2023 -0400

    Improvements and cleanup to ExternalCompaction ITs and related code (#3661)
    
    Modified ExternalCompactionITs to remove the startup/shutdown of Compactors
    that are defined in the compaction services created by the MAC configuration
    callback. MAC now starts these Compactors based on the compaction service
    configuration. Made some modifications to ExternalCompactionITs to reduce the
    load on ZK, as some calls were being made to find the compaction coordinator
    address inside of a loop, and that wasn't necessary. Made a modification to the
    CompactionCoordinator so that it won't try to update tablet metadata for a table
    that has been deleted. Made a modification to the Manager so that it only advertises
    the FATE and CompactionCoordinator Thrift services after the Manager has started up.
---
 .../java/org/apache/accumulo/manager/Manager.java  | 11 ++-
 .../coordinator/CompactionCoordinator.java         | 13 +++-
 .../compaction/ExternalCompactionProgressIT.java   | 15 +++-
 .../compaction/ExternalCompactionTestUtils.java    | 50 +++++++------
 .../test/compaction/ExternalCompaction_1_IT.java   | 34 ++-------
 .../test/compaction/ExternalCompaction_2_IT.java   | 82 ++++++++++++----------
 .../test/compaction/ExternalCompaction_3_IT.java   | 36 +++++-----
 7 files changed, 118 insertions(+), 123 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 09c673aef0..aa6ef53d19 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1362,8 +1362,8 @@ public class Manager extends AbstractServer
     String address = sa.address.toString();
     UUID uuid = sld.getServerUUID(ThriftService.MANAGER);
     ServiceDescriptors descriptors = new ServiceDescriptors();
-    for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER,
-        ThriftService.COORDINATOR}) {
+    for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, 
ThriftService.COORDINATOR,
+        ThriftService.FATE}) {
       descriptors.addService(new ServiceDescriptor(uuid, svc, address, 
this.getResourceGroup()));
     }
 
@@ -1593,11 +1593,8 @@ public class Manager extends AbstractServer
     UUID zooLockUUID = UUID.randomUUID();
 
     ServiceDescriptors descriptors = new ServiceDescriptors();
-    for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER,
-        ThriftService.COORDINATOR}) {
-      descriptors.addService(
-          new ServiceDescriptor(zooLockUUID, svc, managerClientAddress, 
this.getResourceGroup()));
-    }
+    descriptors.addService(new ServiceDescriptor(zooLockUUID, 
ThriftService.MANAGER,
+        managerClientAddress, this.getResourceGroup()));
 
     ServiceLockData sld = new ServiceLockData(descriptors);
     while (true) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 191f990a17..49a27ef7db 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableDeletedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
@@ -1059,9 +1060,15 @@ public class CompactionCoordinator implements 
CompactionCoordinatorService.Iface
 
     try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
       compactions.forEach((ecid, extent) -> {
-        
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
-            
.requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid)
-            .submit(tabletMetadata -> 
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+        try {
+          ctx.requireNotDeleted(extent.tableId());
+          
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
+              
.requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid)
+              .submit(tabletMetadata -> 
!tabletMetadata.getExternalCompactions().containsKey(ecid));
+        } catch (TableDeletedException e) {
+          LOG.warn("Table {} was deleted, unable to update metadata for 
compaction failure.",
+              extent.tableId());
+        }
       });
 
       tabletsMutator.process().forEach((extent, result) -> {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index b1b3883f96..b77ec14a8e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@ -31,6 +31,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -38,17 +39,22 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.net.HostAndPort;
+
 /**
  * Tests that external compactions report progress from start to finish. To 
prevent flaky test
  * failures, we only measure progress in quarter segments: STARTED, QUARTER, 
HALF, THREE_QUARTERS.
@@ -124,7 +130,14 @@ public class ExternalCompactionProgressIT extends 
AccumuloClusterHarness {
    * Check running compaction progress.
    */
   private void checkRunning() throws TException {
-    var ecList = getRunningCompactions(getCluster().getServerContext());
+
+    ServerContext ctx = getCluster().getServerContext();
+    Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
+    }
+
+    var ecList = getRunningCompactions(ctx, coordinatorHost);
     var ecMap = ecList.getCompactions();
     if (ecMap != null) {
       ecMap.forEach((ecid, ec) -> {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index a3be15838b..c9b0108278 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -233,13 +233,8 @@ public class ExternalCompactionTestUtils {
     coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  public static TExternalCompactionList getRunningCompactions(ClientContext 
context)
-      throws TException {
-    Optional<HostAndPort> coordinatorHost =
-        ExternalCompactionUtil.findCompactionCoordinator(context);
-    if (coordinatorHost.isEmpty()) {
-      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
-    }
+  public static TExternalCompactionList getRunningCompactions(ClientContext 
context,
+      Optional<HostAndPort> coordinatorHost) throws TException {
     CompactionCoordinatorService.Client client =
         ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, 
coordinatorHost.orElseThrow(), context);
     try {
@@ -251,13 +246,8 @@ public class ExternalCompactionTestUtils {
     }
   }
 
-  private static TExternalCompactionList getCompletedCompactions(ClientContext 
context)
-      throws Exception {
-    Optional<HostAndPort> coordinatorHost =
-        ExternalCompactionUtil.findCompactionCoordinator(context);
-    if (coordinatorHost.isEmpty()) {
-      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
-    }
+  private static TExternalCompactionList getCompletedCompactions(ClientContext 
context,
+      Optional<HostAndPort> coordinatorHost) throws Exception {
     CompactionCoordinatorService.Client client =
         ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, 
coordinatorHost.orElseThrow(), context);
     try {
@@ -309,8 +299,13 @@ public class ExternalCompactionTestUtils {
   public static int confirmCompactionRunning(ServerContext ctx, 
Set<ExternalCompactionId> ecids)
       throws Exception {
     int matches = 0;
+    Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
+    }
     while (matches == 0) {
-      TExternalCompactionList running = 
ExternalCompactionTestUtils.getRunningCompactions(ctx);
+      TExternalCompactionList running =
+          ExternalCompactionTestUtils.getRunningCompactions(ctx, 
coordinatorHost);
       if (running.getCompactions() != null) {
         for (ExternalCompactionId ecid : ecids) {
           TExternalCompaction tec = 
running.getCompactions().get(ecid.canonical());
@@ -329,21 +324,24 @@ public class ExternalCompactionTestUtils {
 
   public static void confirmCompactionCompleted(ServerContext ctx, 
Set<ExternalCompactionId> ecids,
       TCompactionState expectedState) throws Exception {
+    Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
+    if (coordinatorHost.isEmpty()) {
+      throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
+    }
+
     // The running compaction should be removed
-    TExternalCompactionList running = 
ExternalCompactionTestUtils.getRunningCompactions(ctx);
-    while (running.getCompactions() != null) {
-      running = ExternalCompactionTestUtils.getRunningCompactions(ctx);
-      if (running.getCompactions() == null) {
-        UtilWaitThread.sleep(250);
-      }
+    TExternalCompactionList running =
+        ExternalCompactionTestUtils.getRunningCompactions(ctx, 
coordinatorHost);
+    while (running.getCompactions() != null && 
running.getCompactions().keySet().stream()
+        .anyMatch((e) -> ecids.contains(ExternalCompactionId.of(e)))) {
+      running = ExternalCompactionTestUtils.getRunningCompactions(ctx, 
coordinatorHost);
     }
     // The compaction should be in the completed list with the expected state
-    TExternalCompactionList completed = 
ExternalCompactionTestUtils.getCompletedCompactions(ctx);
+    TExternalCompactionList completed =
+        ExternalCompactionTestUtils.getCompletedCompactions(ctx, 
coordinatorHost);
     while (completed.getCompactions() == null) {
-      completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx);
-      if (completed.getCompactions() == null) {
-        UtilWaitThread.sleep(50);
-      }
+      UtilWaitThread.sleep(50);
+      completed = ExternalCompactionTestUtils.getCompletedCompactions(ctx, 
coordinatorHost);
     }
     for (ExternalCompactionId e : ecids) {
       TExternalCompaction tec = completed.getCompactions().get(e.canonical());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 695c87b897..5085c310b4 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -22,7 +22,6 @@ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
-import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
@@ -74,7 +73,6 @@ import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -95,13 +93,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
     startMiniClusterWithConfig(new ExternalCompaction1Config());
   }
 
-  @AfterEach
-  public void tearDown() throws Exception {
-    // The ExternalDoNothingCompactor needs to be restarted between tests
-    getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-    
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
-  }
-
   public static class TestFilter extends Filter {
 
     int modulus = 1;
@@ -159,10 +150,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
       writeData(client, table1);
       writeData(client, table2);
 
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
 1);
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2,
 1);
-      getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
       compact(client, table1, 2, GROUP1, true);
       verify(client, table1, 2);
 
@@ -186,12 +173,7 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
       writeData(client, table1);
       verify(client, table1, 1);
 
-      // ELASTICITY_TODO the compactors started by mini inspecting the config 
were interfering with
-      // starting the ExternalDoNothingCompactor, so killed all compactors. 
This is not the best way
-      // to handle this.
       getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
-
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3,
 1);
       getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
           ExternalDoNothingCompactor.class);
 
@@ -224,6 +206,10 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
     } finally {
       // We stopped the TServer and started our own, restart the original 
TabletServers
       getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+      // Restart the regular compactors
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+      getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
     }
 
   }
@@ -238,10 +224,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
 
       writeData(client, table1);
 
-      // ELASTICITY_TODO there is already one compactor started by mini based 
on config
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4,
 2);
-      getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
       compact(client, table1, 3, GROUP4, true);
 
       verify(client, table1, 3);
@@ -252,9 +234,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
   public void testConfigurer() throws Exception {
     String tableName = this.getUniqueNames(1)[0];
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
 
@@ -328,8 +307,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
       createTable(client, table1, "cs6");
       writeData(client, table1);
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP6,
 1);
-      getCluster().getClusterControl().start(ServerType.COMPACTOR);
       compact(client, table1, 2, GROUP6, true);
       verify(client, table1, 2);
 
@@ -368,9 +345,6 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
     try (final AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
 
-      
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP8,
 1);
-      getCluster().getClusterControl().start(ServerType.COMPACTOR);
-
       createTable(client, tableName, "cs8");
 
       writeData(client, tableName);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index c692e7fc86..4d1cf1b730 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.test.compaction;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP4;
-import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP5;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
@@ -31,16 +30,22 @@ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.ro
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Collections;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
@@ -51,11 +56,12 @@ import 
org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
 
 public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
 
@@ -69,22 +75,14 @@ public class ExternalCompaction_2_IT extends 
SharedMiniClusterBase {
   @BeforeAll
   public static void beforeTests() throws Exception {
     startMiniClusterWithConfig(new ExternalCompaction2Config());
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    // The ExternalDoNothingCompactor needs to be restarted between tests
     getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-    
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
+    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+        ExternalDoNothingCompactor.class);
   }
 
   @Test
   public void testSplitCancelsExternalCompaction() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -133,10 +131,6 @@ public class ExternalCompaction_2_IT extends 
SharedMiniClusterBase {
   @Test
   public void testUserCompactionCancellation() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP3,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -170,10 +164,6 @@ public class ExternalCompaction_2_IT extends 
SharedMiniClusterBase {
   @Test
   public void testDeleteTableCancelsUserExternalCompaction() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP4,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -202,45 +192,59 @@ public class ExternalCompaction_2_IT extends 
SharedMiniClusterBase {
   @Test
   public void testDeleteTableCancelsExternalCompaction() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP5,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
 
       createTable(client, table1, "cs5");
+
+      ServerContext ctx = getCluster().getServerContext();
+      TableId tid = ctx.getTableId(table1);
+
       // set compaction ratio to 1 so that majc occurs naturally, not user 
compaction
       // user compaction blocks delete
       client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.toString(), "1.0");
-      // cause multiple rfiles to be created
-      writeData(client, table1);
-      writeData(client, table1);
-      writeData(client, table1);
-      writeData(client, table1);
-
-      TableId tid = getCluster().getServerContext().getTableId(table1);
 
+      AtomicReference<Throwable> error = new AtomicReference<>();
+      CountDownLatch latch = new CountDownLatch(1);
+      Runnable r = () -> {
+        try {
+          // cause multiple rfiles to be created
+          latch.countDown();
+          writeData(client, table1);
+          writeData(client, table1);
+          writeData(client, table1);
+          writeData(client, table1);
+        } catch (TableNotFoundException | AccumuloException | 
AccumuloSecurityException e) {
+          error.set(e);
+        }
+      };
+      Thread t = new Thread(r);
+      t.start();
+      latch.await();
       // Wait for the compaction to start by waiting for 1 external compaction 
column
-      Set<ExternalCompactionId> ecids = ExternalCompactionTestUtils
-          
.waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
+      Set<ExternalCompactionId> ecids =
+          
ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids(ctx, tid);
 
       // Confirm that this ECID shows up in RUNNING set
-      int matches = ExternalCompactionTestUtils
-          .confirmCompactionRunning(getCluster().getServerContext(), ecids);
+      int matches = ExternalCompactionTestUtils.confirmCompactionRunning(ctx, 
ecids);
       assertTrue(matches > 0);
 
       client.tableOperations().delete(table1);
 
-      confirmCompactionCompleted(getCluster().getServerContext(), ecids,
-          TCompactionState.CANCELLED);
+      LoggerFactory.getLogger(getClass()).debug("Table deleted.");
+
+      confirmCompactionCompleted(ctx, ecids, TCompactionState.CANCELLED);
+
+      LoggerFactory.getLogger(getClass()).debug("Confirmed compaction 
cancelled.");
 
-      TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
-          .fetch(ColumnType.ECOMP).build();
+      TabletsMetadata tm =
+          
ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.PREV_ROW).build();
       assertEquals(0, tm.stream().count());
       tm.close();
 
+      t.join();
+      assertNull(error.get());
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index d3de651332..52da17e009 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -18,7 +18,6 @@
  */
 package org.apache.accumulo.test.compaction;
 
-import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
@@ -32,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -47,17 +47,22 @@ import 
org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
-import org.junit.jupiter.api.AfterEach;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.net.HostAndPort;
+
 public class ExternalCompaction_3_IT extends SharedMiniClusterBase {
 
   public static class ExternalCompaction3Config implements 
MiniClusterConfigurationCallback {
@@ -70,22 +75,15 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
   @BeforeAll
   public static void beforeTests() throws Exception {
     startMiniClusterWithConfig(new ExternalCompaction3Config());
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    // The ExternalDoNothingCompactor needs to be restarted between tests
     getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-    
getCluster().getConfig().getClusterServerConfiguration().clearCompactorResourceGroups();
+    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+        ExternalDoNothingCompactor.class);
   }
 
   @Test
+  @Disabled // ELASTICITY_TODO: Merges are broken currently
   public void testMergeCancelsExternalCompaction() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP1,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -143,10 +141,6 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
   @Test
   public void testCoordinatorRestartsDuringCompaction() throws Exception {
 
-    
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(GROUP2,
 1);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
-
     String table1 = this.getUniqueNames(1)[0];
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
@@ -167,13 +161,21 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
       // Restart the Manager while the compaction is running
       getCluster().getClusterControl().start(ServerType.MANAGER);
 
+      ServerContext ctx = getCluster().getServerContext();
+
       // Confirm compaction is still running
       int matches = 0;
       while (matches == 0) {
         TExternalCompactionList running = null;
         while (running == null) {
           try {
-            running = getRunningCompactions(getCluster().getServerContext());
+            Optional<HostAndPort> coordinatorHost =
+                ExternalCompactionUtil.findCompactionCoordinator(ctx);
+            if (coordinatorHost.isEmpty()) {
+              throw new TTransportException(
+                  "Unable to get CompactionCoordinator address from 
ZooKeeper");
+            }
+            running = getRunningCompactions(ctx, coordinatorHost);
           } catch (TException t) {
             running = null;
             Thread.sleep(2000);

Reply via email to