This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push: new 1dd8108a9 Add ThrottledInputStream.Builder.setMaxBytes(int, ChronoUnit) new 2224df7b9 Merge branch 'master' of https://gitbox.apache.org/repos/asf/commons-io.git 1dd8108a9 is described below commit 1dd8108a9bf22a95613e56fdca24abdebf842ce2 Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Tue Nov 26 16:48:15 2024 -0500 Add ThrottledInputStream.Builder.setMaxBytes(int, ChronoUnit) - Update Javadoc example --- src/changes/changes.xml | 2 +- .../commons/io/input/ThrottledInputStream.java | 108 ++++++++++++++++---- .../commons/io/input/ThrottledInputStreamTest.java | 113 ++++++++++++++++++--- 3 files changed, 189 insertions(+), 34 deletions(-) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index d81185eb7..4fe84f2a0 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -49,6 +49,7 @@ The <action> type attribute can be add,update,fix,remove. <release version="2.18.1" date="YYYY-MM-DD" description="Version 2.18.1: Java 8 is required."> <!-- FIX --> <!-- ADD --> + <action dev="ggregory" type="add" issue="IO-860" due-to="Nico Strecker, Gary Gregory">Add ThrottledInputStream.Builder.setMaxBytes(int, ChronoUnit).</action> <!-- UPDATE --> </release> <release version="2.18.0" date="2024-11-16" description="Version 2.18.0: Java 8 is required."> @@ -63,7 +64,6 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="fix" due-to="Éamonn McManus">Use Unicode escapes for superscript characters. #701.</action> <action dev="ggregory" type="fix" issue="IO-863" due-to="Éamonn McManus, Gary Gregory">Recent incompatible change to FileUtils.listFiles re extensions, see also IO-856.</action> <action dev="ggregory" type="fix" issue="IO-857" due-to="Dmitry, Gary Gregory">Javadoc: Update details for PathUtils "clean" behavior.</action> - <!-- ADD --> <action dev="ggregory" type="add" due-to="Gary Gregory">Add @FunctionalInterface to ClassNameMatcher.</action> <action dev="ggregory" type="add" due-to="Gary Gregory">Add ValidatingObjectInputStream.Builder and ValidatingObjectInputStream.builder().</action> diff --git a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java index 1fcc78183..575fa99e6 100644 --- a/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java +++ b/src/main/java/org/apache/commons/io/input/ThrottledInputStream.java @@ -22,15 +22,15 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.Objects; import java.util.concurrent.TimeUnit; /** - * Provides bandwidth throttling on a specified InputStream. It is implemented as a wrapper on top of another InputStream instance. The throttling works by - * examining the number of bytes read from the underlying InputStream from the beginning, and sleep()ing for a time interval if the byte-transfer is found - * exceed the specified tolerable maximum. (Thus, while the read-rate might exceed the maximum for a short interval, the average tends towards the - * specified maximum, overall.) + * Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream, + * and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a + * short interval, the average tends towards the specified maximum, overall. * <p> - * To build an instance, see {@link Builder} + * To build an instance, call {@link #builder()}. * </p> * <p> * Inspired by Apache HBase's class of the same name. @@ -49,7 +49,7 @@ public final class ThrottledInputStream extends CountingInputStream { * <pre>{@code * ThrottledInputStream in = ThrottledInputStream.builder() * .setPath(Paths.get("MyFile.xml")) - * .setMaxBytesPerSecond(100_000) + * .setMaxBytes(100_000, ChronoUnit.SECONDS) * .get(); * } * </pre> @@ -57,14 +57,14 @@ public final class ThrottledInputStream extends CountingInputStream { * <pre>{@code * ThrottledInputStream in = ThrottledInputStream.builder() * .setFile(new File("MyFile.xml")) - * .setMaxBytesPerSecond(100_000) + * .setMaxBytes(100_000, ChronoUnit.SECONDS) * .get(); * } * </pre> * <pre>{@code * ThrottledInputStream in = ThrottledInputStream.builder() * .setInputStream(inputStream) - * .setMaxBytesPerSecond(100_000) + * .setMaxBytes(100_000, ChronoUnit.SECONDS) * .get(); * } * </pre> @@ -77,7 +77,7 @@ public final class ThrottledInputStream extends CountingInputStream { /** * Effectively not throttled. */ - private long maxBytesPerSecond = Long.MAX_VALUE; + private double maxBytesPerSecond = Double.MAX_VALUE; /** * Builds a new {@link ThrottledInputStream}. @@ -98,19 +98,87 @@ public final class ThrottledInputStream extends CountingInputStream { * @throws IOException if an I/O error occurs. * @see #getInputStream() */ - @SuppressWarnings("resource") @Override public ThrottledInputStream get() throws IOException { return new ThrottledInputStream(this); } + // package private for testing. + double getMaxBytesPerSecond() { + return maxBytesPerSecond; + } + + /** + * Sets the maximum bytes per time period unit. + * <p> + * For example, to throttle reading to 100K per second, use: + * </p> + * <pre> + * builder.setMaxBytes(100_000, ChronoUnit.SECONDS) + * </pre> + * <p> + * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. + * </p> + * + * @param value the maximum bytes + * @param chronoUnit a duration scale goal. + * @return this instance. + * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. + * @since 2.19.0 + */ + public Builder setMaxBytes(final int value, final ChronoUnit chronoUnit) { + setMaxBytes(value, chronoUnit.getDuration()); + return asThis(); + } + + /** + * Sets the maximum bytes per duration. + * <p> + * For example, to throttle reading to 100K per second, use: + * </p> + * <pre> + * builder.setMaxBytes(100_000, Duration.ofSeconds(1)) + * </pre> + * <p> + * To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on. + * </p> + * + * @param value the maximum bytes + * @param duration a duration goal. + * @return this instance. + * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. + */ + // Consider making public in the future + Builder setMaxBytes(final int value, final Duration duration) { + setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value); + return asThis(); + } + /** * Sets the maximum bytes per second. * * @param maxBytesPerSecond the maximum bytes per second. + * @return this instance. + * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. */ - public void setMaxBytesPerSecond(final long maxBytesPerSecond) { + private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) { + if (maxBytesPerSecond <= 0) { + throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0."); + } this.maxBytesPerSecond = maxBytesPerSecond; + return asThis(); + } + + /** + * Sets the maximum bytes per second. + * + * @param maxBytesPerSecond the maximum bytes per second. + * @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0. + */ + public void setMaxBytesPerSecond(final long maxBytesPerSecond) { + setMaxBytesPerSecond((double) maxBytesPerSecond); + // TODO 3.0 + // return asThis(); } } @@ -124,24 +192,22 @@ public final class ThrottledInputStream extends CountingInputStream { return new Builder(); } - static long toSleepMillis(final long bytesRead, final long maxBytesPerSec, final long elapsedMillis) { - if (elapsedMillis < 0) { - throw new IllegalArgumentException("The elapsed time should be greater or equal to zero"); - } + // package private for testing + static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) { if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) { return 0; } // We use this class to load the single source file, so the bytesRead // and maxBytesPerSec aren't greater than Double.MAX_VALUE. // We can get the precise sleep time by using the double value. - final long millis = (long) ((double) bytesRead / (double) maxBytesPerSec * 1000 - elapsedMillis); + final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis); if (millis <= 0) { return 0; } return millis; } - private final long maxBytesPerSecond; + private final double maxBytesPerSecond; private final long startTime = System.currentTimeMillis(); private Duration totalSleepDuration = Duration.ZERO; @@ -171,8 +237,13 @@ public final class ThrottledInputStream extends CountingInputStream { return getByteCount() / elapsedSeconds; } + // package private for testing. + double getMaxBytesPerSecond() { + return maxBytesPerSecond; + } + private long getSleepMillis() { - return toSleepMillis(getByteCount(), maxBytesPerSecond, System.currentTimeMillis() - startTime); + return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond); } /** @@ -180,6 +251,7 @@ public final class ThrottledInputStream extends CountingInputStream { * * @return Duration spent in sleep. */ + // package private for testing Duration getTotalSleepDuration() { return totalSleepDuration; } diff --git a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java index a072329e3..f8aa00c8f 100644 --- a/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java +++ b/src/test/java/org/apache/commons/io/input/ThrottledInputStreamTest.java @@ -18,15 +18,18 @@ package org.apache.commons.io.input; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrowsExactly; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.io.InputStream; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.ThrottledInputStream.Builder; import org.apache.commons.io.test.CustomIOException; import org.junit.jupiter.api.Test; @@ -56,13 +59,90 @@ public class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInpu // Throwing final String message = "test exception message"; // @formatter:off - try (InputStream bounded = ThrottledInputStream.builder() + try (InputStream inputStream = ThrottledInputStream.builder() .setCharSequence("Hi") .setAfterRead(i -> { throw new CustomIOException(message); }) .get()) { - assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(bounded)).getMessage()); + assertEquals(message, assertThrowsExactly(CustomIOException.class, () -> IOUtils.consume(inputStream)).getMessage()); + } + // @formatter:on + } + + @Test + public void testBuilder() throws IOException { + final Builder builder = ThrottledInputStream.builder(); + assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytesPerSecond(-1)); + assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytesPerSecond(0)); + assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytes(1, Duration.ZERO.minusMillis(1))); + assertThrows(IllegalArgumentException.class, () -> builder.setMaxBytes(1, Duration.ZERO)); + assertThrows(NullPointerException.class, () -> builder.setMaxBytes(1, (Duration) null)); + assertThrows(NullPointerException.class, () -> builder.setMaxBytes(1, (ChronoUnit) null)); + // + // 2 bytes per second + builder.setMaxBytesPerSecond(2); + assertEquals(2.0, builder.getMaxBytesPerSecond()); + // @formatter:off + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .get()) { + assertEquals(2.0, builder.getMaxBytesPerSecond()); + assertEquals(2.0, inputStream.getMaxBytesPerSecond()); + } + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(2, ChronoUnit.SECONDS) + .get()) { + assertEquals(2.0, builder.getMaxBytesPerSecond()); + assertEquals(2.0, inputStream.getMaxBytesPerSecond()); + } + // @formatter:on + Duration maxBytesPer = Duration.ofSeconds(1); + // @formatter:off + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(2, maxBytesPer) + .get()) { + assertEquals(2.0, builder.getMaxBytesPerSecond()); + assertEquals(2.0, inputStream.getMaxBytesPerSecond()); + } + // + // 1 bytes per 1/2 second (30_000 millis) + // @formatter:on + maxBytesPer = maxBytesPer.dividedBy(2); + // @formatter:off + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(1, maxBytesPer) + .get()) { + assertEquals(0.5, inputStream.getMaxBytesPerSecond()); + } + // 1 byte/millis + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(1, ChronoUnit.MILLIS) + .get()) { + assertEquals(0.001, inputStream.getMaxBytesPerSecond()); + } + // @formatter:on + // 1 byte per 10_0011 millis. + maxBytesPer = Duration.ofSeconds(20).plusMillis(11); + // @formatter:off + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(1, maxBytesPer) + .get()) { + assertEquals(20.011, inputStream.getMaxBytesPerSecond()); + } + // @formatter:on + // Javadoc example + // @formatter:off + try (ThrottledInputStream inputStream = builder + .setInputStream(createOriginInputStream()) + .setMaxBytes(100_000, ChronoUnit.SECONDS) + .get()) { + assertEquals(100_000.0, inputStream.getMaxBytesPerSecond()); } // @formatter:on } @@ -70,21 +150,24 @@ public class ThrottledInputStreamTest extends ProxyInputStreamTest<ThrottledInpu @Test public void testCalSleepTimeMs() { // case 0: initial - no read, no sleep - assertEquals(0, ThrottledInputStream.toSleepMillis(0, 10_000, 1_000)); - + assertEquals(0, ThrottledInputStream.toSleepMillis(0, 1_000, 10_000)); // case 1: no threshold - assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 0, 1_000)); - assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, -1, 1_000)); - + assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 1_000, 0)); + assertEquals(0, ThrottledInputStream.toSleepMillis(Long.MAX_VALUE, 1_000, -1)); // case 2: too fast - assertEquals(1500, ThrottledInputStream.toSleepMillis(5, 2, 1_000)); - assertEquals(500, ThrottledInputStream.toSleepMillis(5, 2, 2_000)); - assertEquals(6500, ThrottledInputStream.toSleepMillis(15, 2, 1_000)); - - // case 3: too slow - assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000)); - assertEquals(0, ThrottledInputStream.toSleepMillis(2, 2, 2_000)); - assertEquals(0, ThrottledInputStream.toSleepMillis(1, 2, 1_000)); + assertEquals(1500, ThrottledInputStream.toSleepMillis(5, 1_000, 2)); + assertEquals(500, ThrottledInputStream.toSleepMillis(5, 2_000, 2)); + assertEquals(6500, ThrottledInputStream.toSleepMillis(15, 1_000, 2)); + assertEquals(4000, ThrottledInputStream.toSleepMillis(5, 1_000, 1)); + assertEquals(9000, ThrottledInputStream.toSleepMillis(5, 1_000, 0.5)); + assertEquals(99000, ThrottledInputStream.toSleepMillis(5, 1_000, 0.05)); + // case 3: too slow, no sleep needed + assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2)); + assertEquals(0, ThrottledInputStream.toSleepMillis(2, 2_000, 2)); + assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2)); + assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 2.0)); + assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 1)); + assertEquals(0, ThrottledInputStream.toSleepMillis(1, 1_000, 1.0)); } @Test