This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new ff951d7f99 Add compaction IT that verifies queues are cleared when
tablets no longer need to compact (#4466)
ff951d7f99 is described below
commit ff951d7f99000c0a4e8303408b3233cab168e634
Author: Dom G <[email protected]>
AuthorDate: Wed May 15 09:32:15 2024 -0400
Add compaction IT that verifies queues are cleared when tablets no longer
need to compact (#4466)
* Add compaction IT that verifies queues are cleared when tablets no longer
need to compact
---
.../CompactionPriorityQueueMetricsIT.java | 129 ++++++++++++++++-----
.../test/metrics/TestStatsDRegistryFactory.java | 4 +-
2 files changed, 102 insertions(+), 31 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index 800ceffbcd..7af245fde3 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.compaction;
+import static
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
@@ -72,12 +74,14 @@ import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.functional.CompactionIT;
import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -103,15 +107,23 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
public static final String QUEUE1_SERVICE = "Q1";
public static final int QUEUE1_SIZE = 6;
+ // Metrics collector Thread
+ final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ Thread metricsTailer;
+
@BeforeEach
public void setupMetricsTest() throws Exception {
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+ Wait.waitFor(() ->
getCompactorAddrs(getCluster().getServerContext()).isEmpty());
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
tableName = getUniqueNames(1)[0];
Map<String,String> props =
Map.of("table.compaction.dispatcher",
SimpleCompactionDispatcher.class.getName(),
"table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
- NewTableConfiguration ntc = new
NewTableConfiguration().setProperties(props);
+ NewTableConfiguration ntc = new
NewTableConfiguration().setProperties(props)
+ .withInitialTabletAvailability(TabletAvailability.HOSTED);
c.tableOperations().create(tableName, ntc);
tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
@@ -119,6 +131,28 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
fs = getCluster().getFileSystem();
rootPath = getCluster().getTemporaryPath().toString();
}
+ queueMetrics.clear();
+ shutdownTailer.set(false);
+ metricsTailer = Threads.createThread("metric-tailer", () -> {
+ while (!shutdownTailer.get()) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ break;
+ }
+ if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
+ queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+ }
+ }
+ }
+ });
+ metricsTailer.start();
+ }
+
+ @AfterEach
+ public void teardownMetricsTest() throws Exception {
+ shutdownTailer.set(true);
+ metricsTailer.join();
}
private String getDir(String testName) throws Exception {
@@ -254,24 +288,6 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
@Test
public void testQueueMetrics() throws Exception {
- // Metrics collector Thread
- final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
- final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
-
- Thread thread = Threads.createThread("metric-tailer", () -> {
- while (!shutdownTailer.get()) {
- List<String> statsDMetrics = sink.getLines();
- for (String s : statsDMetrics) {
- if (shutdownTailer.get()) {
- break;
- }
- if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX +
"queue")) {
- queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
- }
- }
- }
- });
- thread.start();
long highestFileCount = 0L;
ServerContext context = getCluster().getServerContext();
@@ -282,11 +298,8 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
fs.mkdirs(new Path(dir));
// Create splits so there are two groupings of tablets with similar file
counts.
- List<String> splitPoints =
- List.of("500", "1000", "1500", "2000", "3750", "5500", "7250",
"9000");
- for (String splitPoint : splitPoints) {
- addSplits(c, tableName, splitPoint);
- }
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
for (int i = 0; i < 100; i++) {
writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
@@ -322,9 +335,8 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
}
}
}
- // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
// If metrics are not found in the queue, sleep until the next poll.
- UtilWaitThread.sleep(3500);
+
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}
// Set lowest priority to the lowest possible system compaction priority
@@ -380,7 +392,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
boolean emptyQueue = false;
// Make sure that metrics added to the queue are recent
- UtilWaitThread.sleep(3500);
+
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
while (!emptyQueue) {
while (!queueMetrics.isEmpty()) {
@@ -401,10 +413,67 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
}
}
}
- UtilWaitThread.sleep(3500);
+
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
+ }
+ }
+
+ /**
+ * Test that the compaction queue is cleared when compactions no longer need
to happen.
+ */
+ @Test
+ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
+ ServerContext context = getCluster().getServerContext();
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ // Create splits so there are two groupings of tablets with similar file
counts.
+ String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
+ addSplits(c, tableName, splitString);
+
+ for (int i = 0; i < 100; i++) {
+ writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+ }
+ c.tableOperations().importDirectory(dir).to(tableName).load();
+ verifyData(c, tableName, 0, 100 * 100 - 1, false);
}
- shutdownTailer.set(true);
- thread.join();
+ final long sleepMillis =
TestStatsDRegistryFactory.pollingFrequency.toMillis();
+
+ // wait for compaction jobs to be queued
+ Wait.waitFor(() -> getJobsQueued() > 0, 60_000, sleepMillis,
+ "Expected to see compaction jobs queued");
+
+ // change compactor settings so that compactions no longer need to run
+ context.tableOperations().setProperty(tableName,
Property.TABLE_MAJC_RATIO.getKey(), "2000");
+ context.tableOperations().setProperty(tableName,
Property.TABLE_FILE_MAX.getKey(), "2000");
+
+ // wait for queue to clear
+ Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
+ "Expected job queue to be cleared once compactions no longer need to
happen");
}
+
+ /**
+ * @return the number of jobs queued in the compaction queue. Returns -1 if
no metrics are found.
+ */
+ private int getJobsQueued() throws InterruptedException {
+ Integer jobsQueued = null;
+ while (!queueMetrics.isEmpty()) {
+ var metric = queueMetrics.take();
+ if (metric.getName()
+
.contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+ && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+ jobsQueued = Integer.parseInt(metric.getValue());
+ }
+ }
+ if (jobsQueued == null) {
+ log.warn("No compaction job queue metrics found.");
+ return -1;
+ }
+ log.info("Jobs Queued: {}", jobsQueued);
+ return jobsQueued;
+ }
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
index 8715a40c00..4c2261d6a2 100644
---
a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
+++
b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
@@ -37,6 +37,8 @@ public class TestStatsDRegistryFactory implements
MeterRegistryFactory {
public static final String SERVER_HOST = "test.meter.registry.host";
public static final String SERVER_PORT = "test.meter.registry.port";
+ public static final Duration pollingFrequency = Duration.ofSeconds(3);
+
@Override
public MeterRegistry create(final InitParameters params) {
LOG.info("starting metrics registration.");
@@ -77,7 +79,7 @@ public class TestStatsDRegistryFactory implements
MeterRegistryFactory {
@Override
public Duration pollingFrequency() {
- return Duration.ofSeconds(3);
+ return pollingFrequency;
}
@Override