This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 32bbcbf351da18f590ce99dda64f305e904f1d5e
Merge: 3fcab5e18a 627ed85555
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Thu May 30 11:45:20 2024 -0400

    Merge remote-tracking branch 'upstream/main' into elasticity

 .../accumulo/core/metrics/MetricsProducer.java     |   2 +
 .../accumulo/server/compaction/FileCompactor.java  |  88 +++++++++++++--
 .../org/apache/accumulo/compactor/Compactor.java   |  27 ++++-
 .../tserver/metrics/TabletServerMetrics.java       |  24 ++++
 .../compaction/ExternalCompactionProgressIT.java   | 123 ++++++++++++++++++++-
 5 files changed, 244 insertions(+), 20 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 0ed24a24b4,ca3d90ccf1..8e0c0a8d58
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@@ -624,21 -567,10 +624,23 @@@ public interface MetricsProducer 
    Logger LOG = LoggerFactory.getLogger(MetricsProducer.class);
  
    String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
 +  String METRICS_SERVER_IDLE = "accumulo.server.idle";
 +
    String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
    String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
 +  String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue.";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = 
METRICS_COMPACTOR_QUEUE_PREFIX + "count";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = 
METRICS_COMPACTOR_QUEUE_PREFIX + "length";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected";
 +  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY =
 +      METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority";
+   String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + 
"entries.read";
+   String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + 
"entries.written";
  
    String METRICS_FATE_PREFIX = "accumulo.fate.";
    String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in.progress.by.type";
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 41ff9f5f51,cebdce684d..4d47b63053
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@@ -63,6 -65,8 +65,7 @@@ import org.apache.accumulo.core.tablets
  import org.apache.accumulo.core.trace.TraceUtil;
  import org.apache.accumulo.core.util.LocalityGroupUtil;
  import 
org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 -import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+ import org.apache.accumulo.core.util.time.NanoTime;
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.fs.VolumeManager;
  import org.apache.accumulo.server.iterators.SystemIteratorEnvironment;
diff --cc 
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2922a3999e,b44fcbbca1..4c5357e35a
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -149,13 -150,17 +150,21 @@@ public class Compactor extends Abstract
  
    protected Compactor(ConfigOpts opts, String[] args) {
      super("compactor", opts, args);
 -    queueName = super.getConfiguration().get(Property.COMPACTOR_QUEUE_NAME);
 +  }
 +
 +  @Override
 +  protected String getResourceGroupPropertyValue(SiteConfiguration conf) {
 +    return conf.get(Property.COMPACTOR_GROUP_NAME);
    }
  
+   private long getTotalEntriesRead() {
+     return FileCompactor.getTotalEntriesRead();
+   }
+ 
+   private long getTotalEntriesWritten() {
+     return FileCompactor.getTotalEntriesWritten();
+   }
+ 
    @Override
    public void registerMetrics(MeterRegistry registry) {
      super.registerMetrics(registry);
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
index 30c8594f6c,70b0c4980b..847c39f572
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetrics.java
@@@ -32,9 -36,28 +36,29 @@@ public class TabletServerMetrics implem
      util = new TabletServerMetricsUtil(tserver);
    }
  
+   private long getTotalEntriesRead() {
+     return FileCompactor.getTotalEntriesRead();
+   }
+ 
+   private long getTotalEntriesWritten() {
+     return FileCompactor.getTotalEntriesWritten();
+   }
+ 
    @Override
    public void registerMetrics(MeterRegistry registry) {
+     FunctionCounter
+         .builder(METRICS_COMPACTOR_ENTRIES_READ, this, 
TabletServerMetrics::getTotalEntriesRead)
+         .description("Number of entries read by all compactions that have run 
on this tserver")
+         .register(registry);
+     FunctionCounter
+         .builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this,
+             TabletServerMetrics::getTotalEntriesWritten)
+         .description("Number of entries written by all compactions that have 
run on this tserver")
+         .register(registry);
 -    LongTaskTimer timer = LongTaskTimer.builder(METRICS_TSERVER_MAJC_STUCK)
++    LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
+         .description("Number and duration of stuck major 
compactions").register(registry);
+     CompactionWatcher.setTimer(timer);
 +
      Gauge
          .builder(METRICS_TSERVER_TABLETS_LONG_ASSIGNMENTS, util,
              TabletServerMetricsUtil::getLongTabletAssignments)
diff --cc 
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
index 1bcd075577,f77f2ea6a9..b9b41e0833
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java
@@@ -18,7 -18,8 +18,8 @@@
   */
  package org.apache.accumulo.test.compaction;
  
+ import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 -import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
 +import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getRunningCompactions;
@@@ -33,9 -34,12 +34,11 @@@ import java.util.EnumSet
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
 +import java.util.Optional;
+ import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
  
 -import org.apache.accumulo.compactor.Compactor;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.IteratorSetting;
@@@ -46,16 -50,21 +49,23 @@@ import org.apache.accumulo.core.data.Ta
  import org.apache.accumulo.core.iterators.IteratorUtil;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+ import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
  import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
  import org.apache.accumulo.core.util.threads.Threads;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
 -import org.apache.accumulo.minicluster.ServerType;
  import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.test.functional.SlowIterator;
+ import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+ import org.apache.accumulo.test.metrics.TestStatsDSink;
+ import org.apache.accumulo.test.util.Wait;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransportException;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.BeforeEach;
  import org.junit.jupiter.api.Test;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -85,7 -110,94 +113,92 @@@ public class ExternalCompactionProgress
    @Override
    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
      ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
 +    cfg.getClusterServerConfiguration().addCompactorResourceGroup(GROUP1, 1);
+     cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+     cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, 
TestStatsDRegistryFactory.class.getName());
+     Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+         TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+     cfg.setSystemProperties(sysProps);
+   }
+ 
+   @Test
+   public void testProgressViaMetrics() throws Exception {
+     String table = this.getUniqueNames(1)[0];
+ 
+     try (AccumuloClient client =
+         
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+       createTable(client, table, "cs1");
+       writeData(client, table, ROWS);
+ 
 -      cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1);
 -      
cluster.getClusterControl().startCoordinator(CompactionCoordinator.class);
 -
 -      final AtomicLong expectedEntriesRead = new AtomicLong(9216);
 -      final AtomicLong expectedEntriesWritten = new AtomicLong(4096);
++      final long expectedEntriesRead = 18432;
++      final long expectedEntriesWritten = 13312;
+       final AtomicLong totalEntriesRead = new AtomicLong(0);
+       final AtomicLong totalEntriesWritten = new AtomicLong(0);
+ 
+       Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, 
totalEntriesWritten);
+       checkerThread.start();
+ 
+       IteratorSetting setting = new IteratorSetting(50, "Slow", 
SlowIterator.class);
+       SlowIterator.setSleepTime(setting, 1);
+       client.tableOperations().attachIterator(table, setting,
+           EnumSet.of(IteratorUtil.IteratorScope.majc));
+       log.info("Compacting table");
 -      compact(client, table, 2, QUEUE1, true);
++      compact(client, table, 2, GROUP1, true);
+       log.info("Done Compacting table");
+       verify(client, table, 2, ROWS);
+ 
+       Wait.waitFor(() -> {
 -        if (totalEntriesRead.get() == expectedEntriesRead.get()
 -            && totalEntriesWritten.get() == expectedEntriesWritten.get()) {
++        if (totalEntriesRead.get() == expectedEntriesRead
++            && totalEntriesWritten.get() == expectedEntriesWritten) {
+           return true;
+         }
+         log.info(
+             "Waiting for entries read to be {} (currently {}) and entries 
written to be {} (currently {})",
 -            expectedEntriesRead.get(), totalEntriesRead.get(), 
expectedEntriesWritten.get(),
++            expectedEntriesRead, totalEntriesRead.get(), 
expectedEntriesWritten,
+             totalEntriesWritten.get());
+         return false;
+       }, 30000, 3000, "Entries read and written metrics values did not match 
expected values");
+ 
+       stopCheckerThread.set(true);
+       checkerThread.join();
+     }
+   }
+ 
+   /**
+    * Get a thread that checks the metrics for entries read and written.
+    *
+    * @param totalEntriesRead this is set to the value of the entries read 
metric
+    * @param totalEntriesWritten this is set to the value of the entries 
written metric
+    */
+   private static Thread getMetricsCheckerThread(AtomicLong totalEntriesRead,
+       AtomicLong totalEntriesWritten) {
+     return Threads.createThread("metric-tailer", () -> {
+       log.info("Starting metric tailer");
+ 
+       sink.getLines().clear();
+ 
+       while (!stopCheckerThread.get()) {
+         List<String> statsDMetrics = sink.getLines();
+         for (String s : statsDMetrics) {
+           if (stopCheckerThread.get()) {
+             break;
+           }
+           if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_READ)) {
+             TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+             int value = Integer.parseInt(e.getValue());
+             totalEntriesRead.addAndGet(value);
+             log.info("Found entries.read metric: {} with value: {}", 
e.getName(), value);
+           } else if 
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_ENTRIES_WRITTEN)) {
+             TestStatsDSink.Metric e = TestStatsDSink.parseStatsDMetric(s);
+             int value = Integer.parseInt(e.getValue());
+             totalEntriesWritten.addAndGet(value);
+             log.info("Found entries.written metric: {} with value: {}", 
e.getName(), value);
+           }
+         }
+         sleepUninterruptibly(3000, TimeUnit.MILLISECONDS);
+       }
+       log.info("Metric tailer thread finished");
+     });
    }
  
    @Test

Reply via email to