This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 93e44ec2ad limit thread and add queue to monitor append (#4879) 93e44ec2ad is described below commit 93e44ec2adfd1d1fc308c3f5f054d8548e24592c Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Oct 7 11:14:54 2024 -0400 limit thread and add queue to monitor append (#4879) Add options to configure the AccumuloMonitorAppender as async (default) with a configurable maximum number of concurrent threads (maxThreads) and queue size (queueSize). When the queue is full, the monitor appender will now drop log messages. This implementation uses an executor and thread pool exclusive to the appender, rather than sharing a global one, which is the default behavior for HttpClient. The maxThreads and queueSize options only have an effect if async is true (which is the default). A user may wish to turn async off for this appender, and wrap this appender with AsyncAppender instead, which may have additional options available. * Additional improvements to monitor appender * Bump to log4j 2.24.0 * Use the annotation processor to generate Log4j2Plugins.dat in the monitor jar's META-INF/, rather than rely on Log4j2's deprecated package scanning to find and register the AccumuloMonitorAppender * Simplify the generics for the AccumuloMonitorAppender.Builder * Add cleanup to executor, to make a best effort to not leave it running when the appender is reconstructed due to configuration changes * Simplify "canAppend" logic by just saving the executor as a member, so we can get the queue size from that * Log stats about dropped messages, errors, and total messages sent Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- assemble/conf/log4j2-service.properties | 3 + pom.xml | 8 +- .../util/logging/AccumuloMonitorAppender.java | 130 +++++++++++++++++++-- 3 files changed, 128 insertions(+), 13 deletions(-) diff --git a/assemble/conf/log4j2-service.properties b/assemble/conf/log4j2-service.properties index 01e5e28314..984c543da4 100644 
--- a/assemble/conf/log4j2-service.properties +++ b/assemble/conf/log4j2-service.properties @@ -71,6 +71,9 @@ appender.monitor.type = AccumuloMonitor appender.monitor.name = MonitorLog appender.monitor.filter.threshold.type = ThresholdFilter appender.monitor.filter.threshold.level = warn +#appender.monitor.async = true +#appender.monitor.maxThreads = 2 +#appender.monitor.queueSize = 1024 logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = error diff --git a/pom.xml b/pom.xml index 1faae07633..a47435ace4 100644 --- a/pom.xml +++ b/pom.xml @@ -151,6 +151,7 @@ <version.curator>5.5.0</version.curator> <version.errorprone>2.24.1</version.errorprone> <version.hadoop>3.3.6</version.hadoop> + <version.log4j>2.24.0</version.log4j> <version.opentelemetry>1.34.1</version.opentelemetry> <version.powermock>2.0.9</version.powermock> <version.slf4j>2.0.12</version.slf4j> @@ -198,7 +199,7 @@ <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-bom</artifactId> - <version>2.23.1</version> + <version>${version.log4j}</version> <type>pom</type> <scope>import</scope> </dependency> @@ -792,6 +793,11 @@ <artifactId>auto-service</artifactId> <version>${version.auto-service}</version> </path> + <path> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${version.log4j}</version> + </path> </annotationProcessorPaths> </configuration> </plugin> diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java index 7dfde0f3fa..f6f575eac0 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java @@ -26,8 +26,15 @@ import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import 
java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.accumulo.core.Constants; @@ -44,6 +51,7 @@ import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; import com.google.gson.Gson; @@ -60,34 +68,86 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class AccumuloMonitorAppender extends AbstractAppender { @PluginBuilderFactory - public static <B extends Builder<B>> B newBuilder() { - return new Builder<B>().asBuilder(); + public static Builder newBuilder() { + return new Builder(); } - public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B> + public static class Builder extends AbstractAppender.Builder<Builder> implements org.apache.logging.log4j.core.util.Builder<AccumuloMonitorAppender> { + @PluginBuilderAttribute + private boolean async = true; + + @PluginBuilderAttribute + private int queueSize = 1024; + + @PluginBuilderAttribute + private int maxThreads = 2; + + public Builder setAsync(boolean async) { + this.async = async; + return this; + } + + public boolean getAsync() { + return async; + } + + public Builder setQueueSize(int size) { + queueSize = size; + return this; + } + + public int getQueueSize() { + return queueSize; + } + + public Builder setMaxThreads(int 
maxThreads) { + this.maxThreads = maxThreads; + return this; + } + + public int getMaxThreads() { + return maxThreads; + } + @Override public AccumuloMonitorAppender build() { return new AccumuloMonitorAppender(getName(), getFilter(), isIgnoreExceptions(), - getPropertyArray()); + getPropertyArray(), getQueueSize(), getMaxThreads(), getAsync()); } } private final Gson gson = new Gson(); - private final HttpClient httpClient = HttpClient.newHttpClient(); + private final HttpClient httpClient; private final Supplier<Optional<URI>> monitorLocator; + private final ThreadPoolExecutor executor; + private final boolean async; + private final int queueSize; + private final AtomicLong appends = new AtomicLong(0); + private final AtomicLong discards = new AtomicLong(0); + private final AtomicLong errors = new AtomicLong(0); + private final ConcurrentMap<Integer,AtomicLong> statusCodes = new ConcurrentSkipListMap<>(); private ServerContext context; private String path; private Pair<Long,Optional<URI>> lastResult = new Pair<>(0L, Optional.empty()); private AccumuloMonitorAppender(final String name, final Filter filter, - final boolean ignoreExceptions, final Property[] properties) { + final boolean ignoreExceptions, final Property[] properties, int queueSize, int maxThreads, + boolean async) { super(name, filter, null, ignoreExceptions, properties); - final ZcStat stat = new ZcStat(); - monitorLocator = () -> { + + this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()) : null; + final var builder = HttpClient.newBuilder(); + this.httpClient = (async ? 
builder.executor(executor) : builder).build(); + this.queueSize = queueSize; + this.async = async; + + final var stat = new ZcStat(); + this.monitorLocator = () -> { // lazily set up context/path if (context == null) { context = new ServerContext(SiteConfiguration.auto()); @@ -106,9 +166,24 @@ public class AccumuloMonitorAppender extends AbstractAppender { }; } + private String getStats() { + return "discards:" + discards.get() + " errors:" + errors.get() + " appends:" + appends.get() + + " statusCodes:" + statusCodes; + } + + private void processResponse(HttpResponse<?> response) { + var statusCode = response.statusCode(); + statusCodes.computeIfAbsent(statusCode, sc -> new AtomicLong()).getAndIncrement(); + if (statusCode >= 400 && statusCode < 600) { + error("Unable to send HTTP in appender [" + getName() + "]. Status: " + statusCode + " " + + getStats()); + } + } + @Override public void append(final LogEvent event) { - monitorLocator.get().ifPresent(uri -> { + appends.getAndIncrement(); + monitorLocator.get().ifPresentOrElse(uri -> { try { var pojo = new SingleLogEvent(); pojo.timestamp = event.getTimeMillis(); @@ -122,14 +197,45 @@ public class AccumuloMonitorAppender extends AbstractAppender { var req = HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8)) .setHeader("Content-Type", "application/json").build(); - @SuppressWarnings("unused") - var future = httpClient.sendAsync(req, BodyHandlers.discarding()); + + if (async) { + if (executor.getQueue().size() < queueSize) { + httpClient.sendAsync(req, BodyHandlers.discarding()).thenAccept(this::processResponse) + .exceptionally(e -> { + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, + e); + return null; + }); + } else { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. Queue full. 
" + getStats()); + } + } else { + processResponse(httpClient.send(req, BodyHandlers.discarding())); + } } catch (final Exception e) { - error("Unable to send HTTP in appender [" + getName() + "]", event, e); + errors.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "] " + getStats(), event, e); } + }, () -> { + discards.getAndIncrement(); + error("Unable to send HTTP in appender [" + getName() + "]. No monitor is running. " + + getStats()); }); } + @Override + protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) { + if (changeLifeCycleState) { + setStopping(); + } + if (executor != null) { + executor.shutdown(); + } + return super.stop(timeout, timeUnit, changeLifeCycleState); + } + @SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE", justification = "throwable is intended to be printed to output stream, to send to monitor") private static String throwableToStacktrace(Throwable t) {