This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 69a62fa55007dd6f9f6181f6b9a5f855f91e6937
Merge: f4a857e1fa 000855d4c1
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Thu Jun 6 12:36:02 2024 -0400

    Merge remote-tracking branch 'upstream/main' into elasticity
    
    # Conflicts:
    #       core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
    #       test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java

 .../accumulo/core/metrics/MetricsProducer.java     | 27 +++++++-
 .../org/apache/accumulo/compactor/Compactor.java   |  8 +++
 .../compaction/ExternalCompactionProgressIT.java   | 81 +++++++++++++---------
 3 files changed, 84 insertions(+), 32 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 8e0c0a8d58,0f8e607ee6..933c651297
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -61,46 -62,27 +62,69 @@@ import io.micrometer.core.instrument.Me
   * <tr>
   * <td>N/A</td>
   * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUES}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
 + * <td>N/A</td>
 + * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
 + * <td>N/A</td>
 + * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
 + * <td>N/A</td>
 + * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
 + * <td>N/A</td>
 + * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
 + * <td>N/A</td>
 + * <td>N/A</td>
 + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED}</td>
 + * <td>Gauge</td>
 + * <td></td>
 + * </tr>
 + * <tr>
++ * <td>N/A</td>
++ * <td>N/A</td>
+  * <td>{@value #METRICS_COMPACTOR_ENTRIES_READ}</td>
+  * <td>FunctionCounter</td>
+  * <td>Number of entries read by all threads performing compactions</td>
+  * </tr>
+  * <tr>
+  * <td>N/A</td>
+  * <td>N/A</td>
+  * <td>{@value #METRICS_COMPACTOR_ENTRIES_WRITTEN}</td>
+  * <td>FunctionCounter</td>
+  * <td>Number of entries written by all threads performing compactions</td>
+  * </tr>
+  * <tr>
+  * <td>N/A</td>
+  * <td>N/A</td>
+  * <td>{@value #METRICS_COMPACTOR_BUSY}</td>
+  * <td>Gauge</td>
+  * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when
+  * busy.</td>
+  * </tr>
+  * <!-- fate -->
+  * <tr>
   * <td>currentFateOps</td>
   * <td>Gauge</td>
   * <td>{@value #METRICS_FATE_OPS}</td>
@@@ -624,23 -591,11 +648,24 @@@ public interface MetricsProducer 
    Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
  
    String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
 +  String METRICS_SERVER_IDLE = "accumulo.server.idle";
 +
    String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
    String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck";
 +  String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue.";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_QUEUE_PREFIX + "count";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_QUEUE_PREFIX + "length";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority";
    String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read";
    String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written";
+   String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy";
  
    String METRICS_FATE_PREFIX = "accumulo.fate.";
    String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type";
diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index b9b41e0833,a298ae22cd..f7fefbc1f4
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -34,11 -34,13 +34,12 @@@ import java.util.EnumSet
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
 +import java.util.Optional;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
  
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.IteratorSetting;
@@@ -125,17 -123,23 +127,20 @@@ public class ExternalCompactionProgress
    public void testProgressViaMetrics() throws Exception {
      String table = this.getUniqueNames(1)[0];
  
+     final AtomicLong totalEntriesRead = new AtomicLong(0);
+     final AtomicLong totalEntriesWritten = new AtomicLong(0);
+     final AtomicInteger compactorBusy = new AtomicInteger(-1);
 -    final long expectedEntriesRead = 9216;
 -    final long expectedEntriesWritten = 4096;
++    final long expectedEntriesRead = 18432;
++    final long expectedEntriesWritten = 13312;
+ 
+     Thread checkerThread =
+         getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy);
+ 
      try (AccumuloClient client =
          Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
        createTable(client, table, "cs1");
        writeData(client, table, ROWS);
  
-       final long expectedEntriesRead = 18432;
-       final long expectedEntriesWritten = 13312;
-       final AtomicLong totalEntriesRead = new AtomicLong(0);
-       final AtomicLong totalEntriesWritten = new AtomicLong(0);
 -      cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
 -      cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
--
-       Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten);
        checkerThread.start();
  
        IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class);
@@@ -143,9 -147,14 +148,14 @@@
        client.tableOperations().attachIterator(table, setting,
            EnumSet.of(IteratorUtil.IteratorScope.majc));
        log.info("Compacting table");
-       compact(client, table, 2, GROUP1, true);
-       log.info("Done Compacting table");
-       verify(client, table, 2, ROWS);
+ 
+       Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS,
+           "Compactor busy metric should be false initially");
+ 
 -      compact(client, table, 2, QUEUE1, false);
++      compact(client, table, 2, GROUP1, false);
+ 
+       Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS,
+           "Compactor busy metric should be true after starting compaction");
  
        Wait.waitFor(() -> {
          if (totalEntriesRead.get() == expectedEntriesRead
@@@ -157,10 -166,19 +167,17 @@@
              expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten,
              totalEntriesWritten.get());
          return false;
-       }, 30000, 3000, "Entries read and written metrics values did not match expected values");
+       }, 30_000, CHECKER_THREAD_SLEEP_MS,
+           "Entries read and written metrics values did not match expected values");
+ 
+       log.info("Done Compacting table");
+       verify(client, table, 2, ROWS);
  
+       Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS,
+           "Compactor busy metric should be false once compaction completes");
+     } finally {
        stopCheckerThread.set(true);
        checkerThread.join();
 -      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
 -      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
      }
    }
  

Reply via email to