This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 11.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
commit 5ae9abb066953c46b2414b5f54f52e56b3f9bc3d Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri Mar 7 13:27:58 2025 +0000 Refactor TimeBucketCounter to support alternative implementations This is preparatory work for PR #794 --- .../apache/catalina/util/TimeBucketCounter.java | 185 +++++++-------------- ...cketCounter.java => TimeBucketCounterBase.java} | 177 ++++++++++---------- 2 files changed, 145 insertions(+), 217 deletions(-) diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java b/java/org/apache/catalina/util/TimeBucketCounter.java index 3b4726f7ff..78951623cc 100644 --- a/java/org/apache/catalina/util/TimeBucketCounter.java +++ b/java/org/apache/catalina/util/TimeBucketCounter.java @@ -14,122 +14,91 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.catalina.util; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.res.StringManager; /** - * This class maintains a thread safe hash map that has timestamp-based buckets followed by a string for a key, and a - * counter for a value. each time the increment() method is called it adds the key if it does not exist, increments its - * value and returns it. a maintenance thread cleans up keys that are prefixed by previous timestamp buckets. + * A fast counter that optimizes efficiency at the expense of approximate bucket indexing. */ -public class TimeBucketCounter { - - private static final Log log = LogFactory.getLog(TimeBucketCounter.class); - private static final StringManager sm = StringManager.getManager(TimeBucketCounter.class); - - /** - * Map to hold the buckets - */ - private final ConcurrentHashMap<String,AtomicInteger> map = new ConcurrentHashMap<>(); +public class TimeBucketCounter extends TimeBucketCounterBase { - /** - * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute - */ + // Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute. private final int numBits; - /** - * Ratio of actual duration to config duration - */ + // Ratio of actual duration to config duration private final double ratio; - /** - * The future allowing control of the background processor. - */ - private ScheduledFuture<?> maintenanceFuture; - private ScheduledFuture<?> monitorFuture; - private final ScheduledExecutorService executorService; - private final long sleeptime; - /** - * Creates a new TimeBucketCounter with the specified lifetime. - * - * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60 - * @param executorService the executor service which will be used to run the maintenance - */ public TimeBucketCounter(int bucketDuration, ScheduledExecutorService executorService) { + super(getActualDuration(bucketDuration), executorService); + this.numBits = determineShiftBitsOfDuration(bucketDuration); + this.ratio = ratioToPowerOf2(bucketDuration * 1000); + } - this.executorService = executorService; - - int durationMillis = bucketDuration * 1000; - - int bits = 0; - int pof2 = nextPowerOf2(durationMillis); - int bitCheck = pof2; - while (bitCheck > 1) { - bitCheck = pof2 >> ++bits; - } - - this.numBits = bits; - this.ratio = ratioToPowerOf2(durationMillis); + /** + * {@inheritDoc} + * <p> + * Calculates the current time bucket index by shifting bits for fast division, e.g. shift 16 bits is the same as + * dividing by 65,536 which is about 1:05m. + */ + @Override + public long getBucketIndex(long timestamp) { + return timestamp >> this.numBits; + } - int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3; - sleeptime = durationMillis / cleanupsPerBucketDuration; - // Start our thread - if (sleeptime > 0) { - monitorFuture = executorService.scheduleWithFixedDelay(new MaintenanceMonitor(), 0, 60, TimeUnit.SECONDS); - } + public int getNumBits() { + return numBits; } + /** - * Increments the counter for the passed identifier in the current time bucket and returns the new value. - * - * @param identifier an identifier for which we want to maintain count, e.g. IP Address + * The actual duration may differ from the configured duration because it is set to the next power of 2 value in + * order to perform very fast bit shift arithmetic. * - * @return the count within the current time bucket + * @return the actual bucket duration in milliseconds */ - public final int increment(String identifier) { - String key = getCurrentBucketPrefix() + "-" + identifier; - AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger()); - return ai.incrementAndGet(); + public int getActualDuration() { + return (int) Math.pow(2, getNumBits()); } + /** - * Calculates the current time bucket prefix by shifting bits for fast division, e.g. shift 16 bits is the same as - * dividing by 65,536 which is about 1:05m. + * Determines the bits of shift for the specific bucket duration in seconds, which used to figure out the correct + * bucket index. + * + * @param duration bucket duration in seconds * - * @return The current bucket prefix. + * @return bits to be shifted */ - public final int getCurrentBucketPrefix() { - return (int) (System.currentTimeMillis() >> this.numBits); + protected static final int determineShiftBitsOfDuration(int duration) { + int bits = 0; + int pof2 = nextPowerOf2(duration * 1000); + int bitCheck = pof2; + while (bitCheck > 1) { + bitCheck = pof2 >> ++bits; + } + return bits; } - public int getNumBits() { - return numBits; - } /** * The actual duration may differ from the configured duration because it is set to the next power of 2 value in * order to perform very fast bit shift arithmetic. * - * @return the actual bucket duration in milliseconds + * @param duration in seconds + * + * @return the actual bucket duration in seconds + * + * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int) */ - public int getActualDuration() { - return (int) Math.pow(2, getNumBits()); + private static int getActualDuration(int duration) { + return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000; } + /** * Returns the ratio between the configured duration param and the actual duration which will be set to the next * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the added @@ -137,18 +106,25 @@ public class TimeBucketCounter { * * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 */ + @Override public double getRatio() { return ratio; } + /** * Returns the ratio to the next power of 2 so that we can adjust the value. + * + * @param value of target duration in seconds + * + * @return the ratio to the next power of 2 so that we can adjust the value */ static double ratioToPowerOf2(int value) { double nextPO2 = nextPowerOf2(value); return Math.round((1000 * nextPO2 / value)) / 1000d; } + /** * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. */ @@ -161,59 +137,12 @@ public class TimeBucketCounter { return valueOfHighestBit << 1; } - /** - * When we want to test a full bucket duration we need to sleep until the next bucket starts. - * - * @return the number of milliseconds until the next bucket - */ + + @Override public long getMillisUntilNextBucket() { long millis = System.currentTimeMillis(); long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; long delta = nextTimeBucketMillis - millis; return delta; } - - /** - * Sets isRunning to false to terminate the maintenance thread. - */ - public void destroy() { - // Stop our thread - if (monitorFuture != null) { - monitorFuture.cancel(true); - monitorFuture = null; - } - if (maintenanceFuture != null) { - maintenanceFuture.cancel(true); - maintenanceFuture = null; - } - } - - private class Maintenance implements Runnable { - @Override - public void run() { - String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix()); - ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = map.keySet(); - // remove obsolete keys - keys.removeIf(k -> !k.startsWith(currentBucketPrefix)); - } - } - - private class MaintenanceMonitor implements Runnable { - @Override - public void run() { - if (sleeptime > 0 && (maintenanceFuture == null || maintenanceFuture.isDone())) { - if (maintenanceFuture != null && maintenanceFuture.isDone()) { - // There was an error executing the scheduled task, get it and log it - try { - maintenanceFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error(sm.getString("timebucket.maintenance.error"), e); - } - } - maintenanceFuture = executorService.scheduleWithFixedDelay(new Maintenance(), sleeptime, sleeptime, - TimeUnit.MILLISECONDS); - } - } - } - } diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java b/java/org/apache/catalina/util/TimeBucketCounterBase.java similarity index 54% copy from java/org/apache/catalina/util/TimeBucketCounter.java copy to java/org/apache/catalina/util/TimeBucketCounterBase.java index 3b4726f7ff..b2b0bbee09 100644 --- a/java/org/apache/catalina/util/TimeBucketCounter.java +++ b/java/org/apache/catalina/util/TimeBucketCounterBase.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.catalina.util; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -30,62 +30,42 @@ import org.apache.tomcat.util.res.StringManager; /** * This class maintains a thread safe hash map that has timestamp-based buckets followed by a string for a key, and a - * counter for a value. each time the increment() method is called it adds the key if it does not exist, increments its - * value and returns it. a maintenance thread cleans up keys that are prefixed by previous timestamp buckets. + * counter for an integer value. Each time the increment() method is called it adds the key if it does not exist, + * increments its value and returns it. */ -public class TimeBucketCounter { +public abstract class TimeBucketCounterBase { - private static final Log log = LogFactory.getLog(TimeBucketCounter.class); - private static final StringManager sm = StringManager.getManager(TimeBucketCounter.class); + private static final Log log = LogFactory.getLog(TimeBucketCounterBase.class); + private static final StringManager sm = StringManager.getManager(TimeBucketCounterBase.class); - /** - * Map to hold the buckets - */ - private final ConcurrentHashMap<String,AtomicInteger> map = new ConcurrentHashMap<>(); - - /** - * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute - */ - private final int numBits; + private static final String BUCKET_KEY_DELIMITER = "-"; - /** - * Ratio of actual duration to config duration - */ - private final double ratio; + // Map to hold the buckets + private final ConcurrentHashMap<String,AtomicInteger> map = new ConcurrentHashMap<>(); - /** - * The future allowing control of the background processor. - */ + // The future allowing control of the background processor. private ScheduledFuture<?> maintenanceFuture; private ScheduledFuture<?> monitorFuture; private final ScheduledExecutorService executorService; private final long sleeptime; + private int bucketDuration; + /** * Creates a new TimeBucketCounter with the specified lifetime. * * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60 - * @param executorService the executor service which will be used to run the maintenance + * @param executorService the executor service that will be used to run the maintenance task + * + * @throws NullPointerException if executorService is <code>null</code>. */ - public TimeBucketCounter(int bucketDuration, ScheduledExecutorService executorService) { - + public TimeBucketCounterBase(int bucketDuration, ScheduledExecutorService executorService) { + Objects.requireNonNull(executorService); this.executorService = executorService; + this.bucketDuration = bucketDuration; - int durationMillis = bucketDuration * 1000; - - int bits = 0; - int pof2 = nextPowerOf2(durationMillis); - int bitCheck = pof2; - while (bitCheck > 1) { - bitCheck = pof2 >> ++bits; - } - - this.numBits = bits; - - this.ratio = ratioToPowerOf2(durationMillis); - - int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3; - sleeptime = durationMillis / cleanupsPerBucketDuration; + int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3; + sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration; // Start our thread if (sleeptime > 0) { @@ -93,6 +73,23 @@ public class TimeBucketCounter { } } + + /** + * @return bucketDuration in seconds + */ + public int getBucketDuration() { + return bucketDuration; + } + + + /** + * Returns the ratio between the configured duration param and the actual duration. + * + * @return the ratio between the configured duration param and the actual duration. + */ + public abstract double getRatio(); + + /** * Increments the counter for the passed identifier in the current time bucket and returns the new value. * @@ -101,83 +98,76 @@ public class TimeBucketCounter { * @return the count within the current time bucket */ public final int increment(String identifier) { - String key = getCurrentBucketPrefix() + "-" + identifier; + String key = genKey(identifier); AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger()); return ai.incrementAndGet(); } + /** - * Calculates the current time bucket prefix by shifting bits for fast division, e.g. shift 16 bits is the same as - * dividing by 65,536 which is about 1:05m. + * Generates the key of timeBucket counter maps with the specific identifier, and the timestamp is implicitly + * equivalent to "now". * - * @return The current bucket prefix. + * @param identifier an identifier for which we want to maintain count + * + * @return key of timeBucket counter maps */ - public final int getCurrentBucketPrefix() { - return (int) (System.currentTimeMillis() >> this.numBits); + protected final String genKey(String identifier) { + return genKey(identifier, System.currentTimeMillis()); } - public int getNumBits() { - return numBits; - } /** - * The actual duration may differ from the configured duration because it is set to the next power of 2 value in - * order to perform very fast bit shift arithmetic. + * Generates the key of timeBucket counter maps with the specific identifier and timestamp. + * + * @param identifier of target request + * @param timestamp when target request received * - * @return the actual bucket duration in milliseconds + * @return key of timeBucket counter maps */ - public int getActualDuration() { - return (int) Math.pow(2, getNumBits()); + protected final String genKey(String identifier, long timestamp) { + return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier; } + /** - * Returns the ratio between the configured duration param and the actual duration which will be set to the next - * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the added - * time, if any. + * Calculate the bucket index for the specific timestamp. + * + * @param timestamp the specific timestamp in milliseconds * - * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 + * @return prefix the bucket key prefix for the specific timestamp */ - public double getRatio() { - return ratio; - } + protected abstract long getBucketIndex(long timestamp); - /** - * Returns the ratio to the next power of 2 so that we can adjust the value. - */ - static double ratioToPowerOf2(int value) { - double nextPO2 = nextPowerOf2(value); - return Math.round((1000 * nextPO2 / value)) / 1000d; - } /** - * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. + * Returns current bucket prefix + * + * @return bucket index */ - static int nextPowerOf2(int value) { - int valueOfHighestBit = Integer.highestOneBit(value); - if (valueOfHighestBit == value) { - return value; - } - - return valueOfHighestBit << 1; + public int getCurrentBucketPrefix() { + return (int) getBucketIndex(System.currentTimeMillis()); } + /** * When we want to test a full bucket duration we need to sleep until the next bucket starts. + * <p> + * <strong>WARNING:</strong> This method is used for test purpose. * * @return the number of milliseconds until the next bucket + * + * @deprecated Will be made package private in Tomcat 12 onwards. */ - public long getMillisUntilNextBucket() { - long millis = System.currentTimeMillis(); - long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; - long delta = nextTimeBucketMillis - millis; - return delta; - } + @Deprecated + public abstract long getMillisUntilNextBucket(); + /** - * Sets isRunning to false to terminate the maintenance thread. + * Stops threads created by this object and cleans up resources. */ public void destroy() { - // Stop our thread + map.clear(); if (monitorFuture != null) { monitorFuture.cancel(true); monitorFuture = null; @@ -188,13 +178,23 @@ public class TimeBucketCounter { } } + + /** + * Periodic evict, perform removal of obsolete bucket items. Absence of this operation may result in OOM after a + * long run. + */ + public void periodicEvict() { + String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix()); + ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = map.keySet(); + // remove obsolete keys + keys.removeIf(k -> !k.startsWith(currentBucketPrefix)); + } + + private class Maintenance implements Runnable { @Override public void run() { - String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix()); - ConcurrentHashMap.KeySetView<String,AtomicInteger> keys = map.keySet(); - // remove obsolete keys - keys.removeIf(k -> !k.startsWith(currentBucketPrefix)); + periodicEvict(); } } @@ -215,5 +215,4 @@ public class TimeBucketCounter { } } } - } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org