This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 93e44ec2ad limit thread and add queue to monitor append (#4879)
93e44ec2ad is described below

commit 93e44ec2adfd1d1fc308c3f5f054d8548e24592c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Oct 7 11:14:54 2024 -0400

    limit thread and add queue to monitor append (#4879)
    
    Add options to configure the AccumuloMonitorAppender as async (the
    default), with a configurable maximum number of concurrent threads
    (maxThreads) and a configurable queue size (queueSize). When the queue
    is full, the monitor appender will now drop log messages.
    
    This implementation uses an executor and thread pool exclusive to the
    appender, rather than sharing a global one, which is the default
    behavior for HttpClient. The maxThreads and queueSize options only have
    an effect if async is true (which is the default).
    
    A user may wish to turn async off for this appender, and wrap this
    appender with AsyncAppender instead, which may have additional options
    available.
    
    * Additional improvements to monitor appender
    
    * Bump to log4j 2.24.0
    * Use the annotation processor to generate Log4j2Plugins.dat in the
      monitor jar's META-INF/, rather than rely on Log4j2's deprecated
      package scanning to find and register the AccumuloMonitorAppender
    * Simplify the generics for the AccumuloMonitorAppender.Builder
    * Add cleanup to executor, to make a best effort to not leave it running
      when the appender is reconstructed due to configuration changes
    * Simplify "canAppend" logic by just saving the executor as a member, so
      we can get the queue size from that
    * Log stats about dropped messages, errors, and total messages sent
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 assemble/conf/log4j2-service.properties            |   3 +
 pom.xml                                            |   8 +-
 .../util/logging/AccumuloMonitorAppender.java      | 130 +++++++++++++++++++--
 3 files changed, 128 insertions(+), 13 deletions(-)

diff --git a/assemble/conf/log4j2-service.properties 
b/assemble/conf/log4j2-service.properties
index 01e5e28314..984c543da4 100644
--- a/assemble/conf/log4j2-service.properties
+++ b/assemble/conf/log4j2-service.properties
@@ -71,6 +71,9 @@ appender.monitor.type = AccumuloMonitor
 appender.monitor.name = MonitorLog
 appender.monitor.filter.threshold.type = ThresholdFilter
 appender.monitor.filter.threshold.level = warn
+#appender.monitor.async = true
+#appender.monitor.maxThreads = 2
+#appender.monitor.queueSize = 1024
 
 logger.zookeeper.name = org.apache.zookeeper
 logger.zookeeper.level = error
diff --git a/pom.xml b/pom.xml
index 1faae07633..a47435ace4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,6 +151,7 @@
     <version.curator>5.5.0</version.curator>
     <version.errorprone>2.24.1</version.errorprone>
     <version.hadoop>3.3.6</version.hadoop>
+    <version.log4j>2.24.0</version.log4j>
     <version.opentelemetry>1.34.1</version.opentelemetry>
     <version.powermock>2.0.9</version.powermock>
     <version.slf4j>2.0.12</version.slf4j>
@@ -198,7 +199,7 @@
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-bom</artifactId>
-        <version>2.23.1</version>
+        <version>${version.log4j}</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
@@ -792,6 +793,11 @@
                 <artifactId>auto-service</artifactId>
                 <version>${version.auto-service}</version>
               </path>
+              <path>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+                <version>${version.log4j}</version>
+              </path>
             </annotationProcessorPaths>
           </configuration>
         </plugin>
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
index 7dfde0f3fa..f6f575eac0 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/logging/AccumuloMonitorAppender.java
@@ -26,8 +26,15 @@ import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpRequest.BodyPublishers;
+import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandlers;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
@@ -44,6 +51,7 @@ import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.config.Property;
 import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
 
 import com.google.gson.Gson;
@@ -60,34 +68,86 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 public class AccumuloMonitorAppender extends AbstractAppender {
 
   @PluginBuilderFactory
-  public static <B extends Builder<B>> B newBuilder() {
-    return new Builder<B>().asBuilder();
+  public static Builder newBuilder() {
+    return new Builder();
   }
 
-  public static class Builder<B extends Builder<B>> extends 
AbstractAppender.Builder<B>
+  public static class Builder extends AbstractAppender.Builder<Builder>
       implements 
org.apache.logging.log4j.core.util.Builder<AccumuloMonitorAppender> {
 
+    @PluginBuilderAttribute
+    private boolean async = true;
+
+    @PluginBuilderAttribute
+    private int queueSize = 1024;
+
+    @PluginBuilderAttribute
+    private int maxThreads = 2;
+
+    public Builder setAsync(boolean async) {
+      this.async = async;
+      return this;
+    }
+
+    public boolean getAsync() {
+      return async;
+    }
+
+    public Builder setQueueSize(int size) {
+      queueSize = size;
+      return this;
+    }
+
+    public int getQueueSize() {
+      return queueSize;
+    }
+
+    public Builder setMaxThreads(int maxThreads) {
+      this.maxThreads = maxThreads;
+      return this;
+    }
+
+    public int getMaxThreads() {
+      return maxThreads;
+    }
+
     @Override
     public AccumuloMonitorAppender build() {
       return new AccumuloMonitorAppender(getName(), getFilter(), 
isIgnoreExceptions(),
-          getPropertyArray());
+          getPropertyArray(), getQueueSize(), getMaxThreads(), getAsync());
     }
 
   }
 
   private final Gson gson = new Gson();
-  private final HttpClient httpClient = HttpClient.newHttpClient();
+  private final HttpClient httpClient;
   private final Supplier<Optional<URI>> monitorLocator;
+  private final ThreadPoolExecutor executor;
+  private final boolean async;
+  private final int queueSize;
+  private final AtomicLong appends = new AtomicLong(0);
+  private final AtomicLong discards = new AtomicLong(0);
+  private final AtomicLong errors = new AtomicLong(0);
+  private final ConcurrentMap<Integer,AtomicLong> statusCodes = new 
ConcurrentSkipListMap<>();
 
   private ServerContext context;
   private String path;
   private Pair<Long,Optional<URI>> lastResult = new Pair<>(0L, 
Optional.empty());
 
   private AccumuloMonitorAppender(final String name, final Filter filter,
-      final boolean ignoreExceptions, final Property[] properties) {
+      final boolean ignoreExceptions, final Property[] properties, int 
queueSize, int maxThreads,
+      boolean async) {
     super(name, filter, null, ignoreExceptions, properties);
-    final ZcStat stat = new ZcStat();
-    monitorLocator = () -> {
+
+    this.executor = async ? new ThreadPoolExecutor(0, maxThreads, 30, 
TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>()) : null;
+    final var builder = HttpClient.newBuilder();
+    this.httpClient = (async ? builder.executor(executor) : builder).build();
+    this.queueSize = queueSize;
+    this.async = async;
+
+    final var stat = new ZcStat();
+    this.monitorLocator = () -> {
       // lazily set up context/path
       if (context == null) {
         context = new ServerContext(SiteConfiguration.auto());
@@ -106,9 +166,24 @@ public class AccumuloMonitorAppender extends 
AbstractAppender {
     };
   }
 
+  private String getStats() {
+    return "discards:" + discards.get() + " errors:" + errors.get() + " 
appends:" + appends.get()
+        + " statusCodes:" + statusCodes;
+  }
+
+  private void processResponse(HttpResponse<?> response) {
+    var statusCode = response.statusCode();
+    statusCodes.computeIfAbsent(statusCode, sc -> new 
AtomicLong()).getAndIncrement();
+    if (statusCode >= 400 && statusCode < 600) {
+      error("Unable to send HTTP in appender [" + getName() + "]. Status: " + 
statusCode + " "
+          + getStats());
+    }
+  }
+
   @Override
   public void append(final LogEvent event) {
-    monitorLocator.get().ifPresent(uri -> {
+    appends.getAndIncrement();
+    monitorLocator.get().ifPresentOrElse(uri -> {
       try {
         var pojo = new SingleLogEvent();
         pojo.timestamp = event.getTimeMillis();
@@ -122,14 +197,45 @@ public class AccumuloMonitorAppender extends 
AbstractAppender {
 
         var req = 
HttpRequest.newBuilder(uri).POST(BodyPublishers.ofString(jsonEvent, UTF_8))
             .setHeader("Content-Type", "application/json").build();
-        @SuppressWarnings("unused")
-        var future = httpClient.sendAsync(req, BodyHandlers.discarding());
+
+        if (async) {
+          if (executor.getQueue().size() < queueSize) {
+            httpClient.sendAsync(req, 
BodyHandlers.discarding()).thenAccept(this::processResponse)
+                .exceptionally(e -> {
+                  errors.getAndIncrement();
+                  error("Unable to send HTTP in appender [" + getName() + "] " 
+ getStats(), event,
+                      e);
+                  return null;
+                });
+          } else {
+            discards.getAndIncrement();
+            error("Unable to send HTTP in appender [" + getName() + "]. Queue 
full. " + getStats());
+          }
+        } else {
+          processResponse(httpClient.send(req, BodyHandlers.discarding()));
+        }
       } catch (final Exception e) {
-        error("Unable to send HTTP in appender [" + getName() + "]", event, e);
+        errors.getAndIncrement();
+        error("Unable to send HTTP in appender [" + getName() + "] " + 
getStats(), event, e);
       }
+    }, () -> {
+      discards.getAndIncrement();
+      error("Unable to send HTTP in appender [" + getName() + "]. No monitor 
is running. "
+          + getStats());
     });
   }
 
+  @Override
+  protected boolean stop(long timeout, TimeUnit timeUnit, boolean 
changeLifeCycleState) {
+    if (changeLifeCycleState) {
+      setStopping();
+    }
+    if (executor != null) {
+      executor.shutdown();
+    }
+    return super.stop(timeout, timeUnit, changeLifeCycleState);
+  }
+
   @SuppressFBWarnings(value = "INFORMATION_EXPOSURE_THROUGH_AN_ERROR_MESSAGE",
       justification = "throwable is intended to be printed to output stream, 
to send to monitor")
   private static String throwableToStacktrace(Throwable t) {

Reply via email to