This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 69a62fa55007dd6f9f6181f6b9a5f855f91e6937 Merge: f4a857e1fa 000855d4c1 Author: Dom Garguilo <domgargu...@apache.org> AuthorDate: Thu Jun 6 12:36:02 2024 -0400 Merge remote-tracking branch 'upstream/main' into elasticity # Conflicts: # core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java # test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java .../accumulo/core/metrics/MetricsProducer.java | 27 +++++++- .../org/apache/accumulo/compactor/Compactor.java | 8 +++ .../compaction/ExternalCompactionProgressIT.java | 81 +++++++++++++--------- 3 files changed, 84 insertions(+), 32 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 8e0c0a8d58,0f8e607ee6..933c651297 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@@ -61,46 -62,27 +62,69 @@@ import io.micrometer.core.instrument.Me * <tr> * <td>N/A</td> * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUES}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> ++ * <td>N/A</td> ++ * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_ENTRIES_READ}</td> + * <td>FunctionCounter</td> + * <td>Number of entries read by all threads performing compactions</td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_ENTRIES_WRITTEN}</td> + * <td>FunctionCounter</td> + * <td>Number of entries written by all threads performing compactions</td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@value #METRICS_COMPACTOR_BUSY}</td> + * <td>Gauge</td> + * <td>Indicates if the compactor is busy or not. The value will be 0 when idle and 1 when + * busy.</td> + * </tr> + * <!-- fate --> + * <tr> * <td>currentFateOps</td> * <td>Gauge</td> * <td>{@value #METRICS_FATE_OPS}</td> @@@ -624,23 -591,11 +648,24 @@@ public interface MetricsProducer Logger LOG = LoggerFactory.getLogger(MetricsProducer.class); String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; + String METRICS_SERVER_IDLE = "accumulo.server.idle"; + String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_QUEUE_PREFIX = METRICS_COMPACTOR_PREFIX + "queue."; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_QUEUE_PREFIX + "count"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_QUEUE_PREFIX + "length"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.dequeued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.queued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.rejected"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY = + METRICS_COMPACTOR_QUEUE_PREFIX + "jobs.priority"; String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX + "entries.read"; String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX + "entries.written"; + String METRICS_COMPACTOR_BUSY = METRICS_COMPACTOR_PREFIX + "busy"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in.progress.by.type"; diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index b9b41e0833,a298ae22cd..f7fefbc1f4 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@@ -34,11 -34,13 +34,12 @@@ import java.util.EnumSet import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.accumulo.compactor.Compactor; -import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; @@@ -125,17 -123,23 +127,20 @@@ public class ExternalCompactionProgress public void testProgressViaMetrics() throws Exception { String table = this.getUniqueNames(1)[0]; + final AtomicLong totalEntriesRead = new AtomicLong(0); + final AtomicLong totalEntriesWritten = new AtomicLong(0); + final AtomicInteger compactorBusy = new AtomicInteger(-1); - final long expectedEntriesRead = 9216; - final long expectedEntriesWritten = 4096; ++ final long expectedEntriesRead = 18432; ++ final long expectedEntriesWritten = 13312; + + Thread checkerThread = + getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten, compactorBusy); + try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { createTable(client, table, "cs1"); writeData(client, table, ROWS); - final long expectedEntriesRead = 18432; - final long expectedEntriesWritten = 13312; - final AtomicLong totalEntriesRead = new AtomicLong(0); - final AtomicLong totalEntriesWritten = new AtomicLong(0); - cluster.getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); - cluster.getClusterControl().startCoordinator(CompactionCoordinator.class); -- - Thread checkerThread = getMetricsCheckerThread(totalEntriesRead, totalEntriesWritten); checkerThread.start(); IteratorSetting setting = new IteratorSetting(50, "Slow", SlowIterator.class); @@@ -143,9 -147,14 +148,14 @@@ client.tableOperations().attachIterator(table, setting, EnumSet.of(IteratorUtil.IteratorScope.majc)); log.info("Compacting table"); - compact(client, table, 2, GROUP1, true); - log.info("Done Compacting table"); - verify(client, table, 2, ROWS); + + Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be false initially"); + - compact(client, table, 2, QUEUE1, false); ++ compact(client, table, 2, GROUP1, false); + + Wait.waitFor(() -> compactorBusy.get() == 1, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be true after starting compaction"); Wait.waitFor(() -> { if (totalEntriesRead.get() == expectedEntriesRead @@@ -157,10 -166,19 +167,17 @@@ expectedEntriesRead, totalEntriesRead.get(), expectedEntriesWritten, totalEntriesWritten.get()); return false; - }, 30000, 3000, "Entries read and written metrics values did not match expected values"); + }, 30_000, CHECKER_THREAD_SLEEP_MS, + "Entries read and written metrics values did not match expected values"); + + log.info("Done Compacting table"); + verify(client, table, 2, ROWS); + Wait.waitFor(() -> compactorBusy.get() == 0, 30_000, CHECKER_THREAD_SLEEP_MS, + "Compactor busy metric should be false once compaction completes"); + } finally { stopCheckerThread.set(true); checkerThread.join(); - getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR); } }