Repository: accumulo Updated Branches: refs/heads/1.6.1-SNAPSHOT 4d5ba9daf -> 5a72c37ea refs/heads/master 49b1f08c3 -> 428ed9a0f
ACCUMULO-2343 Add AsyncSocketAppender AsyncSocketAppender is a Log4J AsyncAppender with its own internal SocketAppender. Configuration for either appender can be set on the AsyncSocketAppender itself. An AsyncSocketAppender can be configured using a Log4J properties file, while an ordinary AsyncAppender cannot. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5a72c37e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5a72c37e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5a72c37e Branch: refs/heads/1.6.1-SNAPSHOT Commit: 5a72c37eadeca58e670517611f39824f4307321e Parents: 4d5ba9d Author: Bill Havanki <bhava...@cloudera.com> Authored: Thu Apr 17 17:01:20 2014 -0400 Committer: Bill Havanki <bhava...@cloudera.com> Committed: Mon May 5 09:16:36 2014 -0400 ---------------------------------------------------------------------- .../accumulo/core/util/AsyncSocketAppender.java | 112 +++++++++++++++++++ .../core/util/AsyncSocketAppenderTest.java | 79 +++++++++++++ .../test/functional/ConfigurableMacIT.java | 3 + .../test/functional/MonitorLoggingIT.java | 101 +++++++++++++++++ test/src/test/resources/conf/generic_logger.xml | 83 ++++++++++++++ test/src/test/resources/conf/monitor_logger.xml | 64 +++++++++++ 6 files changed, 442 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java b/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java new file mode 100644 index 0000000..baae9ba --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/AsyncSocketAppender.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.AsyncAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.net.SocketAppender; +import org.apache.log4j.spi.LoggingEvent; + +/** + * An asynchronous appender that maintains its own internal socket appender. Unlike <code>AsyncAppender</code>, this appender can be configured with a Log4J + * properties file, although in that case no additional appenders can be added. + */ +public class AsyncSocketAppender extends AsyncAppender { + + private final SocketAppender socketAppender; + private final AtomicBoolean activated = new AtomicBoolean(false); + + /** + * Creates a new appender. + */ + public AsyncSocketAppender() { + socketAppender = new SocketAppender(); + } + + /** + * Creates a new appender using the given socket appender internally. Use this constructor for testing only. + */ + AsyncSocketAppender(SocketAppender socketAppender) { + this.socketAppender = socketAppender; + } + + @Override + public void append(final LoggingEvent event) { + // Lazy attachment, to avoid calling non-final method in constructor + if (!isAttached(socketAppender)) { + addAppender(socketAppender); + } + + // Lazy activation / connection too, to allow setting host and port + if (activated.compareAndSet(false, true)) { + socketAppender.activateOptions(); + } + + super.append(event); + } + + // SocketAppender delegate methods + + public String getApplication() { + return socketAppender.getApplication(); + } + + // super.getLocationInfo() will always agree with socketAppender + public int getPort() { + return socketAppender.getPort(); + } + + public int getReconnectionDelay() { + return socketAppender.getReconnectionDelay(); + } + + public String getRemoteHost() { + return socketAppender.getRemoteHost(); + } + + public boolean isAdvertiseViaMulticastDNS() { + return socketAppender.isAdvertiseViaMulticastDNS(); + } + + public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) { + socketAppender.setAdvertiseViaMulticastDNS(advertiseViaMulticastDNS); + } + + public void setApplication(String lapp) { + socketAppender.setApplication(lapp); + } + + @Override + public void setLocationInfo(boolean locationInfo) { + super.setLocationInfo(locationInfo); + socketAppender.setLocationInfo(locationInfo); + } + + public void setPort(int port) { + socketAppender.setPort(port); + } + + public void setReconnectionDelay(int delay) { + socketAppender.setReconnectionDelay(delay); + } + + public void setRemoteHost(String host) { + socketAppender.setRemoteHost(host); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java b/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java new file mode 100644 index 0000000..414125a --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/AsyncSocketAppenderTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util; + +import org.apache.log4j.Logger; +import org.apache.log4j.Priority; +import org.apache.log4j.net.SocketAppender; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; +import org.easymock.Capture; +import static org.easymock.EasyMock.*; + +public class AsyncSocketAppenderTest { + private SocketAppender sa; + private AsyncSocketAppender asa; + + @Before + public void setUp() throws Exception { + sa = createMock(SocketAppender.class); + } + + @Test + public void testDelegates() { + asa = new AsyncSocketAppender(); + asa.setApplication("myapp"); + asa.setLocationInfo(true); + asa.setPort(1234); + asa.setReconnectionDelay(56); + asa.setRemoteHost("remotehost"); + assertEquals("myapp", asa.getApplication()); + assertEquals(true, asa.getLocationInfo()); // not really delegating + assertEquals(1234, asa.getPort()); + assertEquals(56, asa.getReconnectionDelay()); + assertEquals("remotehost", asa.getRemoteHost()); + } + + @Test + public void testSetLocationInfo() { + sa.setLocationInfo(true); + replay(sa); + asa = new AsyncSocketAppender(sa); + asa.setLocationInfo(true); + verify(sa); + } + + @Test + public void testAppend() { + asa = new AsyncSocketAppender(sa); + assertFalse(asa.isAttached(sa)); + LoggingEvent event1 = new LoggingEvent("java.lang.String", Logger.getRootLogger(), Priority.INFO, "event1", null); + LoggingEvent event2 = new LoggingEvent("java.lang.Integer", Logger.getRootLogger(), Priority.WARN, "event2", null); + sa.activateOptions(); + sa.doAppend(event1); + sa.doAppend(event2); + sa.close(); + replay(sa); + asa.doAppend(event1); + asa.doAppend(event2); + asa.close(); // forces events to be appended to socket appender + assertTrue(asa.isAttached(sa)); + verify(sa); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java index d9bed7f..59b0977 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java @@ -44,6 +44,8 @@ public class ConfigurableMacIT extends AbstractMacIT { public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {} + public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {} + @Before public void setUp() throws Exception { MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl( @@ -63,6 +65,7 @@ public class ConfigurableMacIT extends AbstractMacIT { coreSite.writeXml(out); out.close(); } + beforeClusterStart(cfg); cluster.start(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java b/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java new file mode 100644 index 0000000..2dadafe --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/MonitorLoggingIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URL; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.monitor.Monitor; +import org.apache.accumulo.server.util.Admin; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.junit.Test; + +public class MonitorLoggingIT extends ConfigurableMacIT { + public static final Logger log = Logger.getLogger(MonitorLoggingIT.class); + + @Override + public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception { + File confDir = cfg.getConfDir(); + try { + FileUtils.copyFileToDirectory(new File(MonitorLoggingIT.class.getResource("/conf/generic_logger.xml").toURI()), confDir); + FileUtils.copyFileToDirectory(new File(MonitorLoggingIT.class.getResource("/conf/monitor_logger.xml").toURI()), confDir); + } catch (Exception e) { + log.error("Failed to copy Log4J XML files to conf dir", e); + } + } + + private static final int NUM_LOCATION_PASSES = 5; + private static final long LOCATION_DELAY = 5000L; + + @Test + public void logToMonitor() throws Exception { + // Start the monitor. + log.debug("Starting Monitor"); + Process monitor = cluster.exec(Monitor.class); + + // Get monitor location to ensure it is running. + String monitorLocation = null; + for (int i = 0; i < NUM_LOCATION_PASSES; i++) { + Thread.sleep(LOCATION_DELAY); + try { + monitorLocation = getMonitor(); + break; + } catch (KeeperException e) { + log.debug("Monitor not up yet, trying again in " + LOCATION_DELAY + " ms"); + } + } + assertNotNull("Monitor failed to start within " + (LOCATION_DELAY * NUM_LOCATION_PASSES) + " ms", monitorLocation); + log.debug("Monitor running at " + monitorLocation); + + // Attempt a scan with an invalid iterator to force a log message in the monitor. + try { + Connector c = getConnector(); + Scanner s = c.createScanner("accumulo.root", new Authorizations()); + IteratorSetting cfg = new IteratorSetting(100, "incorrect", "java.lang.String"); + s.addScanIterator(cfg); + s.iterator().next(); + } catch (Exception e) { + // expected, the iterator was bad + } + Thread.sleep(5000L); // extra precaution to ensure monitor has opportunity to log + + // Verify messages were received at the monitor. + URL url = new URL("http://" + monitorLocation + "/log"); + log.debug("Fetching web page " + url); + String result = FunctionalTestUtils.readAll(url.openStream()); + assertTrue("No log messages found", result.contains("<pre class='logevent'>")); + + // Shutdown cleanly. + log.debug("Stopping mini accumulo cluster"); + Process shutdown = cluster.exec(Admin.class, "stopAll"); + shutdown.waitFor(); + assertTrue(shutdown.exitValue() == 0); + log.debug("success!"); + monitor.destroy(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/resources/conf/generic_logger.xml ---------------------------------------------------------------------- diff --git a/test/src/test/resources/conf/generic_logger.xml b/test/src/test/resources/conf/generic_logger.xml new file mode 100644 index 0000000..db79efe --- /dev/null +++ b/test/src/test/resources/conf/generic_logger.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <!-- Write out everything at the DEBUG level to the debug log --> + <appender name="A2" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/> + <param name="MaxFileSize" value="1000MB"/> + <param name="MaxBackupIndex" value="10"/> + <param name="Threshold" value="DEBUG"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n"/> + </layout> + </appender> + + <!-- Write out INFO and higher to the regular log --> + <appender name="A3" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/> + <param name="MaxFileSize" value="1000MB"/> + <param name="MaxBackupIndex" value="10"/> + <param name="Threshold" value="INFO"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n"/> + </layout> + </appender> + + <!-- Send all logging data to a centralized logger --> + <appender name="N1" class="org.apache.log4j.net.SocketAppender"> + <param name="remoteHost" value="${org.apache.accumulo.core.host.log}"/> + <param name="port" value="${org.apache.accumulo.core.host.log.port}"/> + <param name="application" value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/> + <param name="Threshold" value="WARN"/> + </appender> + + <!-- If the centralized logger is down, buffer the log events, but drop them if it stays down --> + <appender name="ASYNC" class="org.apache.log4j.AsyncAppender"> + <appender-ref ref="N1" /> + </appender> + + <!-- Log accumulo events to the debug, normal and remote logs. --> + <logger name="org.apache.accumulo" additivity="false"> + <level value="DEBUG"/> + <appender-ref ref="A2" /> + <appender-ref ref="A3" /> + <appender-ref ref="ASYNC" /> + </logger> + + <logger name="org.apache.accumulo.core.file.rfile.bcfile"> + <level value="INFO"/> + </logger> + + <logger name="org.mortbay.log"> + <level value="WARN"/> + </logger> + + <logger name="org.apache.zookeeper"> + <level value="ERROR"/> + </logger> + + <!-- Log non-accumulo events to the debug and normal logs. --> + <root> + <level value="INFO"/> + <appender-ref ref="A2" /> + <appender-ref ref="A3" /> + </root> + +</log4j:configuration> http://git-wip-us.apache.org/repos/asf/accumulo/blob/5a72c37e/test/src/test/resources/conf/monitor_logger.xml ---------------------------------------------------------------------- diff --git a/test/src/test/resources/conf/monitor_logger.xml b/test/src/test/resources/conf/monitor_logger.xml new file mode 100644 index 0000000..91a7671 --- /dev/null +++ b/test/src/test/resources/conf/monitor_logger.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + <!-- Write out everything at the DEBUG level to the debug log --> + <appender name="A2" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.debug.log"/> + <param name="MaxFileSize" value="100MB"/> + <param name="MaxBackupIndex" value="10"/> + <param name="Threshold" value="DEBUG"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %X{application} %m%n"/> + </layout> + </appender> + + <!-- Write out INFO and higher to the regular log --> + <appender name="A3" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${org.apache.accumulo.core.dir.log}/${org.apache.accumulo.core.application}_${org.apache.accumulo.core.ip.localhost.hostname}.log"/> + <param name="MaxFileSize" value="100MB"/> + <param name="MaxBackupIndex" value="10"/> + <param name="Threshold" value="INFO"/> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %X{application} %m%n"/> + </layout> + </appender> + + <!-- Keep the last few log messages for display to the user --> + <appender name="GUI" class="org.apache.accumulo.server.monitor.LogService"> + <param name="keep" value="40"/> + <param name="Threshold" value="WARN"/> + </appender> + + <!-- Log accumulo messages to debug, normal and GUI --> + <logger name="org.apache.accumulo" additivity="false"> + <level value="DEBUG"/> + <appender-ref ref="A2" /> + <appender-ref ref="A3" /> + <appender-ref ref="GUI" /> + </logger> + + <!-- Log non-accumulo messages to debug, normal logs. --> + <root> + <level value="INFO"/> + <appender-ref ref="A2" /> + <appender-ref ref="A3" /> + </root> + +</log4j:configuration>