This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5dbc99ed69 Fix failing ITs due to recent features (#4813) 5dbc99ed69 is described below commit 5dbc99ed6979da97e1570e135137011568f37ae2 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Aug 20 18:01:59 2024 -0400 Fix failing ITs due to recent features (#4813) Changes introduced in #4800 to validate compaction configurations caused several ITs to start failing. In these cases the ITs were using invalid configurations. The changes here fix the ITs either by isolating the invalid configurations to only the associated tests (instead of letting the invalid configuration persist for all the tests in the class) or by modifying the tests so that they continue to work. --- .../accumulo/core/metrics/MetricsProducer.java | 9 ++ .../accumulo/manager/TabletGroupWatcher.java | 9 +- .../accumulo/manager/metrics/ManagerMetrics.java | 12 ++ .../manager/upgrade/UpgradeCoordinator.java | 2 +- .../compaction/BadCompactionServiceConfigIT.java | 146 ++++++++++++++++++--- .../test/compaction/CompactionExecutorIT.java | 30 +++-- .../test/compaction/ExternalCompaction2BaseIT.java | 2 +- .../accumulo/test/functional/CompactionIT.java | 2 +- 8 files changed, 174 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 73b2fe5029..60b108245a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -634,6 +634,13 @@ import io.micrometer.core.instrument.MeterRegistry; * <tr> * <td>N/A</td> * <td>N/A</td> + * <td>{@link #METRICS_MANAGER_COMPACTION_SVC_ERRORS}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> * <td>{@link #METRICS_MANAGER_USER_TGW_ERRORS}</td> * <td>Gauge</td> * <td></td> @@ -700,6 +707,8 @@ public
interface MetricsProducer { String METRICS_MANAGER_ROOT_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.root.errors"; String METRICS_MANAGER_META_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.meta.errors"; String METRICS_MANAGER_USER_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.user.errors"; + String METRICS_MANAGER_COMPACTION_SVC_ERRORS = + METRICS_MANAGER_PREFIX + "compaction.svc.misconfigured"; String METRICS_MAJC_PREFIX = "accumulo.compactions.majc."; String METRICS_MAJC_QUEUED = METRICS_MAJC_PREFIX + "queued"; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 3b77816a1e..94bf353ffb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -443,11 +443,12 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { try { CheckCompactionConfig.validate(manager.getConfiguration()); - } catch (SecurityException | IllegalArgumentException | IllegalStateException - | ReflectiveOperationException e) { + this.metrics.clearCompactionServiceConfigurationError(); + } catch (RuntimeException | ReflectiveOperationException e) { + this.metrics.setCompactionServiceConfigurationError(); LOG.error( - "Error validating compaction configuration, all compactions are paused until the configuration is fixed.", - e); + "Error validating compaction configuration, all {} compactions are paused until the configuration is fixed.", + store.getLevel(), e); compactionGenerator = null; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java index 2869e2d7e8..a6c59f5328 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java @@ -22,6 +22,7 @@ import static java.util.Objects.requireNonNull; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -42,6 +43,7 @@ public class ManagerMetrics implements MetricsProducer { private final AtomicLong rootTGWErrorsGauge = new AtomicLong(0); private final AtomicLong metadataTGWErrorsGauge = new AtomicLong(0); private final AtomicLong userTGWErrorsGauge = new AtomicLong(0); + private final AtomicInteger compactionConfigurationError = new AtomicInteger(0); public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) { requireNonNull(conf, "AccumuloConfiguration must not be null"); @@ -69,12 +71,22 @@ public class ManagerMetrics implements MetricsProducer { } } + public void setCompactionServiceConfigurationError() { + this.compactionConfigurationError.set(1); + } + + public void clearCompactionServiceConfigurationError() { + this.compactionConfigurationError.set(0); + } + @Override public void registerMetrics(MeterRegistry registry) { fateMetrics.forEach(fm -> fm.registerMetrics(registry)); registry.gauge(METRICS_MANAGER_ROOT_TGW_ERRORS, rootTGWErrorsGauge); registry.gauge(METRICS_MANAGER_META_TGW_ERRORS, metadataTGWErrorsGauge); registry.gauge(METRICS_MANAGER_USER_TGW_ERRORS, userTGWErrorsGauge); + registry.gauge(METRICS_MANAGER_COMPACTION_SVC_ERRORS, compactionConfigurationError, + AtomicInteger::get); } public List<MetricsProducer> getProducers(AccumuloConfiguration conf, Manager manager) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 1856529670..61a7516f15 100644 --- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -263,7 +263,7 @@ public class UpgradeCoordinator { } try { CheckCompactionConfig.validate(context.getConfiguration()); - } catch (SecurityException | IllegalArgumentException | ReflectiveOperationException e) { + } catch (RuntimeException | ReflectiveOperationException e) { throw new IllegalStateException("Error validating compaction configuration", e); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java index 96b74c0d25..9aea90d60a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java @@ -29,6 +29,7 @@ import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -42,43 +43,71 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner; -import org.apache.accumulo.harness.MiniClusterConfigurationCallback; -import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; +import 
org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.MoreCollectors; -public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { +public class BadCompactionServiceConfigIT extends AccumuloClusterHarness { + private static final Logger LOG = LoggerFactory.getLogger(BadCompactionServiceConfigIT.class); private static final String CSP = Property.COMPACTION_SERVICE_PREFIX.getKey(); + private static TestStatsDSink sink; @BeforeAll public static void beforeTests() throws Exception { - startMiniClusterWithConfig(new ClusterConfig()); + sink = new TestStatsDSink(); } - static class ClusterConfig implements MiniClusterConfigurationCallback { + @AfterAll + public static void after() throws Exception { + sink.close(); + } - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - Map<String,String> siteCfg = new HashMap<>(); - siteCfg.put(CSP + DEFAULT_COMPACTION_SERVICE_NAME + ".planner", - RatioBasedCompactionPlanner.class.getName()); - siteCfg.put(CSP + DEFAULT_COMPACTION_SERVICE_NAME + ".planner.opts.groups", - "[{\"group\":\"default\"}]"); - siteCfg.put(CSP + "cs1.planner", RatioBasedCompactionPlanner.class.getName()); - // place invalid json in the planners config - siteCfg.put(CSP + "cs1.planner.opts.groups", "{{'group]"); - 
cfg.setSiteConfig(siteCfg); - } + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteCfg = new HashMap<>(); + siteCfg.put(CSP + DEFAULT_COMPACTION_SERVICE_NAME + ".planner", + RatioBasedCompactionPlanner.class.getName()); + siteCfg.put(CSP + DEFAULT_COMPACTION_SERVICE_NAME + ".planner.opts.groups", + "[{\"group\":\"default\"}]"); + siteCfg.put(CSP + "cs1.planner", RatioBasedCompactionPlanner.class.getName()); + // place invalid json in the planners config + siteCfg.put(CSP + "cs1.planner.opts.groups", "{{'group]"); + cfg.setSiteConfig(siteCfg); + cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "3s"); + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry + // that will be configured to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_USER_TAGS, "tag1=value1,tag2=value2"); + cfg.setProperty(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "10s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); } public static class EverythingFilter extends Filter { @@ -102,6 +131,32 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { @Test public void testUsingMisconfiguredService() throws Exception { + + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + final AtomicBoolean serviceMisconfigured = new AtomicBoolean(false); + final 
Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_MANAGER_COMPACTION_SVC_ERRORS)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + Integer value = Integer.parseInt(m.getValue()); + if (value == 0) { + serviceMisconfigured.set(false); + } else if (value == 1) { + serviceMisconfigured.set(true); + } else { + LOG.error("Invalid value received: " + m.getValue()); + } + } + } + } + }); + thread.start(); + String table = getUniqueNames(1)[0]; // Create a table that is configured to use a compaction service with bad configuration. @@ -110,6 +165,8 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "cs1")); client.tableOperations().create(table, ntc); + Wait.waitFor(() -> serviceMisconfigured.get() == true); + try (var writer = client.createBatchWriter(table)) { writer.addMutation(new Mutation("0").at().family("f").qualifier("q").put("v")); } @@ -135,14 +192,16 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { client.instanceOperations().setProperty(CSP + "cs1.planner.opts.groups", value); // start the compactor, it was not started initially because of bad config - getCluster().getConfig().getClusterServerConfiguration() + ((MiniAccumuloClusterImpl) getCluster()).getConfig().getClusterServerConfiguration() .addCompactorResourceGroup("cs1q1", 1); - getCluster().getClusterControl().start(ServerType.COMPACTOR); + ((MiniAccumuloClusterImpl) getCluster()).getClusterControl().start(ServerType.COMPACTOR); } catch (Exception e) { throw new RuntimeException(e); } }); + Wait.waitFor(() -> serviceMisconfigured.get() == false); + List<IteratorSetting> iterators = Collections.singletonList(new IteratorSetting(100, EverythingFilter.class)); 
client.tableOperations().compact(table, @@ -158,6 +217,8 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { // misconfigure the service, test how going from good config to bad config works. The test // started with an initial state of bad config. client.instanceOperations().setProperty(CSP + "cs1.planner.opts.groups", "]o.o["); + Wait.waitFor(() -> serviceMisconfigured.get() == true); + try (var writer = client.createBatchWriter(table)) { writer.addMutation(new Mutation("0").at().family("f").qualifier("q").put("v")); } @@ -175,6 +236,7 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { throw new RuntimeException(e); } }); + Wait.waitFor(() -> serviceMisconfigured.get() == false); client.tableOperations().compact(table, new CompactionConfig().setIterators(iterators).setWait(true)); @@ -185,19 +247,57 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { } fixerFuture.get(); + } finally { + shutdownTailer.set(true); + thread.join(); } } @Test public void testUsingNonExistentService() throws Exception { + + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + final AtomicBoolean serviceMisconfigured = new AtomicBoolean(false); + final Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_MANAGER_COMPACTION_SVC_ERRORS)) { + Metric m = TestStatsDSink.parseStatsDMetric(s); + Integer value = Integer.parseInt(m.getValue()); + if (value == 0) { + serviceMisconfigured.set(false); + } else if (value == 1) { + serviceMisconfigured.set(true); + } else { + LOG.error("Invalid value received: " + m.getValue()); + } + } + } + } + }); + thread.start(); + String table = getUniqueNames(1)[0]; // Create a table that is configured to use a compaction service that does not exist try 
(AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration().setProperties( Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "cs5")); client.tableOperations().create(table, ntc); + Wait.waitFor(() -> serviceMisconfigured.get() == true); + + // The setup of this test creates an invalid configuration, fix this first thing. + var value = "[{'group':'cs1q1'}]".replaceAll("'", "\""); + client.instanceOperations().setProperty(CSP + "cs1.planner.opts.groups", value); + + Wait.waitFor(() -> serviceMisconfigured.get() == false); + // Add splits so that the tserver logs can manually be inspected to ensure they are not // spammed. Not sure how to check this automatically. var splits = IntStream.range(1, 10).mapToObj(i -> new Text(i + "")) @@ -236,6 +336,9 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { } }); + Wait.waitFor(() -> fixerFuture.isDone()); + fixerFuture.get(); + List<IteratorSetting> iterators = Collections.singletonList(new IteratorSetting(100, EverythingFilter.class)); client.tableOperations().compact(table, @@ -246,8 +349,9 @@ public class BadCompactionServiceConfigIT extends SharedMiniClusterBase { assertEquals(0, scanner.stream().count()); } - fixerFuture.get(); - + } finally { + shutdownTailer.set(true); + thread.join(); } } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java index a6bd658c8a..7a065b4538 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java @@ -192,16 +192,6 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11"); cfg.setProperty(csp + "cs4.planner.opts.process", "USER"); - // Setup three 
planner that fail to initialize or plan, these planners should not impede - // tablet assignment. - cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName()); - cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true"); - - cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName()); - cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false"); - - cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522"); - // this is meant to be dynamically reconfigured cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName()); cfg.setProperty(csp + "recfg.planner.opts.groups", "[{'group':'i1'},{'group':'i2'}]"); @@ -260,6 +250,18 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { // This test ensures that a table w/ failing compaction planner can still be read and written. try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // Setup three planner that fail to initialize or plan, these planners should not impede + // tablet assignment. 
+ var csp = Property.COMPACTION_SERVICE_PREFIX.getKey(); + client.instanceOperations().setProperty(csp + "cse1.planner", + ErroringPlanner.class.getName()); + client.instanceOperations().setProperty(csp + "cse1.planner.opts.failInInit", "true"); + client.instanceOperations().setProperty(csp + "cse2.planner", + ErroringPlanner.class.getName()); + client.instanceOperations().setProperty(csp + "cse2.planner.opts.failInInit", "false"); + client.instanceOperations().setProperty(csp + "cse3.planner", "NonExistentPlanner20240522"); + createTable(client, "fail1", "cse1"); createTable(client, "fail2", "cse2"); createTable(client, "fail3", "cse3"); @@ -281,6 +283,14 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { assertEquals(30, getFiles(client, "fail1").size()); assertEquals(30, getFiles(client, "fail2").size()); assertEquals(30, getFiles(client, "fail3").size()); + + // Remove the properties for the invalid planners + client.instanceOperations().removeProperty(csp + "cse1.planner"); + client.instanceOperations().removeProperty(csp + "cse1.planner.opts.failInInit"); + client.instanceOperations().removeProperty(csp + "cse2.planner"); + client.instanceOperations().removeProperty(csp + "cse2.planner.opts.failInInit"); + client.instanceOperations().removeProperty(csp + "cse3.planner"); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2BaseIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2BaseIT.java index fd1238f800..bdcd9af817 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2BaseIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction2BaseIT.java @@ -70,7 +70,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.slf4j.LoggerFactory; -public class ExternalCompaction2BaseIT extends SharedMiniClusterBase { +public abstract class ExternalCompaction2BaseIT extends 
SharedMiniClusterBase { static class ExternalCompaction2Config implements MiniClusterConfigurationCallback { @Override diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 0ee30cf684..d9f77297c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -865,7 +865,7 @@ public class CompactionIT extends CompactionBaseIT { RatioBasedCompactionPlanner.class.getName()); c.instanceOperations().setProperty( Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.groups", - ("[{'group':'" + COMPACTOR_GROUP_2 + "'}]").replaceAll("'", "\"")); + ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\"")); // set table 1 to a compaction service newcs c.tableOperations().setProperty(table1,