This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 7415a252d8 Improve ConditionalLogger (#6159)
7415a252d8 is described below

commit 7415a252d8639e48cd24e593cc76406427a3cf79
Author: Christopher Tubbs <[email protected]>
AuthorDate: Fri Feb 27 15:58:14 2026 -0500

    Improve ConditionalLogger (#6159)
    
    * Use a slightly smaller/simpler implementation
    * Do not rely on slf4j 2.0 APIs
    * Do not extend AbstractLogger or rely on its implementation
    * Replace BiFunction with BiPredicate
    * Avoid serialization issue flagged by Java 21 compiler where
      ConditionalLogger had a non-transient, non-serializable field
      (condition), which made ConditionalLogger not serializable, while
      extending AbstractLogger, which was serializable
---
 .../accumulo/core/logging/ConditionalLogger.java   | 167 +++++++++------------
 .../spi/balancer/HostRegexTableLoadBalancer.java   |   5 +-
 .../core/logging/DeduplicatingLoggerTest.java      |   2 +-
 .../core/logging/EscalatingLoggerTest.java         |   5 +-
 .../coordinator/CompactionCoordinator.java         |  13 +-
 .../accumulo/manager/TabletGroupWatcher.java       |   5 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |   2 +-
 7 files changed, 88 insertions(+), 111 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
index 6da6454f06..d82221a099 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java
@@ -18,17 +18,16 @@
  */
 package org.apache.accumulo.core.logging;
 
+import static java.util.Objects.requireNonNull;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
 
 import org.apache.accumulo.core.util.Pair;
 import org.slf4j.Logger;
-import org.slf4j.Marker;
-import org.slf4j.event.Level;
-import org.slf4j.helpers.AbstractLogger;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -37,9 +36,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
  * Logger that wraps another Logger and only emits a log message once per the 
supplied duration.
  *
  */
-public abstract class ConditionalLogger extends AbstractLogger {
-
-  private static final long serialVersionUID = 1L;
+public abstract class ConditionalLogger {
 
   /**
    * A Logger implementation that will log a message at the supplied elevated 
level if it has not
@@ -49,30 +46,37 @@ public abstract class ConditionalLogger extends 
AbstractLogger {
    */
   public static class EscalatingLogger extends DeduplicatingLogger {
 
-    private static final long serialVersionUID = 1L;
-    private final Level elevatedLevel;
+    private final ConditionalLogAction elevatedLogAction;
 
     public EscalatingLogger(Logger log, Duration threshold, long 
maxCachedLogMessages,
-        Level elevatedLevel) {
+        ConditionalLogAction elevatedLogAction) {
       super(log, threshold, maxCachedLogMessages);
-      this.elevatedLevel = elevatedLevel;
+      this.elevatedLogAction = requireNonNull(elevatedLogAction);
     }
 
     @Override
-    protected void handleNormalizedLoggingCall(Level level, Marker marker, 
String messagePattern,
-        Object[] arguments, Throwable throwable) {
+    public void trace(String format, Object... arguments) {
+      log(elevatedLogAction, Logger::trace, format, arguments);
+    }
 
-      if (arguments == null) {
-        arguments = new Object[0];
-      }
-      if (!condition.apply(messagePattern, Arrays.asList(arguments))) {
-        
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
-            arguments);
-      } else {
-        
delegate.atLevel(elevatedLevel).addMarker(marker).setCause(throwable).log(messagePattern,
-            arguments);
-      }
+    @Override
+    public void debug(String format, Object... arguments) {
+      log(elevatedLogAction, Logger::debug, format, arguments);
+    }
+
+    @Override
+    public void info(String format, Object... arguments) {
+      log(elevatedLogAction, Logger::info, format, arguments);
+    }
 
+    @Override
+    public void warn(String format, Object... arguments) {
+      log(elevatedLogAction, Logger::warn, format, arguments);
+    }
+
+    @Override
+    public void error(String format, Object... arguments) {
+      log(elevatedLogAction, Logger::error, format, arguments);
     }
 
   }
@@ -82,10 +86,8 @@ public abstract class ConditionalLogger extends 
AbstractLogger {
    */
   public static class DeduplicatingLogger extends ConditionalLogger {
 
-    private static final long serialVersionUID = 1L;
-
     public DeduplicatingLogger(Logger log, Duration threshold, long 
maxCachedLogMessages) {
-      super(log, new BiFunction<>() {
+      super(log, new BiPredicate<>() {
 
         private final Cache<Pair<String,List<Object>>,Boolean> cache = 
Caffeine.newBuilder()
             
.expireAfterWrite(threshold).maximumSize(maxCachedLogMessages).build();
@@ -93,14 +95,20 @@ public abstract class ConditionalLogger extends 
AbstractLogger {
         private final ConcurrentMap<Pair<String,List<Object>>,Boolean> 
cacheMap = cache.asMap();
 
         /**
-         * Function that will return true if the message has not been seen in 
the supplied duration.
+         * Function that will return true if the message with the provided 
arguments (minus any
+         * included Throwable as the last argument) has not been seen in the 
supplied duration.
+         * Deduplication will only work if the arguments are of types that 
implement meaningful
+         * equals. This is not generally true of Throwables.
          *
          * @param msg log message
          * @param args log message arguments
          * @return true if message has not been seen in duration, else false.
          */
         @Override
-        public Boolean apply(String msg, List<Object> args) {
+        public boolean test(String msg, List<Object> args) {
+          if (!args.isEmpty() && args.get(args.size() - 1) instanceof 
Throwable) {
+            args = args.subList(0, args.size() - 1);
+          }
           return cacheMap.putIfAbsent(new Pair<>(msg, args), true) == null;
         }
 
@@ -110,85 +118,60 @@ public abstract class ConditionalLogger extends 
AbstractLogger {
   }
 
   protected final Logger delegate;
-  protected final BiFunction<String,List<Object>,Boolean> condition;
-
-  protected ConditionalLogger(Logger log, 
BiFunction<String,List<Object>,Boolean> condition) {
-    // this.delegate = new DelegateWrapper(log);
-    this.delegate = log;
-    this.condition = condition;
-  }
-
-  @Override
-  public boolean isTraceEnabled() {
-    return this.delegate.isTraceEnabled();
-  }
-
-  @Override
-  public boolean isTraceEnabled(Marker marker) {
-    return this.delegate.isTraceEnabled(marker);
-  }
-
-  @Override
-  public boolean isDebugEnabled() {
-    return this.delegate.isDebugEnabled();
-  }
-
-  @Override
-  public boolean isDebugEnabled(Marker marker) {
-    return this.delegate.isDebugEnabled(marker);
-  }
+  protected final BiPredicate<String,List<Object>> condition;
 
-  @Override
-  public boolean isInfoEnabled() {
-    return this.delegate.isInfoEnabled();
+  protected ConditionalLogger(Logger log, BiPredicate<String,List<Object>> 
condition) {
+    this.delegate = requireNonNull(log);
+    this.condition = requireNonNull(condition);
   }
 
-  @Override
-  public boolean isInfoEnabled(Marker marker) {
-    return this.delegate.isInfoEnabled(marker);
+  @FunctionalInterface
+  public interface ConditionalLogAction {
+    void log(Logger logger, String format, Object... arguments);
   }
 
-  @Override
-  public boolean isWarnEnabled() {
-    return this.delegate.isWarnEnabled();
-  }
-
-  @Override
-  public boolean isWarnEnabled(Marker marker) {
-    return this.delegate.isWarnEnabled(marker);
+  /**
+   * Conditionally executes the log action with the provided format string and 
arguments
+   *
+   * @param conditionTrueLogAction the log action to execute (e.g. 
Logger::warn, Logger::debug,
+   *        etc.) when the condition is true (optional, may be null)
+   * @param conditionFalseLogAction the log action to execute (e.g. 
Logger::warn, Logger::debug,
+   *        etc.) when the condition is false (optional, may be null)
+   * @param format the message format String for the logger
+   * @param arguments the arguments to the format String
+   */
+  protected final void log(ConditionalLogAction conditionTrueLogAction,
+      ConditionalLogAction conditionFalseLogAction, String format, Object... 
arguments) {
+    if (arguments == null) {
+      arguments = new Object[0];
+    }
+    if (condition.test(format, Arrays.asList(arguments))) {
+      if (conditionTrueLogAction != null) {
+        conditionTrueLogAction.log(delegate, format, arguments);
+      }
+    } else if (conditionFalseLogAction != null) {
+      conditionFalseLogAction.log(delegate, format, arguments);
+    }
   }
 
-  @Override
-  public boolean isErrorEnabled() {
-    return this.delegate.isErrorEnabled();
+  public void trace(String format, Object... arguments) {
+    log(Logger::trace, null, format, arguments);
   }
 
-  @Override
-  public boolean isErrorEnabled(Marker marker) {
-    return this.delegate.isErrorEnabled(marker);
+  public void debug(String format, Object... arguments) {
+    log(Logger::debug, null, format, arguments);
   }
 
-  @Override
-  public String getName() {
-    return this.delegate.getName();
+  public void info(String format, Object... arguments) {
+    log(Logger::info, null, format, arguments);
   }
 
-  @Override
-  protected String getFullyQualifiedCallerName() {
-    return this.delegate.getName();
+  public void warn(String format, Object... arguments) {
+    log(Logger::warn, null, format, arguments);
   }
 
-  @Override
-  protected void handleNormalizedLoggingCall(Level level, Marker marker, 
String messagePattern,
-      Object[] arguments, Throwable throwable) {
-
-    if (arguments == null) {
-      arguments = new Object[0];
-    }
-    if (condition.apply(messagePattern, Arrays.asList(arguments))) {
-      
delegate.atLevel(level).addMarker(marker).setCause(throwable).log(messagePattern,
 arguments);
-    }
-
+  public void error(String format, Object... arguments) {
+    log(Logger::error, null, format, arguments);
   }
 
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index 4637d8c610..49309dc352 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@ -62,7 +62,6 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -102,8 +101,8 @@ public class HostRegexTableLoadBalancer extends 
TableLoadBalancer {
   private static final String PROP_PREFIX = 
Property.TABLE_ARBITRARY_PROP_PREFIX.getKey();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HostRegexTableLoadBalancer.class);
-  private static final Logger MIGRATIONS_LOGGER =
-      new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Level.WARN);
+  private static final EscalatingLogger MIGRATIONS_LOGGER =
+      new EscalatingLogger(LOG, Duration.ofMinutes(5), 1000, Logger::warn);
   public static final String HOST_BALANCER_PREFIX = PROP_PREFIX + 
"balancer.host.regex.";
   public static final String HOST_BALANCER_OOB_CHECK_KEY =
       PROP_PREFIX + "balancer.host.regex.oob.period";
diff --git 
a/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
index 826dfed144..e3fe83d7f9 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class DeduplicatingLoggerTest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DeduplicatingLoggerTest.class);
-  private static final Logger TEST_LOGGER =
+  private static final DeduplicatingLogger TEST_LOGGER =
       new DeduplicatingLogger(LOG, Duration.ofMinutes(1), 100);
 
   @Test
diff --git 
a/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java 
b/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
index cd368e38a4..a119601cf0 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
@@ -33,13 +33,12 @@ import org.apache.logging.log4j.core.layout.PatternLayout;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
 
 public class EscalatingLoggerTest {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(EscalatingLoggerTest.class);
-  private static final Logger TEST_LOGGER =
-      new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Level.WARN);
+  private static final EscalatingLogger TEST_LOGGER =
+      new EscalatingLogger(LOG, Duration.ofSeconds(3), 100, Logger::warn);
 
   @Test
   public void test() throws InterruptedException {
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 387b1559b3..dcbdc0e2de 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -61,6 +61,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import 
org.apache.accumulo.core.fate.zookeeper.ServiceLockSupport.HAServiceLockWatcher;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.logging.ConditionalLogger.ConditionalLogAction;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -96,7 +97,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.event.Level;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -705,16 +705,13 @@ public class CompactionCoordinator extends AbstractServer 
implements
     for (var key : failureCounts.keySet()) {
       failureCounts.compute(key, (k, counts) -> {
         if (counts != null) {
-          Level level;
+          ConditionalLogAction logAction = Logger::debug;
           if (counts.failures > 0) {
-            level = Level.WARN;
+            logAction = Logger::warn;
           } else if (logSuccessAtTrace) {
-            level = Level.TRACE;
-          } else {
-            level = Level.DEBUG;
+            logAction = Logger::trace;
           }
-
-          LOG.atLevel(level).log("{} {} failures:{} successes:{} since last 
time this was logged ",
+          logAction.log(LOG, "{} {} failures:{} successes:{} since last time 
this was logged ",
               logPrefix, k, counts.failures, counts.successes);
         }
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 52b398d98e..45f55169a6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -106,7 +106,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
-import org.slf4j.event.Level;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSortedSet;
@@ -115,8 +114,8 @@ import com.google.common.collect.Sets;
 
 abstract class TabletGroupWatcher extends AccumuloDaemonThread {
 
-  private static final Logger TABLET_UNLOAD_LOGGER =
-      new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, 
Level.INFO);
+  private static final EscalatingLogger TABLET_UNLOAD_LOGGER =
+      new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, 
Logger::info);
   private final Manager manager;
   private final TabletStateStore store;
   private final TabletGroupWatcher dependentWatcher;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 76f5088dbb..1b2e5d6100 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -146,7 +146,7 @@ import io.opentelemetry.context.Scope;
  */
 public class Tablet extends TabletBase {
   private static final Logger log = LoggerFactory.getLogger(Tablet.class);
-  private static final Logger CLOSING_STUCK_LOGGER =
+  private static final DeduplicatingLogger CLOSING_STUCK_LOGGER =
       new DeduplicatingLogger(log, Duration.ofMinutes(5), 1000);
 
   private final TabletServer tabletServer;

Reply via email to