This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 7cf5c73222 Modified coordinator restart test to test compaction duration update (#4766)
7cf5c73222 is described below

commit 7cf5c732229894034ed05e177db587e4bb89c263
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Jul 29 07:59:55 2024 -0400

    Modified coordinator restart test to test compaction duration update (#4766)
    
    Closes #4680
---
 .../coordinator/CompactionCoordinator.java         |   5 +-
 .../test/compaction/ExternalCompaction_3_IT.java   | 115 +++++++++++++++------
 2 files changed, 86 insertions(+), 34 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 355d5a8a5e..34e9d9191b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -148,6 +148,9 @@ public class CompactionCoordinator
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class);
 
+  public static final String RESTART_UPDATE_MSG =
+      "Coordinator restarted, compaction found in progress";
+
   /*
    * Map of compactionId to RunningCompactions. This is an informational cache 
of what external
    * compactions may be running. Its possible it may contain external 
compactions that are not
@@ -301,7 +304,7 @@ public class CompactionCoordinator
       running.forEach(rc -> {
         TCompactionStatusUpdate update = new TCompactionStatusUpdate();
         update.setState(TCompactionState.IN_PROGRESS);
-        update.setMessage("Coordinator restarted, compaction found in 
progress");
+        update.setMessage(RESTART_UPDATE_MSG);
         rc.addUpdate(System.currentTimeMillis(), update);
         
RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()),
 rc);
       });
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index b1d7128b59..6d80c970e3 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -22,7 +22,6 @@ import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.confirmCompactionCompleted;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
-import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getLastState;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.waitForCompactionStartAndReturnEcids;
 import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
@@ -31,29 +30,40 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
+import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
+import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -76,9 +86,6 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
   @BeforeAll
   public static void beforeTests() throws Exception {
     startMiniClusterWithConfig(new ExternalCompaction3Config());
-    getCluster().getClusterControl().stop(ServerType.COMPACTOR);
-    getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
-        ExternalDoNothingCompactor.class);
   }
 
   @Test
@@ -88,6 +95,10 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
     try (AccumuloClient client =
         Accumulo.newClient().from(getCluster().getClientProperties()).build()) 
{
 
+      getCluster().getClusterControl().stop(ServerType.COMPACTOR);
+      getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
+          ExternalDoNothingCompactor.class);
+
       createTable(client, table1, "cs1", 2);
       // set compaction ratio to 1 so that majc occurs naturally, not user 
compaction
       // user compaction blocks merge
@@ -143,6 +154,10 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
       // Verify that the tmp file are cleaned up
       Wait.waitFor(() -> FindCompactionTmpFiles
           .findTempFiles(getCluster().getServerContext(), 
tid.canonical()).size() == 0);
+    } finally {
+      getCluster().getClusterControl().stop(ServerType.COMPACTOR);
+      getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
     }
   }
 
@@ -155,6 +170,12 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
 
       createTable(client, table1, "cs2", 2);
       writeData(client, table1);
+
+      IteratorSetting setting = new IteratorSetting(50, "slow", 
SlowIterator.class);
+      SlowIterator.setSeekSleepTime(setting, 5000);
+      SlowIterator.setSleepTime(setting, 5000);
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+
       compact(client, table1, 2, GROUP2, false);
 
       TableId tid = getCluster().getServerContext().getTableId(table1);
@@ -163,44 +184,33 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
       Set<ExternalCompactionId> ecids =
           
waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid);
 
+      ServerContext ctx = getCluster().getServerContext();
+
+      // Wait for all compactions to start
+      Map<ExternalCompactionId,RunningCompactionInfo> originalRunningInfo = 
null;
+      do {
+        originalRunningInfo = getRunningCompactionInformation(ctx, ecids);
+      } while (originalRunningInfo == null
+          || originalRunningInfo.values().stream().allMatch(rci -> 
rci.duration == 0));
+
       // Stop the Manager (Coordinator)
       getCluster().getClusterControl().stop(ServerType.MANAGER);
 
       // Restart the Manager while the compaction is running
       getCluster().getClusterControl().start(ServerType.MANAGER);
 
-      ServerContext ctx = getCluster().getServerContext();
+      Map<ExternalCompactionId,RunningCompactionInfo> postRestartRunningInfo =
+          getRunningCompactionInformation(ctx, ecids);
 
-      // Confirm compaction is still running
-      int matches = 0;
-      while (matches == 0) {
-        TExternalCompactionList running = null;
-        while (running == null) {
-          try {
-            Optional<HostAndPort> coordinatorHost =
-                ExternalCompactionUtil.findCompactionCoordinator(ctx);
-            if (coordinatorHost.isEmpty()) {
-              throw new TTransportException(
-                  "Unable to get CompactionCoordinator address from 
ZooKeeper");
-            }
-            running = getRunningCompactions(ctx, coordinatorHost);
-          } catch (TException t) {
-            running = null;
-            Thread.sleep(2000);
-          }
-        }
-        if (running.getCompactions() != null) {
-          for (ExternalCompactionId ecid : ecids) {
-            TExternalCompaction tec = 
running.getCompactions().get(ecid.canonical());
-            if (tec != null && tec.getUpdates() != null && 
!tec.getUpdates().isEmpty()) {
-              matches++;
-              assertEquals(TCompactionState.IN_PROGRESS, getLastState(tec));
-            }
-          }
+      for (Entry<ExternalCompactionId,RunningCompactionInfo> post : 
postRestartRunningInfo
+          .entrySet()) {
+        if (originalRunningInfo.containsKey(post.getKey())) {
+          assertTrue(
+              (post.getValue().duration - 
originalRunningInfo.get(post.getKey()).duration) > 0);
         }
-        UtilWaitThread.sleep(250);
+        final String lastState = post.getValue().status;
+        assertTrue(lastState.equals(TCompactionState.IN_PROGRESS.name()));
       }
-      assertTrue(matches > 0);
 
       // We need to cancel the compaction or delete the table here because we 
initiate a user
       // compaction above in the test. Even though the external compaction was 
cancelled
@@ -210,4 +220,43 @@ public class ExternalCompaction_3_IT extends 
SharedMiniClusterBase {
     }
   }
 
+  private Map<ExternalCompactionId,RunningCompactionInfo> 
getRunningCompactionInformation(
+      ServerContext ctx, Set<ExternalCompactionId> ecids) throws 
InterruptedException {
+
+    final Map<ExternalCompactionId,RunningCompactionInfo> results = new 
HashMap<>();
+
+    while (results.isEmpty()) {
+      TExternalCompactionList running = null;
+      while (running == null || running.getCompactions() == null) {
+        try {
+          Optional<HostAndPort> coordinatorHost =
+              ExternalCompactionUtil.findCompactionCoordinator(ctx);
+          if (coordinatorHost.isEmpty()) {
+            throw new TTransportException(
+                "Unable to get CompactionCoordinator address from ZooKeeper");
+          }
+          running = getRunningCompactions(ctx, coordinatorHost);
+        } catch (TException t) {
+          running = null;
+          Thread.sleep(2000);
+        }
+      }
+      for (ExternalCompactionId ecid : ecids) {
+        final TExternalCompaction tec = 
running.getCompactions().get(ecid.canonical());
+        if (tec != null && tec.getUpdatesSize() > 0) {
+          // When the coordinator restarts it inserts a message into the 
updates. If this
+          // is the last message, then don't insert this into the results. We 
want to get
+          // an actual update from the Compactor.
+          TreeMap<Long,TCompactionStatusUpdate> sorted = new 
TreeMap<>(tec.getUpdates());
+          var lastEntry = sorted.lastEntry();
+          if 
(lastEntry.getValue().getMessage().equals(CompactionCoordinator.RESTART_UPDATE_MSG))
 {
+            continue;
+          }
+          results.put(ecid, new RunningCompactionInfo(tec));
+        }
+      }
+    }
+    return results;
+  }
+
 }

Reply via email to