ACCUMULO-4409 Add tests for monitor appender. Includes changes to make AccumuloMonitorAppender more testable, along with the tests to accompany it.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5afbacc0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5afbacc0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5afbacc0 Branch: refs/heads/master Commit: 5afbacc0af9a6d7f9c610969eafcfed2734364a2 Parents: ff7525e Author: Christopher Tubbs <ctubb...@apache.org> Authored: Wed Mar 8 20:50:44 2017 -0500 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Wed Mar 8 20:50:44 2017 -0500 ---------------------------------------------------------------------- .../monitor/util/AccumuloMonitorAppender.java | 193 +++++++++++++------ .../util/AccumuloMonitorAppenderTest.java | 184 ++++++++++++++++++ 2 files changed, 315 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java index 8a855d0..a965f0c 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java @@ -18,10 +18,13 @@ package org.apache.accumulo.monitor.util; import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import 
org.apache.accumulo.core.client.Instance; @@ -30,16 +33,19 @@ import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.AsyncAppender; import org.apache.log4j.net.SocketAppender; import org.apache.zookeeper.data.Stat; import com.google.common.net.HostAndPort; -public class AccumuloMonitorAppender extends AsyncAppender { +public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable { - private final ScheduledExecutorService executorService; - private final AtomicBoolean trackerScheduled; + final ScheduledExecutorService executorService; + final AtomicBoolean trackerScheduled; + private int frequency = 0; + private MonitorTracker tracker = null; /** * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it @@ -54,12 +60,33 @@ public class AccumuloMonitorAppender extends AsyncAppender { }); } + public void setFrequency(int millis) { + if (millis > 0) { + frequency = millis; + } + } + + public int getFrequency() { + return frequency; + } + + // this is just for testing + void setTracker(MonitorTracker monitorTracker) { + tracker = monitorTracker; + } + @Override public void activateOptions() { // only schedule it once (in case options get activated more than once); not sure if this is possible if (trackerScheduled.compareAndSet(false, true)) { - // wait 5 seconds, then run every 5 seconds - executorService.scheduleAtFixedRate(new MonitorTracker(), 5, 5, TimeUnit.SECONDS); + if (frequency <= 0) { + // use default rate of 5 seconds between each check + frequency = 5000; + } + if (tracker == null) { + tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory()); + } + executorService.scheduleWithFixedDelay(tracker, 
frequency, frequency, TimeUnit.MILLISECONDS); } super.activateOptions(); } @@ -72,80 +99,122 @@ public class AccumuloMonitorAppender extends AsyncAppender { super.close(); } - private class MonitorTracker implements Runnable { + static class MonitorLocation { + private final String location; + private final long modId; - private String path; - private ZooCache zooCache; - - private long lastModifiedTransactionId; - private SocketAppender lastSocketAppender; + public MonitorLocation(long modId, byte[] location) { + this.modId = modId; + this.location = location == null ? null : new String(location, UTF_8); + } - public MonitorTracker() { + public boolean hasLocation() { + return location != null; + } - // path and zooCache are lazily set the first time this tracker is run - // this allows the tracker to be constructed and scheduled during log4j initialization without - // triggering any actual logs from the Accumulo or ZooKeeper code - this.path = null; - this.zooCache = null; + public String getLocation() { + return location; + } - this.lastModifiedTransactionId = 0; - this.lastSocketAppender = null; + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof MonitorLocation) { + MonitorLocation other = (MonitorLocation) obj; + return modId == other.modId && Objects.equals(location, other.location); + } + return false; } @Override - public void run() { - try { - // lazily set up path and zooCache (see comment in constructor) - if (this.zooCache == null) { - Instance instance = HdfsZooInstance.getInstance(); - this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR; - this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); - } + public int hashCode() { + return Long.hashCode(modId); + } + } - // get the current location from the cache and update if necessary - Stat stat = new Stat(); - byte[] loc = zooCache.get(path, stat); - long modifiedTransactionId = 
stat.getMzxid(); - - // modifiedTransactionId will be 0 if no current location - // lastModifiedTransactionId will be 0 if we've never seen a location - if (modifiedTransactionId != lastModifiedTransactionId) { - // replace old socket on every change, even if new location is the same as old location - // if modifiedTransactionId changed, then the monitor restarted and the old socket is dead now - switchAppender(loc, modifiedTransactionId); - } - } catch (Exception e) { - // dump any non-fatal problems to the console, but let it run again - e.printStackTrace(); + private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> { + + // path and zooCache are lazily set the first time this tracker is run + // this allows the tracker to be constructed and scheduled during log4j initialization without + // triggering any actual logs from the Accumulo or ZooKeeper code + private String path = null; + private ZooCache zooCache = null; + + @Override + public MonitorLocation get() { + // lazily set up path and zooCache (see comment in constructor) + if (this.zooCache == null) { + Instance instance = HdfsZooInstance.getInstance(); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR; + this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); } + + // get the current location from the cache and update if necessary + Stat stat = new Stat(); + byte[] loc = zooCache.get(path, stat); + // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise + return new MonitorLocation(stat.getMzxid(), loc); } + } - private void switchAppender(byte[] newLocation, long newModifiedTransactionId) { - // remove and close the last one, if it was non-null - if (lastSocketAppender != null) { - AccumuloMonitorAppender.this.removeAppender(lastSocketAppender); - lastSocketAppender.close(); - } + private static class SocketAppenderFactory implements 
Function<MonitorLocation,AppenderSkeleton> { + @Override + public AppenderSkeleton apply(MonitorLocation loc) { + int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue()); + HostAndPort remote = HostAndPort.fromString(loc.getLocation()); + + SocketAppender socketAppender = new SocketAppender(); + socketAppender.setApplication(System.getProperty("accumulo.application", "unknown")); + socketAppender.setRemoteHost(remote.getHostText()); + socketAppender.setPort(remote.getPortOrDefault(defaultPort)); - // create a new SocketAppender, if new location is non-null - if (newLocation != null) { + return socketAppender; + } + } - int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue()); - HostAndPort remote = HostAndPort.fromString(new String(newLocation, UTF_8)); + static class MonitorTracker implements Runnable { - SocketAppender socketAppender = new SocketAppender(); - socketAppender.setApplication(System.getProperty("accumulo.application", "unknown")); - socketAppender.setRemoteHost(remote.getHostText()); - socketAppender.setPort(remote.getPortOrDefault(defaultPort)); + private final AccumuloMonitorAppender parentAsyncAppender; + private final Supplier<MonitorLocation> currentLocationSupplier; + private final Function<MonitorLocation,AppenderSkeleton> appenderFactory; - socketAppender.activateOptions(); - AccumuloMonitorAppender.this.addAppender(socketAppender); + private MonitorLocation lastLocation; + private AppenderSkeleton lastSocketAppender; - lastSocketAppender = socketAppender; - } + public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier, + Function<MonitorLocation,AppenderSkeleton> appenderFactory) { + this.parentAsyncAppender = Objects.requireNonNull(appender); + this.appenderFactory = Objects.requireNonNull(appenderFactory); + this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier); - // update lastModifiedTransactionId, 
even if the new one is 0 (no new location) - lastModifiedTransactionId = newModifiedTransactionId; + this.lastLocation = new MonitorLocation(0, null); + this.lastSocketAppender = null; + } + + @Override + public void run() { + try { + MonitorLocation currentLocation = currentLocationSupplier.get(); + // detect change + if (!currentLocation.equals(lastLocation)) { + // clean up old appender + if (lastSocketAppender != null) { + parentAsyncAppender.removeAppender(lastSocketAppender); + lastSocketAppender.close(); + lastSocketAppender = null; + } + // create a new one + if (currentLocation.hasLocation()) { + lastSocketAppender = appenderFactory.apply(currentLocation); + lastSocketAppender.activateOptions(); + parentAsyncAppender.addAppender(lastSocketAppender); + } + // update the last location only if switching was successful + lastLocation = currentLocation; + } + } catch (Exception e) { + // dump any non-fatal problems to the console, but let it run again + e.printStackTrace(); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5afbacc0/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java new file mode 100644 index 0000000..cd4eb03 --- /dev/null +++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppenderTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.fail; + +import java.util.Enumeration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorLocation; +import org.apache.accumulo.monitor.util.AccumuloMonitorAppender.MonitorTracker; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.Timeout; + +public class AccumuloMonitorAppenderTest { + + @Rule + public Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testActivateOptions() { + try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) { + appender.executorService.shutdown(); + // simulate tracker having already been scheduled + appender.trackerScheduled.compareAndSet(false, true); + appender.activateOptions(); + // activateOptions should not trigger a RejectedExecutionException, because we tricked it into thinking it was already called, and 
therefore it did not + // schedule the tracker after shutting down + } + + exception.expect(RejectedExecutionException.class); + try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) { + appender.executorService.shutdown(); + appender.activateOptions(); + fail("Calling activateOptions should have triggered a RejectedExecutionException"); + // this ensures that the activateOptions correctly attempts to schedule a worker + } + } + + @Test + public void testExecutorService() throws InterruptedException, ExecutionException { + ScheduledExecutorService executorService = null; + AtomicLong counter = new AtomicLong(2); + try (AccumuloMonitorAppender appender = new AccumuloMonitorAppender()) { + executorService = appender.executorService; + + // make sure executor service is started and running + Assert.assertEquals(false, executorService.isShutdown()); + Assert.assertEquals(false, executorService.isTerminated()); + + // make sure executor service executes tasks + ScheduledFuture<Long> future = executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.MILLISECONDS); + Assert.assertEquals(Long.valueOf(2), future.get()); + Assert.assertEquals(3, counter.get()); + + // schedule a task that won't finish + executorService.schedule(() -> counter.getAndIncrement(), 1, TimeUnit.DAYS); + + // make sure executor service is still running + Assert.assertEquals(false, executorService.isShutdown()); + Assert.assertEquals(false, executorService.isTerminated()); + } + // verify that closing the appender shuts down the executor service threads + Assert.assertEquals(true, executorService.isShutdown()); + executorService.awaitTermination(5, TimeUnit.SECONDS); + Assert.assertEquals(true, executorService.isTerminated()); + + // verify executor service did not wait for scheduled task to run + Assert.assertEquals(3, counter.get()); + } + + @Test + public void testMonitorTracker() throws InterruptedException { + AtomicLong currentLoc = new AtomicLong(0); + 
Supplier<MonitorLocation> locSupplier = () -> { + long loc = currentLoc.get(); + // for simplicity, create the location name from a number (0 represents no location) + byte[] location = loc == 0 ? null : ("loc" + loc).getBytes(UTF_8); + return new MonitorLocation(loc, location); + }; + Function<MonitorLocation,AppenderSkeleton> appenderFactory = newLocation -> new AppenderSkeleton() { + + { + this.name = "Appender for " + newLocation.getLocation(); + } + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() {} + + @Override + protected void append(LoggingEvent event) {} + + }; + + try (AccumuloMonitorAppender parent = new AccumuloMonitorAppender()) { + parent.setFrequency(1); // make it check frequently (every 1 millisecond) + parent.setTracker(new MonitorTracker(parent, locSupplier, appenderFactory)); + parent.activateOptions(); + + // initially there are no appenders + Assert.assertTrue(parent.getAllAppenders() == null); + updateLocAndVerify(currentLoc, parent, 0); + updateLocAndVerify(currentLoc, parent, 10); + + // verify it's the same after a few times + // this verifies the logic in the tracker's run method which compares current location with last to see if a change occurred + AppenderSkeleton lastAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement(); + for (int x = 0; x < 10; x++) { + Thread.sleep(10); + AppenderSkeleton currentAppender = (AppenderSkeleton) parent.getAllAppenders().nextElement(); + Assert.assertSame(lastAppender, currentAppender); + } + + updateLocAndVerify(currentLoc, parent, 3); + updateLocAndVerify(currentLoc, parent, 0); + updateLocAndVerify(currentLoc, parent, 0); + updateLocAndVerify(currentLoc, parent, 12); + updateLocAndVerify(currentLoc, parent, 0); + updateLocAndVerify(currentLoc, parent, 335); + + updateLocAndVerify(currentLoc, parent, 0); + // verify we removed all the appenders + Assert.assertFalse(parent.getAllAppenders().hasMoreElements()); + } + } + + private 
static void updateLocAndVerify(AtomicLong currentLoc, AccumuloMonitorAppender parent, int newLoc) { + // set the new location + currentLoc.set(newLoc); + // wait for the appender to notice the change + while (!verifyAppender(parent, newLoc == 0 ? null : ("loc" + newLoc))) {} + } + + private static boolean verifyAppender(AccumuloMonitorAppender parent, String newLocName) { + Enumeration<?> childAppenders = parent.getAllAppenders(); + if (newLocName == null) { + return childAppenders == null || !childAppenders.hasMoreElements(); + } + if (childAppenders == null || !childAppenders.hasMoreElements()) { + return false; + } + AppenderSkeleton child = (AppenderSkeleton) childAppenders.nextElement(); + if (childAppenders.hasMoreElements()) { + Assert.fail("Appender should never have more than one child"); + } + return ("Appender for " + newLocName).equals(child.getName()); + } + +}