This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 55c0d4ccee Improvements to ExternalCompactionsIT (#5297)
55c0d4ccee is described below

commit 55c0d4ccee250b8ee8596d073eafeac2679febc3
Author: Dom G. <domgargu...@apache.org>
AuthorDate: Tue Feb 4 10:18:16 2025 -0500

    Improvements to ExternalCompactionsIT (#5297)
---
 .../compaction/ExternalCompactionTestUtils.java    |  6 ++++
 .../test/compaction/ExternalCompaction_1_IT.java   | 39 ++++++++--------------
 .../test/compaction/ExternalCompaction_2_IT.java   | 22 ++++++------
 3 files changed, 30 insertions(+), 37 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index a0b470cadf..1b6981f679 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -99,6 +99,12 @@ public class ExternalCompactionTestUtils {
         .filter(state -> state.getExtent().tableId().equals(tid));
   }
 
+  public static long getFinalStateForTableCount(AccumuloCluster cluster, 
TableId tid) {
+    try (var finalStatesForTable = getFinalStatesForTable(cluster, tid)) {
+      return finalStatesForTable.count();
+    }
+  }
+
   public static void compact(final AccumuloClient client, String table1, int 
modulus,
       String expectedQueue, boolean wait)
       throws AccumuloSecurityException, TableNotFoundException, 
AccumuloException {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 695c97ebbf..7bf53e10ca 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -31,6 +31,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QU
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE8;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
@@ -56,7 +57,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.accumulo.compactor.Compactor;
 import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
@@ -90,20 +90,19 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
-import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterEach;
@@ -245,12 +244,7 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
       getCluster().getClusterControl().stop(ServerType.COMPACTOR);
 
       // DeadCompactionDetector in the CompactionCoordinator should fail the 
compaction.
-      long count = 0;
-      while (count == 0) {
-        count = getFinalStatesForTable(getCluster(), tid)
-            .filter(state -> 
state.getFinalState().equals(FinalState.FAILED)).count();
-        UtilWaitThread.sleep(250);
-      }
+      Wait.waitFor(() -> getFinalStateForTableCount(getCluster(), tid) > 0);
 
       // We need to cancel the compaction or delete the table here because we 
initiate a user
       // compaction above in the test. Even though the external compaction was 
cancelled
@@ -411,12 +405,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
       // metadata table entries to show up.
       LOG.info("Waiting for external compaction to complete.");
       TableId tid = getCluster().getServerContext().getTableId(table3);
-      Stream<ExternalCompactionFinalState> fs = 
getFinalStatesForTable(getCluster(), tid);
-      while (fs.findAny().isEmpty()) {
+      Wait.waitFor(() -> {
         LOG.info("Waiting for compaction completed marker to appear");
-        UtilWaitThread.sleep(250);
-        fs = getFinalStatesForTable(getCluster(), tid);
-      }
+        return getFinalStateForTableCount(getCluster(), tid) > 0;
+      }, 120_000, 250);
 
       LOG.info("Validating metadata table contents.");
       try (TabletsMetadata tm = 
getCluster().getServerContext().getAmple().readTablets()
@@ -424,11 +416,11 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
         TabletMetadata m = tm.stream().collect(onlyElement());
         Map<ExternalCompactionId,ExternalCompactionMetadata> em = 
m.getExternalCompactions();
         assertEquals(1, em.size());
-        List<ExternalCompactionFinalState> finished = new ArrayList<>();
-        getFinalStatesForTable(getCluster(), tid).forEach(f -> 
finished.add(f));
-        assertEquals(1, finished.size());
-        assertEquals(em.entrySet().iterator().next().getKey(),
-            finished.get(0).getExternalCompactionId());
+        try (var finalStates = getFinalStatesForTable(getCluster(), tid)
+            .map(ExternalCompactionFinalState::getExternalCompactionId)) {
+          ExternalCompactionId actual = finalStates.collect(onlyElement());
+          assertEquals(em.entrySet().iterator().next().getKey(), actual);
+        }
       }
 
       // Force a flush on the metadata table before killing our tserver
@@ -445,13 +437,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
       getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
 
       // Wait for the compaction to be committed.
-      LOG.info("Waiting for compaction completed marker to disappear");
-      Stream<ExternalCompactionFinalState> fs2 = 
getFinalStatesForTable(getCluster(), tid);
-      while (fs2.findAny().isPresent()) {
+      Wait.waitFor(() -> {
         LOG.info("Waiting for compaction completed marker to disappear");
-        UtilWaitThread.sleep(500);
-        fs2 = getFinalStatesForTable(getCluster(), tid);
-      }
+        return getFinalStateForTableCount(getCluster(), tid) == 0;
+      }, 120_000, 500);
       verify(client, table3, 2);
 
       // We need to cancel the compaction or delete the table here because we 
initiate a user
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index 3b902a5eab..b10c372cb3 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -28,7 +28,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.co
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionRunning;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStatesForTable;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getFinalStateForTableCount;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
@@ -60,6 +60,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterEach;
@@ -165,8 +166,8 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
           .startCoordinator(TestCompactionCoordinatorForOfflineTable.class);
 
       TableId tid = getCluster().getServerContext().getTableId(table1);
-      // Confirm that no final state is in the metadata table
-      assertEquals(0, getFinalStatesForTable(getCluster(), tid).count());
+      assertEquals(0, getFinalStateForTableCount(getCluster(), tid),
+          "Expected no final state is in the metadata table");
 
       // Offline the table when the compaction starts
       final AtomicBoolean succeededInTakingOffline = new AtomicBoolean(false);
@@ -209,20 +210,17 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
       confirmCompactionCompleted(getCluster().getServerContext(), ecids,
           TCompactionState.SUCCEEDED);
 
-      // Confirm that final state is in the metadata table
-      assertEquals(1, getFinalStatesForTable(getCluster(), tid).count());
+      assertEquals(1, getFinalStateForTableCount(getCluster(), tid),
+          "Expected 1 final state in the metadata table");
 
       // Online the table
       client.tableOperations().online(table1);
 
       // wait for compaction to be committed by tserver or test timeout
-      long finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
-      while (finalStateCount > 0) {
-        finalStateCount = getFinalStatesForTable(getCluster(), tid).count();
-        if (finalStateCount > 0) {
-          UtilWaitThread.sleep(50);
-        }
-      }
+      Wait.waitFor(() -> {
+        LOG.info("Waiting for compaction completed marker to disappear");
+        return getFinalStateForTableCount(getCluster(), tid) == 0;
+      }, 120_000, 500);
 
       // We need to cancel the compaction or delete the table here because we 
initiate a user
       // compaction above in the test. Even though the external compaction was 
cancelled

Reply via email to