This is an automated email from the ASF dual-hosted git repository.

markt pushed a commit to branch 11.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git

commit 5ae9abb066953c46b2414b5f54f52e56b3f9bc3d
Author: Mark Thomas <ma...@apache.org>
AuthorDate: Fri Mar 7 13:27:58 2025 +0000

    Refactor TimeBucketCounter to support alternative implementations
    
    This is preparatory work for PR #794
---
 .../apache/catalina/util/TimeBucketCounter.java    | 185 +++++++--------------
 ...cketCounter.java => TimeBucketCounterBase.java} | 177 ++++++++++----------
 2 files changed, 145 insertions(+), 217 deletions(-)

diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java 
b/java/org/apache/catalina/util/TimeBucketCounter.java
index 3b4726f7ff..78951623cc 100644
--- a/java/org/apache/catalina/util/TimeBucketCounter.java
+++ b/java/org/apache/catalina/util/TimeBucketCounter.java
@@ -14,122 +14,91 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.catalina.util;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-import org.apache.tomcat.util.res.StringManager;
 
 /**
- * This class maintains a thread safe hash map that has timestamp-based 
buckets followed by a string for a key, and a
- * counter for a value. each time the increment() method is called it adds the 
key if it does not exist, increments its
- * value and returns it. a maintenance thread cleans up keys that are prefixed 
by previous timestamp buckets.
+ * A fast counter that optimizes efficiency at the expense of approximate 
bucket indexing.
  */
-public class TimeBucketCounter {
-
-    private static final Log log = LogFactory.getLog(TimeBucketCounter.class);
-    private static final StringManager sm = 
StringManager.getManager(TimeBucketCounter.class);
-
-    /**
-     * Map to hold the buckets
-     */
-    private final ConcurrentHashMap<String,AtomicInteger> map = new 
ConcurrentHashMap<>();
+public class TimeBucketCounter extends TimeBucketCounterBase {
 
-    /**
-     * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 
for 65_536ms which is about 1:05 minute
-     */
+    // Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 
for 65_536ms which is about 1:05 minute.
     private final int numBits;
 
-    /**
-     * Ratio of actual duration to config duration
-     */
+    // Ratio of actual duration to config duration
     private final double ratio;
 
-    /**
-     * The future allowing control of the background processor.
-     */
-    private ScheduledFuture<?> maintenanceFuture;
-    private ScheduledFuture<?> monitorFuture;
-    private final ScheduledExecutorService executorService;
-    private final long sleeptime;
 
-    /**
-     * Creates a new TimeBucketCounter with the specified lifetime.
-     *
-     * @param bucketDuration  duration in seconds, e.g. for 1 minute pass 60
-     * @param executorService the executor service which will be used to run 
the maintenance
-     */
     public TimeBucketCounter(int bucketDuration, ScheduledExecutorService 
executorService) {
+        super(getActualDuration(bucketDuration), executorService);
+        this.numBits = determineShiftBitsOfDuration(bucketDuration);
+        this.ratio = ratioToPowerOf2(bucketDuration * 1000);
+    }
 
-        this.executorService = executorService;
-
-        int durationMillis = bucketDuration * 1000;
-
-        int bits = 0;
-        int pof2 = nextPowerOf2(durationMillis);
-        int bitCheck = pof2;
-        while (bitCheck > 1) {
-            bitCheck = pof2 >> ++bits;
-        }
-
-        this.numBits = bits;
 
-        this.ratio = ratioToPowerOf2(durationMillis);
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Calculates the current time bucket index by shifting bits for fast 
division, e.g. shift 16 bits is the same as
+     * dividing by 65,536 which is about 1:05m.
+     */
+    @Override
+    public long getBucketIndex(long timestamp) {
+        return timestamp >> this.numBits;
+    }
 
-        int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3;
-        sleeptime = durationMillis / cleanupsPerBucketDuration;
 
-        // Start our thread
-        if (sleeptime > 0) {
-            monitorFuture = executorService.scheduleWithFixedDelay(new 
MaintenanceMonitor(), 0, 60, TimeUnit.SECONDS);
-        }
+    public int getNumBits() {
+        return numBits;
     }
 
+
     /**
-     * Increments the counter for the passed identifier in the current time 
bucket and returns the new value.
-     *
-     * @param identifier an identifier for which we want to maintain count, 
e.g. IP Address
+     * The actual duration may differ from the configured duration because it 
is set to the next power of 2 value in
+     * order to perform very fast bit shift arithmetic.
      *
-     * @return the count within the current time bucket
+     * @return the actual bucket duration in milliseconds
      */
-    public final int increment(String identifier) {
-        String key = getCurrentBucketPrefix() + "-" + identifier;
-        AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger());
-        return ai.incrementAndGet();
+    public int getActualDuration() {
+        return (int) Math.pow(2, getNumBits());
     }
 
+
     /**
-     * Calculates the current time bucket prefix by shifting bits for fast 
division, e.g. shift 16 bits is the same as
-     * dividing by 65,536 which is about 1:05m.
+     * Determines the bits of shift for the specific bucket duration in 
seconds, which used to figure out the correct
+     * bucket index.
+     *
+     * @param duration bucket duration in seconds
      *
-     * @return The current bucket prefix.
+     * @return bits to be shifted
      */
-    public final int getCurrentBucketPrefix() {
-        return (int) (System.currentTimeMillis() >> this.numBits);
+    protected static final int determineShiftBitsOfDuration(int duration) {
+        int bits = 0;
+        int pof2 = nextPowerOf2(duration * 1000);
+        int bitCheck = pof2;
+        while (bitCheck > 1) {
+            bitCheck = pof2 >> ++bits;
+        }
+        return bits;
     }
 
-    public int getNumBits() {
-        return numBits;
-    }
 
     /**
      * The actual duration may differ from the configured duration because it 
is set to the next power of 2 value in
      * order to perform very fast bit shift arithmetic.
      *
-     * @return the actual bucket duration in milliseconds
+     * @param duration in seconds
+     *
+     * @return the actual bucket duration in seconds
+     *
+     * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int)
      */
-    public int getActualDuration() {
-        return (int) Math.pow(2, getNumBits());
+    private static int getActualDuration(int duration) {
+        return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000;
     }
 
+
     /**
      * Returns the ratio between the configured duration param and the actual 
duration which will be set to the next
      * power of 2. We then multiply the configured requests param by the same 
ratio in order to compensate for the added
@@ -137,18 +106,25 @@ public class TimeBucketCounter {
      *
      * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the 
configured duration of 60_000
      */
+    @Override
     public double getRatio() {
         return ratio;
     }
 
+
     /**
      * Returns the ratio to the next power of 2 so that we can adjust the 
value.
+     *
+     * @param value of target duration in seconds
+     *
+     * @return the ratio to the next power of 2 so that we can adjust the value
      */
     static double ratioToPowerOf2(int value) {
         double nextPO2 = nextPowerOf2(value);
         return Math.round((1000 * nextPO2 / value)) / 1000d;
     }
 
+
     /**
      * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, 
for 1000.
      */
@@ -161,59 +137,12 @@ public class TimeBucketCounter {
         return valueOfHighestBit << 1;
     }
 
-    /**
-     * When we want to test a full bucket duration we need to sleep until the 
next bucket starts.
-     *
-     * @return the number of milliseconds until the next bucket
-     */
+
+    @Override
     public long getMillisUntilNextBucket() {
         long millis = System.currentTimeMillis();
         long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> 
numBits) << numBits;
         long delta = nextTimeBucketMillis - millis;
         return delta;
     }
-
-    /**
-     * Sets isRunning to false to terminate the maintenance thread.
-     */
-    public void destroy() {
-        // Stop our thread
-        if (monitorFuture != null) {
-            monitorFuture.cancel(true);
-            monitorFuture = null;
-        }
-        if (maintenanceFuture != null) {
-            maintenanceFuture.cancel(true);
-            maintenanceFuture = null;
-        }
-    }
-
-    private class Maintenance implements Runnable {
-        @Override
-        public void run() {
-            String currentBucketPrefix = 
String.valueOf(getCurrentBucketPrefix());
-            ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = 
map.keySet();
-            // remove obsolete keys
-            keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
-        }
-    }
-
-    private class MaintenanceMonitor implements Runnable {
-        @Override
-        public void run() {
-            if (sleeptime > 0 && (maintenanceFuture == null || 
maintenanceFuture.isDone())) {
-                if (maintenanceFuture != null && maintenanceFuture.isDone()) {
-                    // There was an error executing the scheduled task, get it 
and log it
-                    try {
-                        maintenanceFuture.get();
-                    } catch (InterruptedException | ExecutionException e) {
-                        
log.error(sm.getString("timebucket.maintenance.error"), e);
-                    }
-                }
-                maintenanceFuture = executorService.scheduleWithFixedDelay(new 
Maintenance(), sleeptime, sleeptime,
-                        TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
 }
diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java 
b/java/org/apache/catalina/util/TimeBucketCounterBase.java
similarity index 54%
copy from java/org/apache/catalina/util/TimeBucketCounter.java
copy to java/org/apache/catalina/util/TimeBucketCounterBase.java
index 3b4726f7ff..b2b0bbee09 100644
--- a/java/org/apache/catalina/util/TimeBucketCounter.java
+++ b/java/org/apache/catalina/util/TimeBucketCounterBase.java
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.catalina.util;
 
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,62 +30,42 @@ import org.apache.tomcat.util.res.StringManager;
 
 /**
  * This class maintains a thread safe hash map that has timestamp-based 
buckets followed by a string for a key, and a
- * counter for a value. each time the increment() method is called it adds the 
key if it does not exist, increments its
- * value and returns it. a maintenance thread cleans up keys that are prefixed 
by previous timestamp buckets.
+ * counter for an integer value. Each time the increment() method is called it 
adds the key if it does not exist,
+ * increments its value and returns it.
  */
-public class TimeBucketCounter {
+public abstract class TimeBucketCounterBase {
 
-    private static final Log log = LogFactory.getLog(TimeBucketCounter.class);
-    private static final StringManager sm = 
StringManager.getManager(TimeBucketCounter.class);
+    private static final Log log = 
LogFactory.getLog(TimeBucketCounterBase.class);
+    private static final StringManager sm = 
StringManager.getManager(TimeBucketCounterBase.class);
 
-    /**
-     * Map to hold the buckets
-     */
-    private final ConcurrentHashMap<String,AtomicInteger> map = new 
ConcurrentHashMap<>();
-
-    /**
-     * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 
for 65_536ms which is about 1:05 minute
-     */
-    private final int numBits;
+    private static final String BUCKET_KEY_DELIMITER = "-";
 
-    /**
-     * Ratio of actual duration to config duration
-     */
-    private final double ratio;
+    // Map to hold the buckets
+    private final ConcurrentHashMap<String,AtomicInteger> map = new 
ConcurrentHashMap<>();
 
-    /**
-     * The future allowing control of the background processor.
-     */
+    // The future allowing control of the background processor.
     private ScheduledFuture<?> maintenanceFuture;
     private ScheduledFuture<?> monitorFuture;
     private final ScheduledExecutorService executorService;
     private final long sleeptime;
+    private int bucketDuration;
+
 
     /**
      * Creates a new TimeBucketCounter with the specified lifetime.
      *
      * @param bucketDuration  duration in seconds, e.g. for 1 minute pass 60
-     * @param executorService the executor service which will be used to run 
the maintenance
+     * @param executorService the executor service that will be used to run 
the maintenance task
+     *
+     * @throws NullPointerException if executorService is <code>null</code>.
      */
-    public TimeBucketCounter(int bucketDuration, ScheduledExecutorService 
executorService) {
-
+    public TimeBucketCounterBase(int bucketDuration, ScheduledExecutorService 
executorService) {
+        Objects.requireNonNull(executorService);
         this.executorService = executorService;
+        this.bucketDuration = bucketDuration;
 
-        int durationMillis = bucketDuration * 1000;
-
-        int bits = 0;
-        int pof2 = nextPowerOf2(durationMillis);
-        int bitCheck = pof2;
-        while (bitCheck > 1) {
-            bitCheck = pof2 >> ++bits;
-        }
-
-        this.numBits = bits;
-
-        this.ratio = ratioToPowerOf2(durationMillis);
-
-        int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3;
-        sleeptime = durationMillis / cleanupsPerBucketDuration;
+        int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3;
+        sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration;
 
         // Start our thread
         if (sleeptime > 0) {
@@ -93,6 +73,23 @@ public class TimeBucketCounter {
         }
     }
 
+
+    /**
+     * @return bucketDuration in seconds
+     */
+    public int getBucketDuration() {
+        return bucketDuration;
+    }
+
+
+    /**
+     * Returns the ratio between the configured duration param and the actual 
duration.
+     *
+     * @return the ratio between the configured duration param and the actual 
duration.
+     */
+    public abstract double getRatio();
+
+
     /**
      * Increments the counter for the passed identifier in the current time 
bucket and returns the new value.
      *
@@ -101,83 +98,76 @@ public class TimeBucketCounter {
      * @return the count within the current time bucket
      */
     public final int increment(String identifier) {
-        String key = getCurrentBucketPrefix() + "-" + identifier;
+        String key = genKey(identifier);
         AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger());
         return ai.incrementAndGet();
     }
 
+
     /**
-     * Calculates the current time bucket prefix by shifting bits for fast 
division, e.g. shift 16 bits is the same as
-     * dividing by 65,536 which is about 1:05m.
+     * Generates the key of timeBucket counter maps with the specific 
identifier, and the timestamp is implicitly
+     * equivalent to "now".
      *
-     * @return The current bucket prefix.
+     * @param identifier an identifier for which we want to maintain count
+     *
+     * @return key of timeBucket counter maps
      */
-    public final int getCurrentBucketPrefix() {
-        return (int) (System.currentTimeMillis() >> this.numBits);
+    protected final String genKey(String identifier) {
+        return genKey(identifier, System.currentTimeMillis());
     }
 
-    public int getNumBits() {
-        return numBits;
-    }
 
     /**
-     * The actual duration may differ from the configured duration because it 
is set to the next power of 2 value in
-     * order to perform very fast bit shift arithmetic.
+     * Generates the key of timeBucket counter maps with the specific 
identifier and timestamp.
+     *
+     * @param identifier of target request
+     * @param timestamp  when target request received
      *
-     * @return the actual bucket duration in milliseconds
+     * @return key of timeBucket counter maps
      */
-    public int getActualDuration() {
-        return (int) Math.pow(2, getNumBits());
+    protected final String genKey(String identifier, long timestamp) {
+        return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier;
     }
 
+
     /**
-     * Returns the ratio between the configured duration param and the actual 
duration which will be set to the next
-     * power of 2. We then multiply the configured requests param by the same 
ratio in order to compensate for the added
-     * time, if any.
+     * Calculate the bucket index for the specific timestamp.
+     *
+     * @param timestamp the specific timestamp in milliseconds
      *
-     * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the 
configured duration of 60_000
+     * @return prefix the bucket key prefix for the specific timestamp
      */
-    public double getRatio() {
-        return ratio;
-    }
+    protected abstract long getBucketIndex(long timestamp);
 
-    /**
-     * Returns the ratio to the next power of 2 so that we can adjust the 
value.
-     */
-    static double ratioToPowerOf2(int value) {
-        double nextPO2 = nextPowerOf2(value);
-        return Math.round((1000 * nextPO2 / value)) / 1000d;
-    }
 
     /**
-     * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, 
for 1000.
+     * Returns current bucket prefix
+     *
+     * @return bucket index
      */
-    static int nextPowerOf2(int value) {
-        int valueOfHighestBit = Integer.highestOneBit(value);
-        if (valueOfHighestBit == value) {
-            return value;
-        }
-
-        return valueOfHighestBit << 1;
+    public int getCurrentBucketPrefix() {
+        return (int) getBucketIndex(System.currentTimeMillis());
     }
 
+
     /**
      * When we want to test a full bucket duration we need to sleep until the 
next bucket starts.
+     * <p>
+     * <strong>WARNING:</strong> This method is used for test purpose.
      *
      * @return the number of milliseconds until the next bucket
+     *
+     * @deprecated Will be made package private in Tomcat 12 onwards.
      */
-    public long getMillisUntilNextBucket() {
-        long millis = System.currentTimeMillis();
-        long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> 
numBits) << numBits;
-        long delta = nextTimeBucketMillis - millis;
-        return delta;
-    }
+    @Deprecated
+    public abstract long getMillisUntilNextBucket();
+
 
     /**
-     * Sets isRunning to false to terminate the maintenance thread.
+     * Stops threads created by this object and cleans up resources.
      */
     public void destroy() {
-        // Stop our thread
+        map.clear();
         if (monitorFuture != null) {
             monitorFuture.cancel(true);
             monitorFuture = null;
@@ -188,13 +178,23 @@ public class TimeBucketCounter {
         }
     }
 
+
+    /**
+     * Periodic evict, perform removal of obsolete bucket items. Absence of 
this operation may result in OOM after a
+     * long run.
+     */
+    public void periodicEvict() {
+        String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix());
+        ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = map.keySet();
+        // remove obsolete keys
+        keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
+    }
+
+
     private class Maintenance implements Runnable {
         @Override
         public void run() {
-            String currentBucketPrefix = 
String.valueOf(getCurrentBucketPrefix());
-            ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = 
map.keySet();
-            // remove obsolete keys
-            keys.removeIf(k -> !k.startsWith(currentBucketPrefix));
+            periodicEvict();
         }
     }
 
@@ -215,5 +215,4 @@ public class TimeBucketCounter {
             }
         }
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to