This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 1aa9a1b370 Modified InstanceOperations.getActiveCompactions for 
Compactors (#4845)
1aa9a1b370 is described below

commit 1aa9a1b3704a036c5aa95562f453eb3899659c27
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Aug 29 11:46:15 2024 -0400

    Modified InstanceOperations.getActiveCompactions for Compactors (#4845)
    
    Modified InstanceOperations.getActiveCompactions to handle addresses
    for Compactors in addition to TabletServers. Also added the previously
    missing InstanceOperations.getCompactors method, which returns the
    Compactor addresses.
---
 .../core/client/admin/InstanceOperations.java      | 19 +++--
 .../accumulo/core/clientImpl/ClientContext.java    |  9 +++
 .../core/clientImpl/InstanceOperationsImpl.java    | 46 ++++++++---
 .../accumulo/core/clientImpl/TabletLocator.java    |  6 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |  6 +-
 .../core/clientImpl/ZookeeperLockChecker.java      |  5 ++
 .../shell/commands/ListCompactionsCommand.java     |  3 +-
 .../test/compaction/ExternalCompaction_1_IT.java   | 91 ++++++++++++++++++++++
 .../accumulo/test/functional/CompactionIT.java     | 82 +++++++++++++++++++
 9 files changed, 240 insertions(+), 27 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 2e1c0c92ae..033dacbe94 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -164,6 +164,14 @@ public interface InstanceOperations {
    */
   List<String> getManagerLocations();
 
+  /**
+   * Returns the locations of the active compactors
+   *
+   * @return A set of currently active compactors.
+   * @since 2.1.4
+   */
+  Set<String> getCompactors();
+
   /**
    * Returns the locations of the active scan servers
    *
@@ -189,13 +197,12 @@ public interface InstanceOperations {
       throws AccumuloException, AccumuloSecurityException;
 
   /**
-   * List the active compaction running on a tablet server. Using this method 
with
-   * {@link #getTabletServers()} will only show compactions running on 
tservers, leaving out any
-   * external compactions running on compactors. Use {@link 
#getActiveCompactions()} to get a list
-   * of all compactions running on tservers and compactors.
+   * List the active compaction running on a TabletServer or Compactor. The 
server address can be
+   * retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. 
Use
+   * {@link #getActiveCompactions()} to get a list of all compactions running 
on tservers and
+   * compactors. Implementation updated in 2.1.4 to accept a compactor address.
    *
-   * @param tserver The tablet server address. This should be of the form
-   *        {@code <ip address>:<port>}
+   * @param tserver The server address. This should be of the form {@code <ip 
address>:<port>}
    * @return the list of active compactions
    * @since 1.5.0
    */
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index b6c30891b6..03bd7b66bc 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -142,6 +142,7 @@ public class ClientContext implements AccumuloClient {
   private final Supplier<ScanServerSelector> scanServerSelectorSupplier;
   private TCredentials rpcCreds;
   private ThriftTransportPool thriftTransportPool;
+  private ZookeeperLockChecker zkLockChecker;
 
   private volatile boolean closed = false;
 
@@ -1124,4 +1125,12 @@ public class ClientContext implements AccumuloClient {
     return thriftTransportPool;
   }
 
+  public synchronized ZookeeperLockChecker getTServerLockChecker() {
+    ensureOpen();
+    if (this.zkLockChecker == null) {
+      this.zkLockChecker = new ZookeeperLockChecker(this);
+    }
+    return this.zkLockChecker;
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index 8c73dab8e5..ef316defd9 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -33,6 +33,7 @@ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -211,6 +212,15 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
     return context.getManagerLocations();
   }
 
+  @Override
+  public Set<String> getCompactors() {
+    Set<String> compactors = new HashSet<>();
+    ExternalCompactionUtil.getCompactorAddrs(context).values().forEach(addrs 
-> {
+      addrs.forEach(hp -> compactors.add(hp.toString()));
+    });
+    return compactors;
+  }
+
   @Override
   public Set<String> getScanServers() {
     return Set.copyOf(context.getScanServers().keySet());
@@ -271,26 +281,37 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
   }
 
   @Override
-  public List<ActiveCompaction> getActiveCompactions(String tserver)
+  public List<ActiveCompaction> getActiveCompactions(String server)
       throws AccumuloException, AccumuloSecurityException {
-    final var parsedTserver = HostAndPort.fromString(tserver);
-    Client client = null;
-    try {
-      client = getClient(ThriftClientTypes.TABLET_SERVER, parsedTserver, 
context);
+    final var serverHostAndPort = HostAndPort.fromString(server);
 
-      List<ActiveCompaction> as = new ArrayList<>();
-      for (var tac : client.getActiveCompactions(TraceUtil.traceInfo(), 
context.rpcCreds())) {
-        as.add(new ActiveCompactionImpl(context, tac, parsedTserver, 
CompactionHost.Type.TSERVER));
+    final List<ActiveCompaction> as = new ArrayList<>();
+    try {
+      if (context.getTServerLockChecker().doesTabletServerLockExist(server)) {
+        Client client = null;
+        try {
+          client = getClient(ThriftClientTypes.TABLET_SERVER, 
serverHostAndPort, context);
+          for (var tac : client.getActiveCompactions(TraceUtil.traceInfo(), 
context.rpcCreds())) {
+            as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort,
+                CompactionHost.Type.TSERVER));
+          }
+        } finally {
+          if (client != null) {
+            returnClient(client, context);
+          }
+        }
+      } else {
+        // if not a TabletServer address, maybe it's a Compactor
+        for (var tac : 
ExternalCompactionUtil.getActiveCompaction(serverHostAndPort, context)) {
+          as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort,
+              CompactionHost.Type.COMPACTOR));
+        }
       }
       return as;
     } catch (ThriftSecurityException e) {
       throw new AccumuloSecurityException(e.user, e.code, e);
     } catch (TException e) {
       throw new AccumuloException(e);
-    } finally {
-      if (client != null) {
-        returnClient(client, context);
-      }
     }
   }
 
@@ -392,4 +413,5 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
   public InstanceId getInstanceId() {
     return context.getInstanceID();
   }
+
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
index 2f8e31b8ac..c4c1dcdcd9 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
@@ -141,13 +141,13 @@ public abstract class TabletLocator {
       MetadataLocationObtainer mlo = new MetadataLocationObtainer();
 
       if (RootTable.ID.equals(tableId)) {
-        tl = new RootTabletLocator(new ZookeeperLockChecker(context));
+        tl = new RootTabletLocator(context.getTServerLockChecker());
       } else if (MetadataTable.ID.equals(tableId)) {
         tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, 
RootTable.ID), mlo,
-            new ZookeeperLockChecker(context));
+            context.getTServerLockChecker());
       } else {
         tl = new TabletLocatorImpl(tableId, getLocator(context, 
MetadataTable.ID), mlo,
-            new ZookeeperLockChecker(context));
+            context.getTServerLockChecker());
       }
       locators.put(key, tl);
     }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 2ee17ee834..46fce95cfc 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -73,7 +72,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 import org.apache.accumulo.core.dataImpl.thrift.TMutation;
 import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
-import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
@@ -1088,9 +1086,7 @@ public class TabletServerBatchWriter implements 
AutoCloseable {
        * Checks if there is a lock held by a tserver at a specific host and 
port.
        */
       private boolean isALockHeld(String tserver) {
-        var root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
-        var zLockPath = ServiceLock.path(root + "/" + tserver);
-        return ServiceLock.getSessionId(context.getZooCache(), zLockPath) != 0;
+        return 
context.getTServerLockChecker().doesTabletServerLockExist(tserver);
       }
 
       private void cancelSession() throws InterruptedException, 
ThriftSecurityException {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
index 23f036b8f0..8d99d36a30 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java
@@ -33,6 +33,11 @@ public class ZookeeperLockChecker implements 
TabletServerLockChecker {
     this.root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
   }
 
+  public boolean doesTabletServerLockExist(String server) {
+    var zLockPath = ServiceLock.path(root + "/" + server);
+    return ServiceLock.getSessionId(zc, zLockPath) != 0;
+  }
+
   @Override
   public boolean isLockHeld(String tserver, String session) {
     var zLockPath = ServiceLock.path(root + "/" + tserver);
diff --git 
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java
 
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java
index 7dcba79f7c..7c5f32cc48 100644
--- 
a/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java
+++ 
b/shell/src/main/java/org/apache/accumulo/shell/commands/ListCompactionsCommand.java
@@ -85,7 +85,8 @@ public class ListCompactionsCommand extends Command {
     filterOption = new Option("f", "filter", true, "show only compactions that 
match the regex");
     opts.addOption(filterOption);
 
-    tserverOption = new Option("ts", "tabletServer", true, "tablet server to 
list compactions for");
+    tserverOption = new Option("ts", "tabletServer", true,
+        "tablet server or compactor to list compactions for");
     tserverOption.setArgName("tablet server");
     opts.addOption(tserverOption);
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 3f1b63c5d2..fd50fd199e 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -48,7 +48,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -57,9 +59,14 @@ import 
org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
 import org.apache.accumulo.coordinator.CompactionCoordinator;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.PluginConfig;
@@ -84,12 +91,14 @@ import 
org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterEach;
@@ -531,4 +540,86 @@ public class ExternalCompaction_1_IT extends 
SharedMiniClusterBase {
 
   }
 
+  @Test
+  public void testGetActiveCompactions() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE8);
+      
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+
+      createTable(client, table1, "cs8");
+
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          // flush often to create multiple files to compact
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final CountDownLatch started = new CountDownLatch(1);
+      Thread t = new Thread(() -> {
+        try {
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          started.countDown();
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+
+      started.await();
+
+      List<ActiveCompaction> compactions = new ArrayList<>();
+      do {
+        client.instanceOperations().getActiveCompactions().forEach((ac) -> {
+          try {
+            if (ac.getTable().equals(table1)) {
+              compactions.add(ac);
+            }
+          } catch (TableNotFoundException e1) {
+            fail("Table was deleted during test, should not happen");
+          }
+        });
+        Thread.sleep(1000);
+      } while (compactions.isEmpty());
+
+      ActiveCompaction running1 = compactions.get(0);
+      CompactionHost host = running1.getHost();
+      assertTrue(host.getType() == CompactionHost.Type.COMPACTOR);
+
+      compactions.clear();
+      do {
+        HostAndPort hp = HostAndPort.fromParts(host.getAddress(), 
host.getPort());
+        
client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> 
{
+          try {
+            if (ac.getTable().equals(table1)) {
+              compactions.add(ac);
+            }
+          } catch (TableNotFoundException e1) {
+            fail("Table was deleted during test, should not happen");
+          }
+        });
+        Thread.sleep(1000);
+      } while (compactions.isEmpty());
+
+      ActiveCompaction running2 = compactions.get(0);
+      assertEquals(running1.getInputFiles(), running2.getInputFiles());
+      assertEquals(running1.getOutputFile(), running2.getOutputFile());
+      assertEquals(running1.getTablet(), running2.getTablet());
+
+      client.tableOperations().cancelCompaction(table1);
+      t.join();
+    }
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 46aa609e2c..56e57bac33 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -36,6 +37,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +51,8 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.PluginConfig;
@@ -80,6 +84,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.VerifyIngest;
@@ -715,6 +720,83 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testGetActiveCompactions() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      try (BatchWriter bw = client.createBatchWriter(table1)) {
+        for (int i = 1; i <= MAX_DATA; i++) {
+          Mutation m = new Mutation(Integer.toString(i));
+          m.put("cf", "cq", new Value());
+          bw.addMutation(m);
+          bw.flush();
+          // flush often to create multiple files to compact
+          client.tableOperations().flush(table1, null, null, true);
+        }
+      }
+
+      final AtomicReference<Exception> error = new AtomicReference<>();
+      final CountDownLatch started = new CountDownLatch(1);
+      Thread t = new Thread(() -> {
+        try {
+          IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+          setting.addOption("sleepTime", "3000");
+          setting.addOption("seekSleepTime", "3000");
+          client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+          started.countDown();
+          client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+        } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+          error.set(e);
+        }
+      });
+      t.start();
+
+      started.await();
+
+      List<ActiveCompaction> compactions = new ArrayList<>();
+      do {
+        client.instanceOperations().getActiveCompactions().forEach((ac) -> {
+          try {
+            if (ac.getTable().equals(table1)) {
+              compactions.add(ac);
+            }
+          } catch (TableNotFoundException e1) {
+            fail("Table was deleted during test, should not happen");
+          }
+        });
+        Thread.sleep(1000);
+      } while (compactions.isEmpty());
+
+      ActiveCompaction running1 = compactions.get(0);
+      CompactionHost host = running1.getHost();
+      assertTrue(host.getType() == CompactionHost.Type.TSERVER);
+
+      compactions.clear();
+      do {
+        HostAndPort hp = HostAndPort.fromParts(host.getAddress(), 
host.getPort());
+        
client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> 
{
+          try {
+            if (ac.getTable().equals(table1)) {
+              compactions.add(ac);
+            }
+          } catch (TableNotFoundException e1) {
+            fail("Table was deleted during test, should not happen");
+          }
+        });
+        Thread.sleep(1000);
+      } while (compactions.isEmpty());
+
+      ActiveCompaction running2 = compactions.get(0);
+      assertEquals(running1.getInputFiles(), running2.getInputFiles());
+      assertEquals(running1.getOutputFile(), running2.getOutputFile());
+      assertEquals(running1.getTablet(), running2.getTablet());
+
+      client.tableOperations().cancelCompaction(table1);
+      t.join();
+    }
+  }
+
   /**
    * Counts the number of tablets and files in a table.
    */

Reply via email to