This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new c5122a3615 Fix FateConcurrencyIT (#4050)
c5122a3615 is described below

commit c5122a3615bd06d5fbd0f6941b2f1553296e324b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Dec 11 10:28:16 2023 -0500

    Fix FateConcurrencyIT (#4050)
    
    SlowOps class was using getActiveCompaction(TabletServer) to check
    that majc was running. However, this only reports minc now as majc
    no longer run in the TabletServer.
---
 .../test/functional/FateConcurrencyIT.java         | 19 ++++++---
 .../org/apache/accumulo/test/util/SlowOps.java     | 46 +++++-----------------
 2 files changed, 22 insertions(+), 43 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 2283132c3f..a9e0a437a8 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
@@ -51,13 +52,15 @@ import 
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.test.util.SlowOps;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.zookeeper.KeeperException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +76,6 @@ import org.slf4j.LoggerFactory;
  * (original) and additional method without.</li>
  * </ul>
  */
-@Disabled // ELASTICITY_TODO
 public class FateConcurrencyIT extends AccumuloClusterHarness {
 
   private static final Logger log = 
LoggerFactory.getLogger(FateConcurrencyIT.class);
@@ -127,7 +129,6 @@ public class FateConcurrencyIT extends 
AccumuloClusterHarness {
   @Test
   public void changeTableStateTest() throws Exception {
     String tableName = getUniqueNames(1)[0];
-    SlowOps.setExpectedCompactions(client, 1);
     slowOps = new SlowOps(client, tableName, maxWaitMillis);
 
     assertEquals(TableState.ONLINE, getTableState(tableName), "verify table 
online after created");
@@ -219,7 +220,6 @@ public class FateConcurrencyIT extends 
AccumuloClusterHarness {
    */
   @Test
   public void getFateStatus() {
-    SlowOps.setExpectedCompactions(client, 1);
     String tableName = getUniqueNames(1)[0];
     slowOps = new SlowOps(client, tableName, maxWaitMillis);
 
@@ -474,10 +474,14 @@ public class FateConcurrencyIT extends 
AccumuloClusterHarness {
    * valid.
    */
   @Test
-  public void multipleCompactions() {
+  public void multipleCompactions() throws InterruptedException, IOException {
 
     int tableCount = 4;
-    SlowOps.setExpectedCompactions(client, tableCount);
+
+    // Start 4 Compactors for the user_small group
+    MiniAccumuloClusterImpl mini = (MiniAccumuloClusterImpl) getCluster();
+    
mini.getConfig().getClusterServerConfiguration().addCompactorResourceGroup("user_small",
 4);
+    mini.start();
 
     List<SlowOps> tables = Arrays.stream(getUniqueNames(tableCount))
         .map(tableName -> new SlowOps(client, tableName, maxWaitMillis))
@@ -487,6 +491,9 @@ public class FateConcurrencyIT extends 
AccumuloClusterHarness {
     assertEquals(tableCount,
         
tables.stream().map(SlowOps::getTableName).filter(this::findFate).count());
 
+    Wait.waitFor(() -> tableCount
+        == 
ExternalCompactionUtil.getCompactionsRunningOnCompactors((ClientContext) 
client).size());
+
     tables.forEach(t -> {
       try {
         client.tableOperations().cancelCompaction(t.getTableName());
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java 
b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index 14c291a053..fb48e21fe3 100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -38,13 +38,15 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -78,20 +80,6 @@ public class SlowOps {
     createData();
   }
 
-  @SuppressWarnings("deprecation")
-  public static void setExpectedCompactions(AccumuloClient client, final int 
numParallelExpected) {
-    final int target = numParallelExpected + 1;
-    try {
-      client.instanceOperations().setProperty(
-          Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
-          "[{'name':'any','numThreads':" + target + "}]".replaceAll("'", 
"\""));
-      Thread.sleep(3_000); // give it time to propagate
-    } catch (AccumuloException | AccumuloSecurityException | 
InterruptedException
-        | NumberFormatException ex) {
-      throw new IllegalStateException("Could not set parallel compaction limit 
to " + target, ex);
-    }
-  }
-
   public String getTableName() {
     return tableName;
   }
@@ -234,28 +222,12 @@ public class SlowOps {
      * wait for compaction to start on table - The compaction will acquire a 
fate transaction lock
      * that used to block a subsequent online command while the fate 
transaction lock was held.
      */
+    TableId tableId = 
TableId.of(client.tableOperations().tableIdMap().get(tableName));
     do {
-      List<String> tservers = client.instanceOperations().getTabletServers();
-      boolean tableFound = tservers.stream().flatMap(tserver -> {
-        // get active compactions from each server
-        try {
-          List<ActiveCompaction> ac = 
client.instanceOperations().getActiveCompactions(tserver);
-          log.trace("tserver {}, running compactions {}", tserver, ac.size());
-          return ac.stream();
-        } catch (AccumuloException | AccumuloSecurityException e) {
-          throw new IllegalStateException("failed to get active compactions, 
test fails.", e);
-        }
-      }).map(activeCompaction -> {
-        // emit table being compacted
-        try {
-          String compactionTable = activeCompaction.getTable();
-          log.debug("Compaction running for {}", compactionTable);
-          return compactionTable;
-        } catch (TableNotFoundException ex) {
-          log.trace("Compaction found for unknown table {}", activeCompaction);
-          return null;
-        }
-      }).anyMatch(tableName::equals);
+      boolean tableFound =
+          
ExternalCompactionUtil.getCompactionsRunningOnCompactors((ClientContext) 
client).stream()
+              .map(rc -> 
KeyExtent.fromThrift(rc.getJob().getExtent()).tableId())
+              .anyMatch(tableId::equals);
 
       if (tableFound) {
         return true;

Reply via email to