This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new ff951d7f99 Add compaction IT that verifies queues are cleared when tablets no longer need to compact (#4466)
ff951d7f99 is described below

commit ff951d7f99000c0a4e8303408b3233cab168e634
Author: Dom G <domgargu...@apache.org>
AuthorDate: Wed May 15 09:32:15 2024 -0400

    Add compaction IT that verifies queues are cleared when tablets no longer need to compact (#4466)
    
    * Add compaction IT that verifies queues are cleared when tablets no longer need to compact
---
 .../CompactionPriorityQueueMetricsIT.java          | 129 ++++++++++++++++-----
 .../test/metrics/TestStatsDRegistryFactory.java    |   4 +-
 2 files changed, 102 insertions(+), 31 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 800ceffbcd..7af245fde3 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.compaction;
 
+import static 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
@@ -72,12 +74,14 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.functional.CompactionIT;
 import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -103,15 +107,23 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
   public static final String QUEUE1_SERVICE = "Q1";
   public static final int QUEUE1_SIZE = 6;
 
+  // Metrics collector Thread
+  final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
+  final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+  Thread metricsTailer;
+
   @BeforeEach
   public void setupMetricsTest() throws Exception {
+    getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+    Wait.waitFor(() -> 
getCompactorAddrs(getCluster().getServerContext()).isEmpty());
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
       tableName = getUniqueNames(1)[0];
 
       Map<String,String> props =
           Map.of("table.compaction.dispatcher", 
SimpleCompactionDispatcher.class.getName(),
               "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
-      NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props);
+      NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props)
+          .withInitialTabletAvailability(TabletAvailability.HOSTED);
       c.tableOperations().create(tableName, ntc);
 
       tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
@@ -119,6 +131,28 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
       fs = getCluster().getFileSystem();
       rootPath = getCluster().getTemporaryPath().toString();
     }
+    queueMetrics.clear();
+    shutdownTailer.set(false);
+    metricsTailer = Threads.createThread("metric-tailer", () -> {
+      while (!shutdownTailer.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            break;
+          }
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    metricsTailer.start();
+  }
+
+  @AfterEach
+  public void teardownMetricsTest() throws Exception {
+    shutdownTailer.set(true);
+    metricsTailer.join();
   }
 
   private String getDir(String testName) throws Exception {
@@ -254,24 +288,6 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
 
   @Test
   public void testQueueMetrics() throws Exception {
-    // Metrics collector Thread
-    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
-    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
-
-    Thread thread = Threads.createThread("metric-tailer", () -> {
-      while (!shutdownTailer.get()) {
-        List<String> statsDMetrics = sink.getLines();
-        for (String s : statsDMetrics) {
-          if (shutdownTailer.get()) {
-            break;
-          }
-          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + 
"queue")) {
-            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
-          }
-        }
-      }
-    });
-    thread.start();
 
     long highestFileCount = 0L;
     ServerContext context = getCluster().getServerContext();
@@ -282,11 +298,8 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
       fs.mkdirs(new Path(dir));
 
       // Create splits so there are two groupings of tablets with similar file 
counts.
-      List<String> splitPoints =
-          List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", 
"9000");
-      for (String splitPoint : splitPoints) {
-        addSplits(c, tableName, splitPoint);
-      }
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
 
       for (int i = 0; i < 100; i++) {
         writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
@@ -322,9 +335,8 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
           }
         }
       }
-      // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
       // If metrics are not found in the queue, sleep until the next poll.
-      UtilWaitThread.sleep(3500);
+      
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
     }
 
     // Set lowest priority to the lowest possible system compaction priority
@@ -380,7 +392,7 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
     boolean emptyQueue = false;
 
     // Make sure that metrics added to the queue are recent
-    UtilWaitThread.sleep(3500);
+    
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
 
     while (!emptyQueue) {
       while (!queueMetrics.isEmpty()) {
@@ -401,10 +413,67 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
           }
         }
       }
-      UtilWaitThread.sleep(3500);
+      
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
+    }
+  }
+
+  /**
+   * Test that the compaction queue is cleared when compactions no longer need to happen.
+   */
+  @Test
+  public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+    ServerContext context = getCluster().getServerContext();
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      // Create splits so there are two groupings of tablets with similar file 
counts.
+      String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+      addSplits(c, tableName, splitString);
+
+      for (int i = 0; i < 100; i++) {
+        writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+      }
+      c.tableOperations().importDirectory(dir).to(tableName).load();
+      verifyData(c, tableName, 0, 100 * 100 - 1, false);
     }
 
-    shutdownTailer.set(true);
-    thread.join();
+    final long sleepMillis = 
TestStatsDRegistryFactory.pollingFrequency.toMillis();
+
+    // wait for compaction jobs to be queued
+    Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis,
+        "Expected to see compaction jobs queued");
+
+    // change compactor settings so that compactions no longer need to run
+    context.tableOperations().setProperty(tableName, 
Property.TABLE_MAJC_RATIO.getKey(), "2000");
+    context.tableOperations().setProperty(tableName, 
Property.TABLE_FILE_MAX.getKey(), "2000");
+
+    // wait for queue to clear
+    Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
+        "Expected job queue to be cleared once compactions no longer need to 
happen");
   }
+
+  /**
+   * @return the number of jobs queued in the compaction queue. Returns -1 if no metrics are found.
+   */
+  private int getJobsQueued() throws InterruptedException {
+    Integer jobsQueued = null;
+    while (!queueMetrics.isEmpty()) {
+      var metric = queueMetrics.take();
+      if (metric.getName()
+          
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+        jobsQueued = Integer.parseInt(metric.getValue());
+      }
+    }
+    if (jobsQueued == null) {
+      log.warn("No compaction job queue metrics found.");
+      return -1;
+    }
+    log.info("Jobs Queued: {}", jobsQueued);
+    return jobsQueued;
+  }
+
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
index 8715a40c00..4c2261d6a2 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
@@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements 
MeterRegistryFactory {
   public static final String SERVER_HOST = "test.meter.registry.host";
   public static final String SERVER_PORT = "test.meter.registry.port";
 
+  public static final Duration pollingFrequency = Duration.ofSeconds(3);
+
   @Override
   public MeterRegistry create(final InitParameters params) {
     LOG.info("starting metrics registration.");
@@ -77,7 +79,7 @@ public class TestStatsDRegistryFactory implements 
MeterRegistryFactory {
 
       @Override
       public Duration pollingFrequency() {
-        return Duration.ofSeconds(3);
+        return pollingFrequency;
       }
 
       @Override

Reply via email to