This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 860b87d  Added IT for external compaction metrics
860b87d is described below

commit 860b87d7c84018c84b6244f06bb54b7db9f5c4a9
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Apr 28 15:38:59 2021 -0400

    Added IT for external compaction metrics
---
 .../apache/accumulo/test/ExternalCompactionIT.java | 127 ++++++++++++++++++---
 .../resources/hadoop-metrics2-accumulo.properties  |   7 +-
 2 files changed, 116 insertions(+), 18 deletions(-)
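
The new test works by tailing the tablet server's hadoop-metrics2 file sink and pattern-matching queued-compaction counters out of each appended line. As a minimal standalone sketch of that extraction step (the sample metrics line below is hypothetical; real FileSink output carries a timestamp, record name, and additional tags):

    import java.util.Optional;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ExtractDemo {
      // Mirrors the extract() helper added in this commit: if the whole
      // line matches, return capture group 1, otherwise empty.
      static Optional<String> extract(String input, String regex) {
        Matcher matcher = Pattern.compile(regex).matcher(input);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
      }

      public static void main(String[] args) {
        // Hypothetical sink line, for illustration only.
        String line = "1619638800000 tserver.general: Context=tserver, DCQ1_queued=5";
        // Prints "DCQ1_queued=5"
        extract(line, ".*(DCQ1_queued=[0-9]+).*").ifPresent(System.out::println);
      }
    }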

diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index dc475df..e790f96 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -34,15 +35,20 @@ import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -84,7 +90,10 @@ import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.miniclusterImpl.ProcessReference;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.commons.io.input.Tailer;
+import org.apache.commons.io.input.TailerListenerAdapter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
@@ -256,12 +265,8 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
   public void testMergeDuringExternalCompaction() throws Exception {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
       String table1 = "ectt7";
-      SortedSet<Text> splits = new TreeSet<>();
-      int jump = MAX_DATA / 2;
-      for (int r = jump; r < MAX_DATA; r += jump) {
-        splits.add(new Text(row(r)));
-      }
-      createTable(client, table1, "cs1", splits);
+
+      createTable(client, table1, "cs1", 2);
       // set compaction ratio to 1 so that majc occurs naturally, not user compaction
       // user compaction blocks merge
       client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0");
@@ -334,14 +339,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
     try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
       String table1 = "ectt4";
 
-      SortedSet<Text> splits = new TreeSet<>();
-      int jump = MAX_DATA / 200;
-
-      for (int r = jump; r < MAX_DATA; r += jump) {
-        splits.add(new Text(row(r)));
-      }
-
-      createTable(client, table1, "cs1", splits);
+      createTable(client, table1, "cs1", 200);
 
       writeData(client, table1);
 
@@ -625,6 +623,92 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
     }
   }
 
+  private static Optional<String> extract(String input, String regex) {
+    Pattern pattern = Pattern.compile(regex);
+    Matcher matcher = pattern.matcher(input);
+    if (matcher.matches()) {
+      return Optional.of(matcher.group(1));
+    }
+
+    return Optional.empty();
+  }
+
+  @Test
+  public void testMetrics() throws Exception {
+    Collection<ProcessReference> tservers =
+        getCluster().getProcesses().get(ServerType.TABLET_SERVER);
+    assertEquals(2, tservers.size());
+    // kill one tserver so that queue metrics are not spread across tservers
+    getCluster().killProcess(TABLET_SERVER, tservers.iterator().next());
+
+    try (final AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      String[] names = getUniqueNames(2);
+      String table1 = names[0];
+      createTable(client, table1, "cs1", 5);
+
+      String table2 = names[1];
+      createTable(client, table2, "cs2", 10);
+
+      writeData(client, table1);
+      writeData(client, table2);
+
+      LinkedBlockingQueue<String> queueMetrics = new LinkedBlockingQueue<>();
+
+      Tailer tailer =
+          Tailer.create(new File("./target/tserver.metrics"), new TailerListenerAdapter() {
+            @Override
+            public void handle(final String line) {
+              extract(line, ".*(DCQ1_queued=[0-9]+).*").ifPresent(queueMetrics::add);
+              extract(line, ".*(DCQ2_queued=[0-9]+).*").ifPresent(queueMetrics::add);
+            }
+          });
+
+      compact(client, table1, 7, "DCQ1", false);
+      compact(client, table2, 13, "DCQ2", false);
+
+      boolean sawDCQ1_5 = false;
+      boolean sawDCQ2_10 = false;
+
+      // wait until expected number of queued are seen in metrics
+      while (!sawDCQ1_5 || !sawDCQ2_10) {
+        String qm = queueMetrics.take();
+        sawDCQ1_5 |= qm.equals("DCQ1_queued=5");
+        sawDCQ2_10 |= qm.equals("DCQ2_queued=10");
+      }
+
+      // start compactors
+      cluster.exec(Compactor.class, "-q", "DCQ1");
+      cluster.exec(Compactor.class, "-q", "DCQ2");
+      cluster.exec(CompactionCoordinator.class);
+
+      boolean sawDCQ1_0 = false;
+      boolean sawDCQ2_0 = false;
+
+      // wait until queued goes to zero in metrics
+      while (!sawDCQ1_0 || !sawDCQ2_0) {
+        String qm = queueMetrics.take();
+        sawDCQ1_0 |= qm.equals("DCQ1_queued=0");
+        sawDCQ2_0 |= qm.equals("DCQ2_queued=0");
+      }
+
+      tailer.stop();
+
+      // Wait for all external compactions to complete
+      long count;
+      do {
+        UtilWaitThread.sleep(100);
+        try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets()
+            .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()) {
+          count = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()).count();
+        }
+      } while (count > 0);
+
+      verify(client, table1, 7);
+      verify(client, table2, 13);
+
+    }
+  }
+
   private ExternalCompactionMetrics getCoordinatorMetrics() throws Exception {
     HttpResponse<String> res = hc.send(req, BodyHandlers.ofString());
     assertEquals(200, res.statusCode());
@@ -638,7 +722,8 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
     try (Scanner scanner = client.createScanner(table1)) {
       int count = 0;
       for (Entry<Key,Value> entry : scanner) {
-        assertTrue(Integer.parseInt(entry.getValue().toString()) % modulus == 0);
+        assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus),
+            Integer.parseInt(entry.getValue().toString()) % modulus == 0);
         count++;
       }
 
@@ -675,6 +760,18 @@ public class ExternalCompactionIT extends ConfigurableMacBase {
 
   }
 
+  private void createTable(AccumuloClient client, String tableName, String service, int numTablets)
+      throws Exception {
+    SortedSet<Text> splits = new TreeSet<>();
+    int jump = MAX_DATA / numTablets;
+
+    for (int r = jump; r < MAX_DATA; r += jump) {
+      splits.add(new Text(row(r)));
+    }
+
+    createTable(client, tableName, service, splits);
+  }
+
   private void createTable(AccumuloClient client, String tableName, String service,
       SortedSet<Text> splits) throws Exception {
     Map<String,String> props =
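
A note on the Tailer usage in testMetrics above: commons-io's Tailer follows a growing file from a background thread and hands each newly appended line to a listener, which is how the test turns the metrics file into a blocking queue of lines it can take() from. A minimal sketch under the same assumptions (same file path as the test; error handling omitted):

    import java.io.File;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.commons.io.input.Tailer;
    import org.apache.commons.io.input.TailerListenerAdapter;

    public class TailDemo {
      public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> lines = new LinkedBlockingQueue<>();
        // Tailer.create starts a thread that polls the file and delivers
        // each appended line to the listener's handle() callback.
        Tailer tailer = Tailer.create(new File("./target/tserver.metrics"),
            new TailerListenerAdapter() {
              @Override
              public void handle(String line) {
                lines.add(line);
              }
            });
        try {
          // Block until the metrics sink writes its next line.
          System.out.println(lines.take());
        } finally {
          tailer.stop();
        }
      }
    }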
diff --git a/test/src/main/resources/hadoop-metrics2-accumulo.properties b/test/src/main/resources/hadoop-metrics2-accumulo.properties
index 416a0f7..cacd779 100644
--- a/test/src/main/resources/hadoop-metrics2-accumulo.properties
+++ b/test/src/main/resources/hadoop-metrics2-accumulo.properties
@@ -41,9 +41,10 @@ accumulo.sink.file-gc.filename=./target/accgc.metrics
 accumulo.sink.file-gc.period=5
 
 # File sink for tserver metrics
-# accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink
-# accumulo.sink.file-tserver.context=tserver
-# accumulo.sink.file-tserver.filename=tserver.metrics
+accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink
+accumulo.sink.file-tserver.context=tserver
+accumulo.sink.file-tserver.filename=./target/tserver.metrics
+accumulo.sink.file-tserver.period=5
 
 # File sink for manager metrics
 accumulo.sink.file-manager.class=org.apache.hadoop.metrics2.sink.FileSink
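
With the tserver file sink enabled above, metrics2 appends a snapshot of each tserver metrics record to ./target/tserver.metrics (resolved against the process working directory) once per period, every 5 seconds here; that is the file testMetrics tails. Each appended line looks roughly like the following (illustrative only; the actual record and metric names depend on what the tablet server registers):

    1619638800000 tserver.general: Context=tserver, Hostname=tserver1, DCQ1_queued=5, DCQ1_running=0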
