This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 860b87d Added IT for external compaction metrics 860b87d is described below commit 860b87d7c84018c84b6244f06bb54b7db9f5c4a9 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Apr 28 15:38:59 2021 -0400 Added IT for external compaction metrics --- .../apache/accumulo/test/ExternalCompactionIT.java | 127 ++++++++++++++++++--- .../resources/hadoop-metrics2-accumulo.properties | 7 +- 2 files changed, 116 insertions(+), 18 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index dc475df..e790f96 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -34,15 +35,20 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -84,7 +90,10 @@ import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.miniclusterImpl.ProcessReference; import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.commons.io.input.Tailer; +import org.apache.commons.io.input.TailerListenerAdapter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; @@ -256,12 +265,8 @@ public class ExternalCompactionIT extends ConfigurableMacBase { public void testMergeDuringExternalCompaction() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { String table1 = "ectt7"; - SortedSet<Text> splits = new TreeSet<>(); - int jump = MAX_DATA / 2; - for (int r = jump; r < MAX_DATA; r += jump) { - splits.add(new Text(row(r))); - } - createTable(client, table1, "cs1", splits); + + createTable(client, table1, "cs1", 2); // set compaction ratio to 1 so that majc occurs naturally, not user compaction // user compaction blocks merge client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.toString(), "1.0"); @@ -334,14 +339,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { String table1 = "ectt4"; - SortedSet<Text> splits = new TreeSet<>(); - int jump = MAX_DATA / 200; - - for (int r = jump; r < MAX_DATA; r += jump) { - splits.add(new Text(row(r))); - } - - createTable(client, table1, "cs1", splits); + createTable(client, table1, "cs1", 200); writeData(client, table1); @@ -625,6 +623,92 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } } + private static Optional<String> extract(String input, String regex) { + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(input); + if (matcher.matches()) { + return Optional.of(matcher.group(1)); + } + + return Optional.empty(); + } + + @Test + public void testMetrics() throws Exception { + Collection<ProcessReference> tservers = + getCluster().getProcesses().get(ServerType.TABLET_SERVER); + assertEquals(2, tservers.size()); + // kill one tserver so that queue metrics are not spread across tservers + getCluster().killProcess(TABLET_SERVER, tservers.iterator().next()); + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + String[] names = getUniqueNames(2); + String table1 = names[0]; + createTable(client, table1, "cs1", 5); + + String table2 = names[1]; + createTable(client, table2, "cs2", 10); + + writeData(client, table1); + writeData(client, table2); + + LinkedBlockingQueue<String> queueMetrics = new LinkedBlockingQueue<>(); + + Tailer tailer = + Tailer.create(new File("./target/tserver.metrics"), new TailerListenerAdapter() { + @Override + public void handle(final String line) { + extract(line, ".*(DCQ1_queued=[0-9]+).*").ifPresent(queueMetrics::add); + extract(line, ".*(DCQ2_queued=[0-9]+).*").ifPresent(queueMetrics::add); + } + }); + + compact(client, table1, 7, "DCQ1", false); + compact(client, table2, 13, "DCQ2", false); + + boolean sawDCQ1_5 = false; + boolean sawDCQ2_10 = false; + + // wait until expected number of queued are seen in metrics + while (!sawDCQ1_5 || !sawDCQ2_10) { + String qm = queueMetrics.take(); + sawDCQ1_5 |= qm.equals("DCQ1_queued=5"); + sawDCQ2_10 |= qm.equals("DCQ2_queued=10"); + } + + // start compactors + cluster.exec(Compactor.class, "-q", "DCQ1"); + cluster.exec(Compactor.class, "-q", "DCQ2"); + cluster.exec(CompactionCoordinator.class); + + boolean sawDCQ1_0 = false; + boolean sawDCQ2_0 = false; + + // wait until queued goes to zero in metrics + while (!sawDCQ1_0 || !sawDCQ2_0) { + String qm = queueMetrics.take(); + sawDCQ1_0 |= qm.equals("DCQ1_queued=0"); + sawDCQ2_0 |= qm.equals("DCQ2_queued=0"); + } + + tailer.stop(); + + // Wait for all external compactions to complete + long count; + do { + UtilWaitThread.sleep(100); + try (TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() + .forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()) { + count = tm.stream().flatMap(t -> t.getExternalCompactions().keySet().stream()).count(); + } + } while (count > 0); + + verify(client, table1, 7); + verify(client, table2, 13); + + } + } + private ExternalCompactionMetrics getCoordinatorMetrics() throws Exception { HttpResponse<String> res = hc.send(req, BodyHandlers.ofString()); assertEquals(200, res.statusCode()); @@ -638,7 +722,8 @@ public class ExternalCompactionIT extends ConfigurableMacBase { try (Scanner scanner = client.createScanner(table1)) { int count = 0; for (Entry<Key,Value> entry : scanner) { - assertTrue(Integer.parseInt(entry.getValue().toString()) % modulus == 0); + assertTrue(String.format("%s %s %d != 0", entry.getValue(), "%", modulus), + Integer.parseInt(entry.getValue().toString()) % modulus == 0); count++; } @@ -675,6 +760,18 @@ public class ExternalCompactionIT extends ConfigurableMacBase { } + private void createTable(AccumuloClient client, String tableName, String service, int numTablets) + throws Exception { + SortedSet<Text> splits = new TreeSet<>(); + int jump = MAX_DATA / numTablets; + + for (int r = jump; r < MAX_DATA; r += jump) { + splits.add(new Text(row(r))); + } + + createTable(client, tableName, service, splits); + } + private void createTable(AccumuloClient client, String tableName, String service, SortedSet<Text> splits) throws Exception { Map<String,String> props = diff --git a/test/src/main/resources/hadoop-metrics2-accumulo.properties b/test/src/main/resources/hadoop-metrics2-accumulo.properties index 416a0f7..cacd779 100644 --- a/test/src/main/resources/hadoop-metrics2-accumulo.properties +++ b/test/src/main/resources/hadoop-metrics2-accumulo.properties @@ -41,9 +41,10 @@ accumulo.sink.file-gc.filename=./target/accgc.metrics accumulo.sink.file-gc.period=5 # File sink for tserver metrics -# accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink -# accumulo.sink.file-tserver.context=tserver -# accumulo.sink.file-tserver.filename=tserver.metrics +accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink +accumulo.sink.file-tserver.context=tserver +accumulo.sink.file-tserver.filename=./target/tserver.metrics +accumulo.sink.file-tserver.period=5 # File sink for manager metrics accumulo.sink.file-manager.class=org.apache.hadoop.metrics2.sink.FileSink