This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c5122a3615 Fix FateConcurrencyIT (#4050) c5122a3615 is described below commit c5122a3615bd06d5fbd0f6941b2f1553296e324b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Dec 11 10:28:16 2023 -0500 Fix FateConcurrencyIT (#4050) The SlowOps class was using getActiveCompactions(TabletServer) to check that a major compaction (majc) was running. However, that call now reports only minor compactions (minc), because major compactions no longer run in the TabletServer. --- .../test/functional/FateConcurrencyIT.java | 19 ++++++--- .../org/apache/accumulo/test/util/SlowOps.java | 46 +++++----------------- 2 files changed, 22 insertions(+), 43 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 2283132c3f..a9e0a437a8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -51,13 +52,15 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.test.util.SlowOps; +import org.apache.accumulo.test.util.Wait; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import 
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +76,6 @@ import org.slf4j.LoggerFactory; * (original) and additional method without.</li> * </ul> */ -@Disabled // ELASTICITY_TODO public class FateConcurrencyIT extends AccumuloClusterHarness { private static final Logger log = LoggerFactory.getLogger(FateConcurrencyIT.class); @@ -127,7 +129,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { @Test public void changeTableStateTest() throws Exception { String tableName = getUniqueNames(1)[0]; - SlowOps.setExpectedCompactions(client, 1); slowOps = new SlowOps(client, tableName, maxWaitMillis); assertEquals(TableState.ONLINE, getTableState(tableName), "verify table online after created"); @@ -219,7 +220,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { */ @Test public void getFateStatus() { - SlowOps.setExpectedCompactions(client, 1); String tableName = getUniqueNames(1)[0]; slowOps = new SlowOps(client, tableName, maxWaitMillis); @@ -474,10 +474,14 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { * valid. 
*/ @Test - public void multipleCompactions() { + public void multipleCompactions() throws InterruptedException, IOException { int tableCount = 4; - SlowOps.setExpectedCompactions(client, tableCount); + + // Start 4 Compactors for the user_small group + MiniAccumuloClusterImpl mini = (MiniAccumuloClusterImpl) getCluster(); + mini.getConfig().getClusterServerConfiguration().addCompactorResourceGroup("user_small", 4); + mini.start(); List<SlowOps> tables = Arrays.stream(getUniqueNames(tableCount)) .map(tableName -> new SlowOps(client, tableName, maxWaitMillis)) @@ -487,6 +491,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { assertEquals(tableCount, tables.stream().map(SlowOps::getTableName).filter(this::findFate).count()); + Wait.waitFor(() -> tableCount + == ExternalCompactionUtil.getCompactionsRunningOnCompactors((ClientContext) client).size()); + tables.forEach(t -> { try { client.tableOperations().cancelCompaction(t.getTableName()); diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java index 14c291a053..fb48e21fe3 100644 --- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java +++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java @@ -38,13 +38,15 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsImpl; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; 
import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -78,20 +80,6 @@ public class SlowOps { createData(); } - @SuppressWarnings("deprecation") - public static void setExpectedCompactions(AccumuloClient client, final int numParallelExpected) { - final int target = numParallelExpected + 1; - try { - client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(), - "[{'name':'any','numThreads':" + target + "}]".replaceAll("'", "\"")); - Thread.sleep(3_000); // give it time to propagate - } catch (AccumuloException | AccumuloSecurityException | InterruptedException - | NumberFormatException ex) { - throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex); - } - } - public String getTableName() { return tableName; } @@ -234,28 +222,12 @@ public class SlowOps { * wait for compaction to start on table - The compaction will acquire a fate transaction lock * that used to block a subsequent online command while the fate transaction lock was held. 
*/ + TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); do { - List<String> tservers = client.instanceOperations().getTabletServers(); - boolean tableFound = tservers.stream().flatMap(tserver -> { - // get active compactions from each server - try { - List<ActiveCompaction> ac = client.instanceOperations().getActiveCompactions(tserver); - log.trace("tserver {}, running compactions {}", tserver, ac.size()); - return ac.stream(); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IllegalStateException("failed to get active compactions, test fails.", e); - } - }).map(activeCompaction -> { - // emit table being compacted - try { - String compactionTable = activeCompaction.getTable(); - log.debug("Compaction running for {}", compactionTable); - return compactionTable; - } catch (TableNotFoundException ex) { - log.trace("Compaction found for unknown table {}", activeCompaction); - return null; - } - }).anyMatch(tableName::equals); + boolean tableFound = + ExternalCompactionUtil.getCompactionsRunningOnCompactors((ClientContext) client).stream() + .map(rc -> KeyExtent.fromThrift(rc.getJob().getExtent()).tableId()) + .anyMatch(tableId::equals); if (tableFound) { return true;