This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
commit 4ea91d20dfa730bf2ce2120f7bdfbe0384d0c29a Author: Gary Gregory <gardgreg...@gmail.com> AuthorDate: Mon Sep 19 17:28:40 2022 -0400 Add IOBaseStream & IOStream --- src/changes/changes.xml | 2 +- .../apache/commons/io/function/IOBaseStream.java | 154 ++++++ .../commons/io/function/IOBaseStreamAdapter.java | 52 ++ .../org/apache/commons/io/function/IOStream.java | 597 +++++++++++++++++++++ .../commons/io/function/IOStreamAdapter.java | 45 ++ .../org/apache/commons/io/function/IOStreams.java | 62 +-- .../commons/io/function/UncheckedIOBaseStream.java | 88 +++ .../commons/io/input/ObservableInputStream.java | 3 +- .../org/apache/commons/io/function/EraseTest.java | 97 ++++ .../commons/io/function/IOBaseStreamTest.java | 349 ++++++++++++ .../apache/commons/io/function/IOIntStream.java | 27 + .../commons/io/function/IOIntStreamAdapter.java | 41 ++ .../apache/commons/io/function/IOStreamTest.java | 540 +++++++++++++++++++ .../apache/commons/io/function/IOSupplierTest.java | 4 +- .../apache/commons/io/function/PathBaseStream.java | 28 + .../org/apache/commons/io/function/PathStream.java | 28 + .../apache/commons/io/function/TestConstants.java | 12 +- .../org/apache/commons/io/function/TestUtils.java | 47 +- .../apache/commons/io/function/UncheckTest.java | 38 +- 19 files changed, 2133 insertions(+), 81 deletions(-) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 7e2488b3..f10ca8b9 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -428,7 +428,7 @@ The <action> type attribute can be add,update,fix,remove. Add PathUtils.getLastModifiedFileTime(*). </action> <action dev="ggregory" type="add" due-to="Gary Gregory"> - Add IOBiFunction, IOTriFunction, IOQuadFunction, IOPredicate, IOIterator, IOSpliterator, FilesUncheck. + Add IOBiFunction, IOTriFunction, IOQuadFunction, IOPredicate, IOIterator, IOSpliterator, IOBaseStream, IOStream, FilesUncheck. 
</action> <action dev="ggregory" type="add" due-to="Gary Gregory"> Add IOUtils.consume(Reader). diff --git a/src/main/java/org/apache/commons/io/function/IOBaseStream.java b/src/main/java/org/apache/commons/io/function/IOBaseStream.java new file mode 100644 index 00000000..6ac9bee2 --- /dev/null +++ b/src/main/java/org/apache/commons/io/function/IOBaseStream.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.stream.BaseStream; +import java.util.stream.Stream; + +/** + * Like {@link BaseStream} but throws {@link IOException}. + * + * @param <T> the type of the stream elements. + * @param <S> the type of the IO stream extending {@code IOBaseStream}. + * @param <B> the type of the stream extending {@code BaseStream}. + * @since 2.12.0 + */ +public interface IOBaseStream<T, S extends IOBaseStream<T, S, B>, B extends BaseStream<T, B>> extends Closeable { + + /** + * Creates a {@link BaseStream} for this instance that throws {@link UncheckedIOException} instead of + * {@link IOException}. 
+ * + * @return an {@link UncheckedIOException} {@link BaseStream}. + */ + @SuppressWarnings("unchecked") + default BaseStream<T, B> asBaseStream() { + return new UncheckedIOBaseStream<>((S) this); + } + + /** + * Like {@link BaseStream#close()}. + * + * @see BaseStream#close() + */ + @Override + default void close() { + unwrap().close(); + } + + /** + * Like {@link BaseStream#isParallel()}. + * + * @return See {@link BaseStream#isParallel() delegate}. + * @see BaseStream#isParallel() + */ + @SuppressWarnings("resource") // for unwrap() + default boolean isParallel() { + return unwrap().isParallel(); + } + + /** + * Like {@link BaseStream#iterator()}. + * + * @return See {@link BaseStream#iterator() delegate}. + * @see BaseStream#iterator() + */ + @SuppressWarnings("resource") // for unwrap() + default IOIterator<T> iterator() { + return IOIteratorAdapter.adapt(unwrap().iterator()); + } + + /** + * Like {@link BaseStream#onClose(Runnable)}. + * + * @param closeHandler See {@link BaseStream#onClose(Runnable) delegate}. + * @return See {@link BaseStream#onClose(Runnable) delegate}. + * @throws IOException if an I/O error occurs. + * @see BaseStream#onClose(Runnable) + */ + @SuppressWarnings({"unused", "resource"}) // throws IOException, unwrap() + default S onClose(final IORunnable closeHandler) throws IOException { + return wrap(unwrap().onClose(() -> Erase.run(closeHandler))); + } + + /** + * Like {@link BaseStream#parallel()}. + * + * @return See {@link BaseStream#parallel() delegate}. + * @see BaseStream#parallel() + */ + @SuppressWarnings({"resource", "unchecked"}) // for unwrap(), this + default S parallel() { + return isParallel() ? (S) this : wrap(unwrap().parallel()); + } + + /** + * Like {@link BaseStream#sequential()}. + * + * @return See {@link BaseStream#sequential() delegate}. + * @see BaseStream#sequential() + */ + @SuppressWarnings({"resource", "unchecked"}) // for unwrap(), this + default S sequential() { + return isParallel() ? 
wrap(unwrap().sequential()) : (S) this; + } + + /** + * Like {@link BaseStream#spliterator()}. + * + * @return See {@link BaseStream#spliterator() delegate}. + * @see BaseStream#spliterator() + */ + @SuppressWarnings("resource") // for unwrap() + default IOSpliterator<T> spliterator() { + return IOSpliteratorAdapter.adapt(unwrap().spliterator()); + } + + /** + * Like {@link BaseStream#unordered()}. + * + * @return See {@link BaseStream#unordered() delegate}. + * @see java.util.stream.BaseStream#unordered() + */ + @SuppressWarnings("resource") // for unwrap() + default S unordered() { + return wrap(unwrap().unordered()); + } + + /** + * Unwraps this instance and returns the underlying {@link Stream}. + * <p> + * Implementations may not have anything to unwrap and that behavior is undefined for now. + * </p> + * + * @return the underlying stream. + */ + B unwrap(); + + /** + * Wraps a {@link Stream}. + * + * @param delegate The delegate. + * @return An IO stream. + */ + S wrap(B delegate); + +} diff --git a/src/main/java/org/apache/commons/io/function/IOBaseStreamAdapter.java b/src/main/java/org/apache/commons/io/function/IOBaseStreamAdapter.java new file mode 100644 index 00000000..c1413dad --- /dev/null +++ b/src/main/java/org/apache/commons/io/function/IOBaseStreamAdapter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.util.Objects; +import java.util.stream.BaseStream; + +/** + * Abstracts an {@link IOBaseStream} implementation. + * + * Keep package-private for now. + * + * @param <T> the type of the stream elements. + * @param <S> the type of the stream extending {@code IOBaseStream}. + */ +abstract class IOBaseStreamAdapter<T, S extends IOBaseStream<T, S, B>, B extends BaseStream<T, B>> implements IOBaseStream<T, S, B> { + + /** + * The underlying base stream. + */ + private final B delegate; + + /** + * Constructs an instance. + * + * @param delegate the delegate. + */ + IOBaseStreamAdapter(final B delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + @Override + public B unwrap() { + return delegate; + } + +} diff --git a/src/main/java/org/apache/commons/io/function/IOStream.java b/src/main/java/org/apache/commons/io/function/IOStream.java new file mode 100644 index 00000000..116a7a42 --- /dev/null +++ b/src/main/java/org/apache/commons/io/function/IOStream.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.IntFunction; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.function.UnaryOperator; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.commons.io.IOExceptionList; + +/** + * Like {@link Stream} but throws {@link IOException}. + * + * @param <T> the type of the stream elements. + * @since 2.12.0 + */ +public interface IOStream<T> extends IOBaseStream<T, IOStream<T>, Stream<T>> { + + /** + * Constructs a new IOStream for the given Stream. + * + * @param <T> the type of the stream elements. + * @param stream The stream to delegate. + * @return a new IOStream. + */ + static <T> IOStream<T> adapt(final Stream<T> stream) { + return IOStreamAdapter.adapt(stream); + } + + /** + * This class' version of {@link Stream#empty()}. 
+ * + * @param <T> the type of the stream elements + * @return an empty sequential {@code IOStream}. + * @see Stream#empty() + */ + static <T> IOStream<T> empty() { + return IOStreamAdapter.adapt(Stream.empty()); + } + + /** + * Performs an action for each element gathering any exceptions. + * + * @param action The action to apply to each element. + * @throws IOExceptionList if any I/O errors occur. + */ + default void forAll(final IOConsumer<T> action) throws IOExceptionList { + forAll(action, (i, e) -> e); + } + + /** + * Performs an action for each element gathering any exceptions. + * + * @param action The action to apply to each element. + * @param exSupplier The exception supplier. + * @throws IOExceptionList if any I/O errors occur. + */ + default void forAll(final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList { + final AtomicReference<List<IOException>> causeList = new AtomicReference<>(); + final AtomicInteger index = new AtomicInteger(); + final IOConsumer<T> safeAction = IOStreams.toIOConsumer(action); + unwrap().forEach(e -> { + try { + safeAction.accept(e); + } catch (final IOException innerEx) { + if (causeList.get() == null) { + // Only allocate if required + causeList.set(new ArrayList<>()); + } + if (exSupplier != null) { + causeList.get().add(exSupplier.apply(index.get(), innerEx)); + } + } + index.incrementAndGet(); + }); + IOExceptionList.checkEmpty(causeList.get(), null); + } + + /** + * Like {@link Stream#iterate(Object, UnaryOperator)} but for IO. + * + * @param <T> the type of stream elements. + * @param seed the initial element. + * @param f a function to be applied to the previous element to produce a new element. + * @return a new sequential {@code IOStream}.
+ */ + static <T> IOStream<T> iterate(final T seed, final IOUnaryOperator<T> f) { + Objects.requireNonNull(f); + final Iterator<T> iterator = new Iterator<T>() { + @SuppressWarnings("unchecked") + T t = (T) IOStreams.NONE; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public T next() { + return t = t == IOStreams.NONE ? seed : Erase.apply(f, t); + } + }; + return adapt(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false)); + } + + /** + * Null-safe version of {@link StreamSupport#stream(java.util.Spliterator, boolean)}. + * + * Copied from Apache Commons Lang. + * + * @param <T> the type of stream elements. + * @param values the elements of the new stream, may be {@code null}. + * @return the new stream on {@code values} or {@link IOStream#empty()}. + */ + @SuppressWarnings("resource") // call to #empty() + static <T> IOStream<T> of(final Iterable<T> values) { + return values == null ? empty() : adapt(StreamSupport.stream(values.spliterator(), false)); + } + + /** + * Null-safe version of {@link Stream#of(Object[])} for an IO stream. + * + * @param <T> the type of stream elements. + * @param values the elements of the new stream, may be {@code null}. + * @return the new stream on {@code values} or {@link IOStream#empty()}. + */ + @SuppressWarnings("resource") + @SafeVarargs // Creating a stream from an array is safe + static <T> IOStream<T> of(final T... values) { + return values == null || values.length == 0 ? empty() : adapt(Arrays.stream(values)); + } + + /** + * Returns a sequential {@code IOStream} containing a single element. + * + * @param t the single element + * @param <T> the type of stream elements + * @return a singleton sequential stream + */ + static <T> IOStream<T> of(final T t) { + return adapt(Stream.of(t)); + } + + /** + * Like {@link Stream#allMatch(java.util.function.Predicate)} but throws {@link IOException}.
+ * + * @param predicate {@link Stream#allMatch(java.util.function.Predicate)}. + * @return Like {@link Stream#allMatch(java.util.function.Predicate)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default boolean allMatch(final IOPredicate<? super T> predicate) throws IOException { + return unwrap().allMatch(t -> Erase.test(predicate, t)); + } + + /** + * Like {@link Stream#anyMatch(java.util.function.Predicate)} but throws {@link IOException}. + * + * @param predicate {@link Stream#anyMatch(java.util.function.Predicate)}. + * @return Like {@link Stream#anyMatch(java.util.function.Predicate)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default boolean anyMatch(final IOPredicate<? super T> predicate) throws IOException { + return unwrap().anyMatch(t -> Erase.test(predicate, t)); + } + + /** + * TODO Package-private for now, needs IOCollector? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#collect(Collector)}. + * + * Package private for now. + * + * @param <R> Like {@link Stream#collect(Collector)}. + * @param <A> Like {@link Stream#collect(Collector)}. + * @param collector Like {@link Stream#collect(Collector)}. + * @return Like {@link Stream#collect(Collector)}. + */ + default <R, A> R collect(final Collector<? super T, A, R> collector) { + return unwrap().collect(collector); + } + + /** + * Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. + * + * @param <R> Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. + * @param supplier Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. 
+ * @param accumulator Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. + * @param combiner Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. + * @return Like + * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default <R> R collect(final IOSupplier<R> supplier, final IOBiConsumer<R, ? super T> accumulator, final IOBiConsumer<R, R> combiner) throws IOException { + return unwrap().collect(() -> Erase.get(supplier), (t, u) -> Erase.accept(accumulator, t, u), (t, u) -> Erase.accept(combiner, t, u)); + } + + /** + * Like {@link Stream#count()}. + * + * @return Like {@link Stream#count()}. + */ + default long count() { + return unwrap().count(); + } + + /** + * Like {@link Stream#distinct()}. + * + * @return Like {@link Stream#distinct()}. + */ + default IOStream<T> distinct() { + return adapt(unwrap().distinct()); + } + + /** + * Like {@link Stream#filter(java.util.function.Predicate)}. + * + * @param predicate Like {@link Stream#filter(java.util.function.Predicate)}. + * @return Like {@link Stream#filter(java.util.function.Predicate)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default IOStream<T> filter(final IOPredicate<? super T> predicate) throws IOException { + return adapt(unwrap().filter(t -> Erase.test(predicate, t))); + } + + /** + * Like {@link Stream#findAny()}. + * + * @return Like {@link Stream#findAny()}. + */ + default Optional<T> findAny() { + return unwrap().findAny(); + } + + /** + * Like {@link Stream#findFirst()}. + * + * @return Like {@link Stream#findFirst()}. 
+ */ + default Optional<T> findFirst() { + return unwrap().findFirst(); + } + + /** + * Like {@link Stream#flatMap(java.util.function.Function)}. + * + * @param <R> Like {@link Stream#flatMap(java.util.function.Function)}. + * @param mapper Like {@link Stream#flatMap(java.util.function.Function)}. + * @return Like {@link Stream#flatMap(java.util.function.Function)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default <R> IOStream<R> flatMap(final IOFunction<? super T, ? extends IOStream<? extends R>> mapper) throws IOException { + return adapt(unwrap().flatMap(t -> Erase.apply(mapper, t).unwrap())); + } + + /** + * TODO Package-private for now, needs IODoubleStream? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#flatMapToDouble(java.util.function.Function)}. + * + * @param mapper Like {@link Stream#flatMapToDouble(java.util.function.Function)}. + * @return Like {@link Stream#flatMapToDouble(java.util.function.Function)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default DoubleStream flatMapToDouble(final IOFunction<? super T, ? extends DoubleStream> mapper) throws IOException { + return unwrap().flatMapToDouble(t -> Erase.apply(mapper, t)); + } + + /** + * TODO Package-private for now, needs IOIntStream? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#flatMapToInt(java.util.function.Function)}. + * + * @param mapper Like {@link Stream#flatMapToInt(java.util.function.Function)}. + * @return Like {@link Stream#flatMapToInt(java.util.function.Function)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. 
+ default IntStream flatMapToInt(final IOFunction<? super T, ? extends IntStream> mapper) throws IOException { + return unwrap().flatMapToInt(t -> Erase.apply(mapper, t)); + } + + /** + * TODO Package-private for now, needs IOLongStream? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#flatMapToLong(java.util.function.Function)}. + * + * @param mapper Like {@link Stream#flatMapToLong(java.util.function.Function)}. + * @return Like {@link Stream#flatMapToLong(java.util.function.Function)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default LongStream flatMapToLong(final IOFunction<? super T, ? extends LongStream> mapper) throws IOException { + return unwrap().flatMapToLong(t -> Erase.apply(mapper, t)); + } + + /** + * Like {@link Stream#forEach(java.util.function.Consumer)} but throws {@link IOException}. + * + * @param action Like {@link Stream#forEach(java.util.function.Consumer)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default void forEach(final IOConsumer<? super T> action) throws IOException { + unwrap().forEach(e -> Erase.accept(action, e)); + } + + /** + * Like {@link Stream#forEachOrdered(java.util.function.Consumer)}. + * + * @param action Like {@link Stream#forEachOrdered(java.util.function.Consumer)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default void forEachOrdered(final IOConsumer<? super T> action) throws IOException { + unwrap().forEachOrdered(e -> Erase.accept(action, e)); + } + + /** + * Like {@link Stream#limit(long)}. + * + * @param maxSize Like {@link Stream#limit(long)}. + * @return Like {@link Stream#limit(long)}. 
+ */ + default IOStream<T> limit(final long maxSize) { + return adapt(unwrap().limit(maxSize)); + } + + /** + * Like {@link Stream#map(java.util.function.Function)}. + * + * @param <R> Like {@link Stream#map(java.util.function.Function)}. + * @param mapper Like {@link Stream#map(java.util.function.Function)}. + * @return Like {@link Stream#map(java.util.function.Function)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default <R> IOStream<R> map(final IOFunction<? super T, ? extends R> mapper) throws IOException { + return adapt(unwrap().map(t -> Erase.apply(mapper, t))); + } + + /** + * TODO Package-private for now, needs IOToDoubleFunction? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#mapToDouble(ToDoubleFunction)}. + * + * Package private for now. + * + * @param mapper Like {@link Stream#mapToDouble(ToDoubleFunction)}. + * @return Like {@link Stream#mapToDouble(ToDoubleFunction)}. + */ + default DoubleStream mapToDouble(final ToDoubleFunction<? super T> mapper) { + return unwrap().mapToDouble(mapper); + } + + /** + * TODO Package-private for now, needs IOToIntFunction? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#mapToInt(ToIntFunction)}. + * + * Package private for now. + * + * @param mapper Like {@link Stream#mapToInt(ToIntFunction)}. + * @return Like {@link Stream#mapToInt(ToIntFunction)}. + */ + default IntStream mapToInt(final ToIntFunction<? super T> mapper) { + return unwrap().mapToInt(mapper); + } + + /** + * TODO Package-private for now, needs IOToLongFunction? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. 
It + * would be ideal to have only one. + * + * Like {@link Stream#mapToLong(ToLongFunction)}. + * + * Package private for now. + * + * @param mapper Like {@link Stream#mapToLong(ToLongFunction)}. + * @return Like {@link Stream#mapToLong(ToLongFunction)}. + */ + default LongStream mapToLong(final ToLongFunction<? super T> mapper) { + return unwrap().mapToLong(mapper); + } + + /** + * Like {@link Stream#max(java.util.Comparator)}. + * + * @param comparator Like {@link Stream#max(java.util.Comparator)}. + * @return Like {@link Stream#max(java.util.Comparator)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default Optional<T> max(final IOComparator<? super T> comparator) throws IOException { + return unwrap().max((t, u) -> Erase.compare(comparator, t, u)); + } + + /** + * Like {@link Stream#min(java.util.Comparator)}. + * + * @param comparator Like {@link Stream#min(java.util.Comparator)}. + * @return Like {@link Stream#min(java.util.Comparator)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default Optional<T> min(final IOComparator<? super T> comparator) throws IOException { + return unwrap().min((t, u) -> Erase.compare(comparator, t, u)); + } + + /** + * Like {@link Stream#noneMatch(java.util.function.Predicate)}. + * + * @param predicate Like {@link Stream#noneMatch(java.util.function.Predicate)}. + * @return Like {@link Stream#noneMatch(java.util.function.Predicate)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default boolean noneMatch(final IOPredicate<? super T> predicate) throws IOException { + return unwrap().noneMatch(t -> Erase.test(predicate, t)); + } + + /** + * Like {@link Stream#peek(java.util.function.Consumer)}. + * + * @param action Like {@link Stream#peek(java.util.function.Consumer)}. + * @return Like {@link Stream#peek(java.util.function.Consumer)}. 
+ * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default IOStream<T> peek(final IOConsumer<? super T> action) throws IOException { + return adapt(unwrap().peek(t -> Erase.accept(action, t))); + } + + /** + * Like {@link Stream#reduce(java.util.function.BinaryOperator)}. + * + * @param accumulator Like {@link Stream#reduce(java.util.function.BinaryOperator)}. + * @return Like {@link Stream#reduce(java.util.function.BinaryOperator)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default Optional<T> reduce(final IOBinaryOperator<T> accumulator) throws IOException { + return unwrap().reduce((t, u) -> Erase.apply(accumulator, t, u)); + } + + /** + * Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}. + * + * @param identity Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}. + * @param accumulator Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}. + * @return Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default T reduce(final T identity, final IOBinaryOperator<T> accumulator) throws IOException { + return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u)); + } + + /** + * Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. + * + * @param <U> Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. + * @param identity Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. + * @param accumulator Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. + * @param combiner Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. + * @return Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}. 
+ * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default <U> U reduce(final U identity, final IOBiFunction<U, ? super T, U> accumulator, final IOBinaryOperator<U> combiner) throws IOException { + return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u), (t, u) -> Erase.apply(combiner, t, u)); + } + + /** + * Like {@link Stream#skip(long)}. + * + * @param n Like {@link Stream#skip(long)}. + * @return Like {@link Stream#skip(long)}. + */ + default IOStream<T> skip(final long n) { + return adapt(unwrap().skip(n)); + } + + /** + * Like {@link Stream#sorted()}. + * + * @return Like {@link Stream#sorted()}. + */ + default IOStream<T> sorted() { + return adapt(unwrap().sorted()); + } + + /** + * Like {@link Stream#sorted(java.util.Comparator)}. + * + * @param comparator Like {@link Stream#sorted(java.util.Comparator)}. + * @return Like {@link Stream#sorted(java.util.Comparator)}. + * @throws IOException if an I/O error occurs. + */ + @SuppressWarnings("unused") // thrown by Erase. + default IOStream<T> sorted(final IOComparator<? super T> comparator) throws IOException { + return adapt(unwrap().sorted((t, u) -> Erase.compare(comparator, t, u))); + } + + /** + * Like {@link Stream#toArray()}. + * + * @return {@link Stream#toArray()}. + */ + default Object[] toArray() { + return unwrap().toArray(); + } + + /** + * TODO Package-private for now, needs IOIntFunction? + * + * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It + * would be ideal to have only one. + * + * Like {@link Stream#toArray(IntFunction)}. + * + * Package private for now. + * + * @param <A> Like {@link Stream#toArray(IntFunction)}. + * @param generator Like {@link Stream#toArray(IntFunction)}. + * @return Like {@link Stream#toArray(IntFunction)}. 
+ */ + default <A> A[] toArray(final IntFunction<A[]> generator) { + return unwrap().toArray(generator); + } + +} diff --git a/src/main/java/org/apache/commons/io/function/IOStreamAdapter.java b/src/main/java/org/apache/commons/io/function/IOStreamAdapter.java new file mode 100644 index 00000000..aed21a7f --- /dev/null +++ b/src/main/java/org/apache/commons/io/function/IOStreamAdapter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.util.stream.Stream; + +/** + * Adapts a {@link Stream} as an {@link IOStream}. + * + * Keep package-private for now. + * + * @param <T> the type of the stream elements. + */ +final class IOStreamAdapter<T> extends IOBaseStreamAdapter<T, IOStream<T>, Stream<T>> implements IOStream<T> { + + @SuppressWarnings("resource") + static <T> IOStream<T> adapt(final Stream<T> delegate) { + return delegate != null ? new IOStreamAdapter<>(delegate) : IOStream.empty(); + } + + private IOStreamAdapter(final Stream<T> delegate) { + super(delegate); + } + + @Override + public IOStream<T> wrap(final Stream<T> delegate) { + return unwrap() == delegate ?
this : adapt(delegate); + } + +} diff --git a/src/main/java/org/apache/commons/io/function/IOStreams.java b/src/main/java/org/apache/commons/io/function/IOStreams.java index 5dfd107f..70cac787 100644 --- a/src/main/java/org/apache/commons/io/function/IOStreams.java +++ b/src/main/java/org/apache/commons/io/function/IOStreams.java @@ -18,67 +18,34 @@ package org.apache.commons.io.function; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.io.IOExceptionList; +import org.apache.commons.io.IOIndexedException; /** * Keep this code package-private for now. */ -class IOStreams { +final class IOStreams { - /** - * Accepts and throws an IOException. - * - * @param <T> The consumer type. - * @param consumer The consumer to accept. - * @param t the input argument. - * @throws IOException if an I/O error occurs; erased for the compiler. 
- */ - static <T> void accept(final IOConsumer<T> consumer, T t) { - try { - consumer.accept(t); - } catch (IOException ex) { - rethrow(ex); - } - } + static final Object NONE = new Object(); static <T> void forAll(final Stream<T> stream, final IOConsumer<T> action) throws IOExceptionList { forAll(stream, action, (i, e) -> e); } + @SuppressWarnings("resource") // adapt() static <T> void forAll(final Stream<T> stream, final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList { - final AtomicReference<List<IOException>> causeList = new AtomicReference<>(); - final AtomicInteger index = new AtomicInteger(); - final IOConsumer<T> actualAction = toIOConsumer(action); - of(stream).forEach(e -> { - try { - actualAction.accept(e); - } catch (IOException ex) { - if (causeList.get() == null) { - // Only allocate if required - causeList.set(new ArrayList<>()); - } - if (exSupplier != null) { - causeList.get().add(exSupplier.apply(index.get(), ex)); - } - } - index.incrementAndGet(); - }); - IOExceptionList.checkEmpty(causeList.get(), null); + IOStream.adapt(stream).forAll(action, IOIndexedException::new); } @SuppressWarnings("unused") // IOStreams.rethrow() throws static <T> void forEach(final Stream<T> stream, final IOConsumer<T> action) throws IOException { final IOConsumer<T> actualAction = toIOConsumer(action); - of(stream).forEach(e -> accept(actualAction, e)); + of(stream).forEach(e -> Erase.accept(actualAction, e)); } /** @@ -112,20 +79,11 @@ class IOStreams { return values == null ? Stream.empty() : Stream.of(values); } - /** - * Throws the given throwable. - * - * @param <T> The throwable cast type. - * @param throwable The throwable to rethrow. - * @return nothing because we throw. - * @throws T Always thrown. 
- */ - @SuppressWarnings("unchecked") - static <T extends Throwable> RuntimeException rethrow(final Throwable throwable) throws T { - throw (T) throwable; // hack - } - static <T> IOConsumer<T> toIOConsumer(final IOConsumer<T> action) { return action != null ? action : IOConsumer.noop(); } + + private IOStreams() { + // no instances + } } diff --git a/src/main/java/org/apache/commons/io/function/UncheckedIOBaseStream.java b/src/main/java/org/apache/commons/io/function/UncheckedIOBaseStream.java new file mode 100644 index 00000000..3e033ee0 --- /dev/null +++ b/src/main/java/org/apache/commons/io/function/UncheckedIOBaseStream.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.Spliterator; +import java.util.stream.BaseStream; + +/** + * An {@link BaseStream} for a {@link IOBaseStream} that throws {@link UncheckedIOException} instead of + * {@link IOException}. + * + * Keep package-private for now. + * + * @param <T> the type of the stream elements. + * @param <S> the type of the IO stream extending {@code IOBaseStream}. 
+ * @param <B> the type of the stream extending {@code BaseStream}. + */ +class UncheckedIOBaseStream<T, S extends IOBaseStream<T, S, B>, B extends BaseStream<T, B>> implements BaseStream<T, B> { + + private final S delegate; + + UncheckedIOBaseStream(final S delegate) { + this.delegate = delegate; + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public boolean isParallel() { + return delegate.isParallel(); + } + + @Override + public Iterator<T> iterator() { + return delegate.iterator().asIterator(); + } + + @SuppressWarnings("resource") + @Override + public B onClose(final Runnable closeHandler) { + return Uncheck.apply(delegate::onClose, () -> closeHandler.run()).unwrap(); + } + + @SuppressWarnings("resource") + @Override + public B parallel() { + return delegate.parallel().unwrap(); + } + + @SuppressWarnings("resource") + @Override + public B sequential() { + return delegate.sequential().unwrap(); + } + + @Override + public Spliterator<T> spliterator() { + return delegate.spliterator().unwrap(); + } + + @SuppressWarnings("resource") + @Override + public B unordered() { + return delegate.unordered().unwrap(); + } + +} diff --git a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java index ad4653ba..259f64c9 100644 --- a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java +++ b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Objects; import org.apache.commons.io.IOUtils; import org.apache.commons.io.function.IOConsumer; @@ -175,7 +174,7 @@ public class ObservableInputStream extends ProxyInputStream { } private void forEachObserver(final IOConsumer<Observer> action) throws IOException { - IOConsumer.forAll(Objects.requireNonNull(action), observers); + 
IOConsumer.forAll(action, observers); } /** diff --git a/src/test/java/org/apache/commons/io/function/EraseTest.java b/src/test/java/org/apache/commons/io/function/EraseTest.java new file mode 100644 index 00000000..cabaebb9 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/EraseTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +/** + * Tests {@code Erase}. 
+ */ +class EraseTest { + + private final AtomicInteger intRef = new AtomicInteger(); + private final AtomicBoolean boolRef = new AtomicBoolean(); + + @Test + void testAcceptIOBiConsumerOfTUTU() { + Erase.accept((e, f) -> boolRef.set(intRef.compareAndSet(0, e)), 1, true); + assertEquals(1, intRef.get()); + assertTrue(boolRef.get()); + assertThrows(IOException.class, () -> Erase.accept(TestUtils.throwingIOBiConsumer(), null, 1)); + } + + @Test + void testAcceptIOConsumerOfTT() { + Erase.accept(e -> intRef.compareAndSet(0, e), 1); + assertEquals(1, intRef.get()); + assertThrows(IOException.class, () -> Erase.accept(TestUtils.throwingIOConsumer(), 1)); + } + + @Test + void testApplyIOBiFunctionOfQsuperTQsuperUQextendsRTU() { + assertTrue(Erase.<Integer, Boolean, Boolean>apply((i, b) -> boolRef.compareAndSet(false, intRef.compareAndSet(0, i.intValue())), 1, Boolean.TRUE)); + assertThrows(IOException.class, () -> Erase.apply(TestUtils.throwingIOBiFunction(), 1, Boolean.TRUE)); + } + + @Test + void testApplyIOFunctionOfQsuperTQextendsRT() { + assertTrue(Erase.<Integer, Boolean>apply(e -> intRef.compareAndSet(0, e), 1)); + assertThrows(IOException.class, () -> Erase.apply(TestUtils.throwingIOFunction(), 1)); + } + + @Test + void testCompare() { + assertEquals(0, Erase.compare(String::compareTo, "A", "A")); + assertEquals(-1, Erase.compare(String::compareTo, "A", "B")); + assertEquals(1, Erase.compare(String::compareTo, "B", "A")); + assertThrows(IOException.class, () -> Erase.compare(TestUtils.throwingIOComparator(), null, null)); + } + + @Test + void testGet() { + assertEquals(0, Erase.get(() -> intRef.get())); + assertThrows(IOException.class, () -> Erase.get(TestUtils.throwingIOSupplier())); + } + + @Test + void testRethrow() { + assertThrows(IOException.class, () -> Erase.rethrow(new IOException())); + } + + @Test + void testRun() { + Erase.run(() -> intRef.set(1)); + assertEquals(1, intRef.get()); + assertThrows(IOException.class, () -> 
Erase.run(TestUtils.throwingIORunnable())); + } + + @Test + void testTest() { + assertTrue(Erase.test(e -> intRef.compareAndSet(0, e), 1)); + assertThrows(IOException.class, () -> Erase.test(TestUtils.throwingIOPredicate(), 1)); + } + +} diff --git a/src/test/java/org/apache/commons/io/function/IOBaseStreamTest.java b/src/test/java/org/apache/commons/io/function/IOBaseStreamTest.java new file mode 100644 index 00000000..a77e55f2 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/IOBaseStreamTest.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.commons.io.function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertSame; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.BaseStream; +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests {@link IOBaseStream}. + */ +public class IOBaseStreamTest { + + /** + * Implements IOBaseStream with generics. + */ + private static class IOBaseStreamFixture<T, S extends IOBaseStreamFixture<T, S, B>, B extends BaseStream<T, B>> implements IOBaseStream<T, S, B> { + + private final B baseStream; + + private IOBaseStreamFixture(final B baseStream) { + this.baseStream = baseStream; + } + + @Override + public B unwrap() { + return baseStream; + } + + @SuppressWarnings("unchecked") // We are this here + @Override + public S wrap(final B delegate) { + return delegate == baseStream ? (S) this : (S) new IOBaseStreamFixture<T, S, B>(delegate); + } + + } + + /** + * Implements IOBaseStream with a concrete type. + */ + private static class IOBaseStreamPathFixture<B extends BaseStream<Path, B>> extends IOBaseStreamFixture<Path, IOBaseStreamPathFixture<B>, B> { + + private IOBaseStreamPathFixture(final B baseStream) { + super(baseStream); + } + + @Override + public IOBaseStreamPathFixture<B> wrap(final B delegate) { + return delegate == unwrap() ? 
this : new IOBaseStreamPathFixture<>(delegate); + } + + } + + private static class MyRuntimeException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public MyRuntimeException(final String message) { + super(message); + } + + } + + /** Sanity check */ + private BaseStream<Path, ? extends BaseStream<Path, ?>> baseStream; + + /** Generic version */ + private IOBaseStreamFixture<Path, ? extends IOBaseStreamFixture<Path, ?, ?>, ?> ioBaseStream; + + /** Concrete version */ + private IOBaseStreamPathFixture<? extends BaseStream<Path, ?>> ioBaseStreamPath; + + /** Adapter version */ + private IOStream<Path> ioBaseStreamAdapter; + + @BeforeEach + public void beforeEach() { + baseStream = createStreamOfPaths(); + ioBaseStream = createIOBaseStream(); + ioBaseStreamPath = createIOBaseStreamPath(); + ioBaseStreamAdapter = createIOBaseStreamApapter(); + } + + private IOBaseStreamFixture<Path, ?, Stream<Path>> createIOBaseStream() { + return new IOBaseStreamFixture<>(createStreamOfPaths()); + } + + private IOStream<Path> createIOBaseStreamApapter() { + return IOStreamAdapter.adapt(createStreamOfPaths()); + } + + private IOBaseStreamPathFixture<Stream<Path>> createIOBaseStreamPath() { + return new IOBaseStreamPathFixture<>(createStreamOfPaths()); + } + + private Stream<Path> createStreamOfPaths() { + return Stream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B); + } + + @Test + @AfterEach + public void testClose() { + baseStream.close(); + ioBaseStream.close(); + ioBaseStreamPath.close(); + ioBaseStream.asBaseStream().close(); + ioBaseStreamPath.asBaseStream().close(); + } + + @SuppressWarnings("resource") // @AfterEach + @Test + public void testIsParallel() { + assertFalse(baseStream.isParallel()); + assertFalse(ioBaseStream.isParallel()); + assertFalse(ioBaseStream.asBaseStream().isParallel()); + assertFalse(ioBaseStreamPath.asBaseStream().isParallel()); + assertFalse(ioBaseStreamPath.isParallel()); + } + + 
@SuppressWarnings("resource") // @AfterEach + @Test + public void testIteratorPathIO() throws IOException { + final AtomicReference<Path> ref = new AtomicReference<>(); + ioBaseStream.iterator().forEachRemaining(e -> ref.set(e.toRealPath())); + assertEquals(TestConstants.ABS_PATH_B.toRealPath(), ref.get()); + // + ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.set(e.getFileName())); + assertEquals(TestConstants.ABS_PATH_B.getFileName(), ref.get()); + } + + @SuppressWarnings("resource") // @AfterEach + @Test + public void testIteratorSimple() throws IOException { + final AtomicInteger ref = new AtomicInteger(); + baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + } + + @SuppressWarnings("resource") + @Test + public void testOnClose() { + // Stream + testOnClose(baseStream); + testOnClose(ioBaseStream.asBaseStream()); + testOnClose(ioBaseStreamPath.asBaseStream()); + } + + @SuppressWarnings("resource") + private <T, S extends BaseStream<T, S>> void testOnClose(final BaseStream<T, S> stream) { + final AtomicReference<String> refA = new AtomicReference<>(); + final AtomicReference<String> refB = new AtomicReference<>(); + stream.onClose(() -> refA.set("A")); + stream.onClose(() -> { + throw new MyRuntimeException("B"); + }); + stream.onClose(() -> { + throw new MyRuntimeException("C"); + }); + stream.onClose(() -> refB.set("D")); + final MyRuntimeException e = assertThrows(MyRuntimeException.class, stream::close); + assertEquals("A", refA.get()); + assertEquals("D", refB.get()); + assertEquals("B", e.getMessage()); + final Throwable[] suppressed = e.getSuppressed(); + assertNotNull(suppressed); + assertEquals(1, suppressed.length); + assertEquals("C", suppressed[0].getMessage()); 
+ } + + @SuppressWarnings("resource") + @Test + public void testParallel() throws IOException { + final AtomicInteger ref = new AtomicInteger(); + baseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.parallel().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + final BaseStream<Path, ?> parallel = ioBaseStreamPath.asBaseStream().parallel(); + parallel.iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + assertTrue(parallel.isParallel()); + } + + @SuppressWarnings("resource") // @AfterEach + @Test + public void testParallelParallel() { + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) { + testParallelParallel(stream); + } + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) { + testParallelParallel(stream); + } + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) { + testParallelParallel(stream); + } + try (final IOBaseStreamFixture<Path, ?, Stream<Path>> stream = createIOBaseStream()) { + testParallelParallel(stream.asBaseStream()); + } + } + + @SuppressWarnings("resource") + private void testParallelParallel(final BaseStream<?, ?> stream) { + final BaseStream<?, ?> seq = stream.sequential(); + assertFalse(seq.isParallel()); + final BaseStream<?, ?> p1 = seq.parallel(); + assertTrue(p1.isParallel()); + final BaseStream<?, ?> p2 = p1.parallel(); + assertTrue(p1.isParallel()); + assertSame(p1, p2); + } + + @SuppressWarnings("resource") + private void testParallelParallel(final IOBaseStream<?, ?, ?> stream) { + final IOBaseStream<?, ?, ?> seq = stream.sequential(); + assertFalse(seq.isParallel()); + final IOBaseStream<?, ?, ?> p1 = seq.parallel(); + assertTrue(p1.isParallel()); + final IOBaseStream<?, ?, ?> p2 = p1.parallel(); + assertTrue(p1.isParallel()); + assertSame(p1, p2); + } + + @SuppressWarnings("resource") + @Test + public void testSequential() throws IOException { + 
final AtomicInteger ref = new AtomicInteger(); + baseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.sequential().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + ioBaseStreamPath.asBaseStream().sequential().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + } + + @SuppressWarnings("resource") // @AfterEach + @Test + public void testSequentialSequential() { + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) { + testSequentialSequential(stream); + } + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStreamPath()) { + testSequentialSequential(stream); + } + try (final IOBaseStream<?, ?, ?> stream = createIOBaseStream()) { + testSequentialSequential(stream.asBaseStream()); + } + } + + @SuppressWarnings("resource") + private void testSequentialSequential(final BaseStream<?, ?> stream) { + final BaseStream<?, ?> p = stream.parallel(); + assertTrue(p.isParallel()); + final BaseStream<?, ?> seq1 = p.sequential(); + assertFalse(seq1.isParallel()); + final BaseStream<?, ?> seq2 = seq1.sequential(); + assertFalse(seq1.isParallel()); + assertSame(seq1, seq2); + } + + @SuppressWarnings("resource") + private void testSequentialSequential(final IOBaseStream<?, ?, ?> stream) { + final IOBaseStream<?, ?, ?> p = stream.parallel(); + assertTrue(p.isParallel()); + final IOBaseStream<?, ?, ?> seq1 = p.sequential(); + assertFalse(seq1.isParallel()); + final IOBaseStream<?, ?, ?> seq2 = seq1.sequential(); + assertFalse(seq1.isParallel()); + assertSame(seq1, seq2); + } + + @SuppressWarnings("resource") // @AfterEach + @Test + public void testSpliterator() { + final AtomicInteger ref = new AtomicInteger(); + baseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.spliterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + 
ioBaseStreamPath.asBaseStream().spliterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + } + + @SuppressWarnings("resource") + @Test + public void testUnordered() throws IOException { + final AtomicInteger ref = new AtomicInteger(); + baseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.unordered().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + ioBaseStreamPath.asBaseStream().unordered().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + } + + @SuppressWarnings("resource") + @Test + public void testUnwrap() { + final AtomicInteger ref = new AtomicInteger(); + baseStream.iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + ioBaseStream.unwrap().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(4, ref.get()); + ioBaseStreamPath.asBaseStream().iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(6, ref.get()); + } + + @Test + public void testWrap() { + final Stream<Path> stream = createStreamOfPaths(); + @SuppressWarnings("resource") + final IOStream<Path> wrap = ioBaseStreamAdapter.wrap(stream); + assertNotNull(wrap); + assertEquals(stream, wrap.unwrap()); + } + +} diff --git a/src/test/java/org/apache/commons/io/function/IOIntStream.java b/src/test/java/org/apache/commons/io/function/IOIntStream.java new file mode 100644 index 00000000..961235e9 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/IOIntStream.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.util.stream.IntStream; + +/** + * Placeholder for future possible development and makes sure we can extend IOBaseStream cleanly with proper generics. + */ +interface IOIntStream extends IOBaseStream<Integer, IOIntStream, IntStream> { + // Placeholder for future possible development. +} diff --git a/src/test/java/org/apache/commons/io/function/IOIntStreamAdapter.java b/src/test/java/org/apache/commons/io/function/IOIntStreamAdapter.java new file mode 100644 index 00000000..7ab20d20 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/IOIntStreamAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.util.stream.IntStream; + +/** + * Placeholder for future possible development and makes sure we can extend IOBaseStreamAdapter cleanly with proper + * generics. + */ +class IOIntStreamAdapter extends IOBaseStreamAdapter<Integer, IOIntStream, IntStream> implements IOIntStream { + + static IOIntStream adapt(final IntStream stream) { + return new IOIntStreamAdapter(stream); + } + + private IOIntStreamAdapter(final IntStream stream) { + super(stream); + } + + @Override + public IOIntStream wrap(final IntStream delegate) { + return unwrap() == delegate ? this : IOIntStreamAdapter.adapt(delegate); + } + +} diff --git a/src/test/java/org/apache/commons/io/function/IOStreamTest.java b/src/test/java/org/apache/commons/io/function/IOStreamTest.java new file mode 100644 index 00000000..6391e9b4 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/IOStreamTest.java @@ -0,0 +1,540 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.commons.io.function; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +/** + * Tests {@link IOStream}. + */ +public class IOStreamTest { + + private void compareAndSetIO(final AtomicReference<String> ref, final String expected, final String update) throws IOException { + TestUtils.compareAndSetThrowsIO(ref, expected, update); + } + + private void compareAndSetRE(final AtomicReference<String> ref, final String expected, final String update) { + TestUtils.compareAndSetThrowsRE(ref, expected, update); + } + + private void ioExceptionOnNull(final Object test) throws IOException { + if (test == null) { + throw new IOException("Unexpected"); + } + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testAdapt() { + assertEquals(0, IOStream.adapt((Stream<?>) null).count()); + assertEquals(0, IOStream.adapt(Stream.empty()).count()); + assertEquals(1, IOStream.adapt(Stream.of("A")).count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testAllMatch() throws IOException { + assertThrows(IOException.class, () -> 
IOStream.of("A", "B").allMatch(TestConstants.THROWING_IO_PREDICATE)); + assertTrue(IOStream.of("A", "B").allMatch(IOPredicate.alwaysTrue())); + assertFalse(IOStream.of("A", "B").allMatch(IOPredicate.alwaysFalse())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testAnyMatch() throws IOException { + assertThrows(IOException.class, () -> IOStream.of("A", "B").anyMatch(TestConstants.THROWING_IO_PREDICATE)); + assertTrue(IOStream.of("A", "B").anyMatch(IOPredicate.alwaysTrue())); + assertFalse(IOStream.of("A", "B").anyMatch(IOPredicate.alwaysFalse())); + } + + @Test + public void testClose() { + IOStream.of("A", "B").close(); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testCollectCollectorOfQsuperTAR() { + // TODO IOCollector? + IOStream.of("A", "B").collect(Collectors.toList()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testCollectSupplierOfRBiConsumerOfRQsuperTBiConsumerOfRR() throws IOException { + // TODO Need an IOCollector? 
+ IOStream.of("A", "B").collect(() -> "A", (t, u) -> {}, (t, u) -> {}); + assertEquals("AB", Stream.of("A", "B").collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString()); + assertEquals("AB", IOStream.of("A", "B").collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString()); + // Exceptions + assertThrows(IOException.class, () -> IOStream.of("A", "B").collect(TestUtils.throwingIOSupplier(), (t, u) -> {}, (t, u) -> {})); + assertThrows(IOException.class, () -> IOStream.of("A", "B").collect(() -> "A", TestUtils.throwingIOBiConsumer(), (t, u) -> {})); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testCount() { + assertEquals(0, IOStream.of().count()); + assertEquals(1, IOStream.of("A").count()); + assertEquals(2, IOStream.of("A", "B").count()); + assertEquals(3, IOStream.of("A", "B", "C").count()); + assertEquals(3, IOStream.of("A", "A", "A").count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testDistinct() { + assertEquals(0, IOStream.of().distinct().count()); + assertEquals(1, IOStream.of("A").distinct().count()); + assertEquals(2, IOStream.of("A", "B").distinct().count()); + assertEquals(3, IOStream.of("A", "B", "C").distinct().count()); + assertEquals(1, IOStream.of("A", "A", "A").distinct().count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testEmpty() throws IOException { + assertEquals(0, Stream.empty().count()); + assertEquals(0, IOStream.empty().count()); + IOStream.empty().forEach(TestUtils.throwingIOConsumer()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFilter() throws IOException { + IOStream.of("A").filter(TestConstants.THROWING_IO_PREDICATE); + // compile vs type + 
assertThrows(IOException.class, () -> IOStream.of("A").filter(TestConstants.THROWING_IO_PREDICATE).count()); + // compile vs inline lambda + assertThrows(IOException.class, () -> IOStream.of("A").filter(e -> { + throw new IOException("Failure"); + }).count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFindAny() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").filter(TestConstants.THROWING_IO_PREDICATE).findAny()); + // compile vs inline lambda + assertThrows(IOException.class, () -> IOStream.of("A").filter(e -> { + throw new IOException("Failure"); + }).findAny()); + + assertTrue(IOStream.of("A", "B").filter(IOPredicate.alwaysTrue()).findAny().isPresent()); + assertFalse(IOStream.of("A", "B").filter(IOPredicate.alwaysFalse()).findAny().isPresent()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFindFirst() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").filter(TestConstants.THROWING_IO_PREDICATE).findFirst()); + // compile vs inline lambda + assertThrows(IOException.class, () -> IOStream.of("A").filter(e -> { + throw new IOException("Failure"); + }).findFirst()); + + assertTrue(IOStream.of("A", "B").filter(IOPredicate.alwaysTrue()).findFirst().isPresent()); + assertFalse(IOStream.of("A", "B").filter(IOPredicate.alwaysFalse()).findFirst().isPresent()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFlatMap() throws IOException { + assertEquals(Arrays.asList("A", "B", "C", "D"), + IOStream.of(IOStream.of("A", "B"), IOStream.of("C", "D")).flatMap(IOFunction.identity()).collect(Collectors.toList())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void
testFlatMapToDouble() throws IOException { + assertEquals('A' + 'B', IOStream.of("A", "B").flatMapToDouble(e -> DoubleStream.of(e.charAt(0))).sum()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFlatMapToInt() throws IOException { + assertEquals('A' + 'B', IOStream.of("A", "B").flatMapToInt(e -> IntStream.of(e.charAt(0))).sum()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testFlatMapToLong() throws IOException { + assertEquals('A' + 'B', IOStream.of("A", "B").flatMapToLong(e -> LongStream.of(e.charAt(0))).sum()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testForEachIOConsumerOfQsuperT() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").forEach(TestUtils.throwingIOConsumer())); + // compile vs inline + assertThrows(IOException.class, () -> IOStream.of("A").forEach(e -> { + throw new IOException("Failure"); + })); + assertThrows(IOException.class, () -> IOStream.of("A", "B").forEach(TestUtils.throwingIOConsumer())); + final StringBuilder sb = new StringBuilder(); + IOStream.of("A", "B").forEachOrdered(sb::append); + assertEquals("AB", sb.toString()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testForaAllIOConsumer() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").forAll(TestUtils.throwingIOConsumer())); + // compile vs inline + assertThrows(IOException.class, () -> IOStream.of("A").forAll(e -> { + throw new IOException("Failure"); + })); + assertThrows(IOException.class, () -> IOStream.of("A", "B").forAll(TestUtils.throwingIOConsumer())); + final StringBuilder sb = new StringBuilder(); + IOStream.of("A", "B").forAll(sb::append); + assertEquals("AB",
sb.toString()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testForaAllIOConsumerBiFunction() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").forAll(TestUtils.throwingIOConsumer(), (i, e) -> e)); + // compile vs inline + assertThrows(IOException.class, () -> IOStream.of("A").forAll(e -> { + throw new IOException("Failure"); + }, (i, e) -> e)); + assertThrows(IOException.class, () -> IOStream.of("A", "B").forAll(TestUtils.throwingIOConsumer(), (i, e) -> e)); + final StringBuilder sb = new StringBuilder(); + IOStream.of("A", "B").forAll(sb::append, (i, e) -> e); + assertEquals("AB", sb.toString()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testForaAllIOConsumerBiFunctionNull() throws IOException { + // compile vs type + assertDoesNotThrow(() -> IOStream.of("A").forAll(TestUtils.throwingIOConsumer(), null)); + // compile vs inline + assertDoesNotThrow(() -> IOStream.of("A").forAll(e -> { + throw new IOException("Failure"); + }, null)); + assertDoesNotThrow(() -> IOStream.of("A", "B").forAll(TestUtils.throwingIOConsumer(), null)); + final StringBuilder sb = new StringBuilder(); + IOStream.of("A", "B").forAll(sb::append, null); + assertEquals("AB", sb.toString()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testForEachOrdered() throws IOException { + // compile vs type + assertThrows(IOException.class, () -> IOStream.of("A").forEachOrdered(TestUtils.throwingIOConsumer())); + // compile vs inline + assertThrows(IOException.class, () -> IOStream.of("A").forEachOrdered(e -> { + throw new IOException("Failure"); + })); + assertThrows(IOException.class, () -> IOStream.of("A", "B").forEachOrdered(TestUtils.throwingIOConsumer())); + final StringBuilder sb = new StringBuilder(); + IOStream.of("A",
"B").forEachOrdered(sb::append); + assertEquals("AB", sb.toString()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testIsParallel() { + assertFalse(IOStream.of("A", "B").isParallel()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testIterateException() throws IOException { + final IOStream<Long> stream = IOStream.iterate(1L, TestUtils.throwingIOUnaryOperator()); + final IOIterator<Long> iterator = stream.iterator(); + assertEquals(1L, iterator.next()); + assertThrows(IOException.class, () -> iterator.next()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testIterateLong() throws IOException { + final IOStream<Long> stream = IOStream.iterate(1L, i -> i + 1); + final IOIterator<Long> iterator = stream.iterator(); + assertEquals(1L, iterator.next()); + assertEquals(2L, iterator.next()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testIterator() throws IOException { + final AtomicInteger ref = new AtomicInteger(); + IOStream.of("A", "B").iterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testLimit() { + assertEquals(1, IOStream.of("A", "B").limit(1).count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMap() throws IOException { + assertEquals(Arrays.asList("AC", "BC"), IOStream.of("A", "B").map(e -> e + "C").collect(Collectors.toList())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMapToDouble() { + assertArrayEquals(new double[] 
{Double.parseDouble("1"), Double.parseDouble("2")}, IOStream.of("1", "2").mapToDouble(Double::parseDouble).toArray()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMapToInt() { + assertArrayEquals(new int[] {1, 2}, IOStream.of("1", "2").mapToInt(Integer::parseInt).toArray()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMapToLong() { + assertArrayEquals(new long[] {1L, 2L}, IOStream.of("1", "2").mapToLong(Long::parseLong).toArray()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMax() throws IOException { + assertEquals("B", IOStream.of("A", "B").max(String::compareTo).get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testMin() throws IOException { + assertEquals("A", IOStream.of("A", "B").min(String::compareTo).get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testNoneMatch() throws IOException { + assertThrows(IOException.class, () -> IOStream.of("A", "B").noneMatch(TestConstants.THROWING_IO_PREDICATE)); + assertFalse(IOStream.of("A", "B").noneMatch(IOPredicate.alwaysTrue())); + assertTrue(IOStream.of("A", "B").noneMatch(IOPredicate.alwaysFalse())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testOfArray() { + assertEquals(0, IOStream.of((String[]) null).count()); + assertEquals(0, IOStream.of().count()); + assertEquals(2, IOStream.of("A", "B").count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testOfOne() { + assertEquals(1, IOStream.of("A").count()); + } + + @SuppressWarnings("resource") // custom stream not 
recognized by compiler warning machinery + @Test + public void testOfIterable() { + assertEquals(0, IOStream.of((Iterable<?>) null).count()); + assertEquals(0, IOStream.of(Collections.emptyList()).count()); + assertEquals(0, IOStream.of(Collections.emptySet()).count()); + assertEquals(0, IOStream.of(Collections.emptySortedSet()).count()); + assertEquals(1, IOStream.of(Arrays.asList("a")).count()); + assertEquals(2, IOStream.of(Arrays.asList("a", "b")).count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testOnClose() throws IOException { + assertThrows(IOException.class, () -> IOStream.of("A").onClose(TestConstants.THROWING_IO_RUNNABLE).close()); + final AtomicReference<String> ref = new AtomicReference<>(); + IOStream.of("A").onClose(() -> compareAndSetIO(ref, null, "new1")).close(); + assertEquals("new1", ref.get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testOnCloseMultipleHandlers() throws IOException { + // + final AtomicReference<String> ref = new AtomicReference<>(); + // Sanity check + ref.set(null); + final RuntimeException thrownRE = assertThrows(RuntimeException.class, () -> { + // @formatter:off + final Stream<String> stream = Stream.of("A") + .onClose(() -> compareAndSetRE(ref, null, "new1")) + .onClose(() -> TestConstants.throwRuntimeException("Failure 2")); + // @formatter:on + stream.close(); + }); + assertEquals("new1", ref.get()); + assertEquals("Failure 2", thrownRE.getMessage()); + assertEquals(0, thrownRE.getSuppressed().length); + // Test + ref.set(null); + final IOException thrownIO = assertThrows(IOException.class, () -> { + // @formatter:off + final IOStream<String> stream = IOStream.of("A") + .onClose(() -> compareAndSetIO(ref, null, "new1")) + .onClose(() -> TestConstants.throwIOException("Failure 2")); + // @formatter:on + stream.close(); + }); + assertEquals("new1", ref.get()); 
+ assertEquals("Failure 2", thrownIO.getMessage()); + assertEquals(0, thrownIO.getSuppressed().length); + // + final IOException thrownB = assertThrows(IOException.class, () -> { + // @formatter:off + final IOStream<String> stream = IOStream.of("A") + .onClose(TestConstants.throwIOException("Failure 1")) + .onClose(TestConstants.throwIOException("Failure 2")); + // @formatter:on + stream.close(); + }); + assertEquals("Failure 1", thrownB.getMessage()); + assertEquals(0, thrownB.getSuppressed().length); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testParallel() { + assertEquals(2, IOStream.of("A", "B").parallel().count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testPeek() throws IOException { + final AtomicReference<String> ref = new AtomicReference<>(); + assertEquals(1, IOStream.of("A").peek(e -> compareAndSetIO(ref, null, e)).count()); + assertEquals("A", ref.get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testReduceBinaryOperatorOfT() throws IOException { + assertEquals("AB", IOStream.of("A", "B").reduce((t, u) -> t + u).get()); + assertEquals(TestConstants.ABS_PATH_A.toRealPath(), + IOStream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B).reduce((t, u) -> t.toRealPath()).get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testReduceTBinaryOperatorOfT() throws IOException { + assertEquals("_AB", IOStream.of("A", "B").reduce("_", (t, u) -> t + u)); + assertEquals(TestConstants.ABS_PATH_A.toRealPath(), + IOStream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B).reduce(TestConstants.ABS_PATH_A, (t, u) -> t.toRealPath())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public 
void testReduceUBiFunctionOfUQsuperTUBinaryOperatorOfU() throws IOException { + assertEquals("_AB", IOStream.of("A", "B").reduce("_", (t, u) -> t + u, (t, u) -> t + u)); + assertEquals(TestConstants.ABS_PATH_A.toRealPath(), IOStream.of(TestConstants.ABS_PATH_A, TestConstants.ABS_PATH_B).reduce(TestConstants.ABS_PATH_A, + (t, u) -> t.toRealPath(), (t, u) -> u.toRealPath())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testSequential() { + assertEquals(2, IOStream.of("A", "B").sequential().count()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testSkip() throws IOException { + final AtomicReference<String> ref = new AtomicReference<>(); + assertEquals(1, IOStream.of("A", "B").skip(1).peek(e -> compareAndSetIO(ref, null, e)).count()); + assertEquals("B", ref.get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testSorted() throws IOException { + assertEquals(Arrays.asList("A", "B", "C", "D"), IOStream.of("D", "A", "B", "C").sorted().collect(Collectors.toList())); + assertEquals(Arrays.asList("A", "B", "C", "D"), IOStream.of("D", "A", "B", "C").sorted().peek(this::ioExceptionOnNull).collect(Collectors.toList())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testSortedComparatorOfQsuperT() throws IOException { + assertEquals(Arrays.asList("A", "B", "C", "D"), IOStream.of("D", "A", "B", "C").sorted(String::compareTo).collect(Collectors.toList())); + assertEquals(Arrays.asList("A", "B", "C", "D"), + IOStream.of("D", "A", "B", "C").sorted(String::compareTo).peek(this::ioExceptionOnNull).collect(Collectors.toList())); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testSpliterator() { + final 
AtomicInteger ref = new AtomicInteger(); + IOStream.of("A", "B").spliterator().forEachRemaining(e -> ref.incrementAndGet()); + assertEquals(2, ref.get()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testToArray() { + assertArrayEquals(new String[] {"A", "B"}, IOStream.of("A", "B").toArray()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testToArrayIntFunctionOfA() { + assertArrayEquals(new String[] {"A", "B"}, IOStream.of("A", "B").toArray(String[]::new)); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testUnordered() { + // Sanity check + assertArrayEquals(new String[] {"A", "B"}, Stream.of("A", "B").unordered().toArray()); + // Test + assertArrayEquals(new String[] {"A", "B"}, IOStream.of("A", "B").unordered().toArray()); + } + + @SuppressWarnings("resource") // custom stream not recognized by compiler warning machinery + @Test + public void testUnwrap() { + final Stream<String> unwrap = IOStream.of("A", "B").unwrap(); + assertNotNull(unwrap); + assertEquals(2, unwrap.count()); + } + +} diff --git a/src/test/java/org/apache/commons/io/function/IOSupplierTest.java b/src/test/java/org/apache/commons/io/function/IOSupplierTest.java index 981a65bc..4b18f48f 100644 --- a/src/test/java/org/apache/commons/io/function/IOSupplierTest.java +++ b/src/test/java/org/apache/commons/io/function/IOSupplierTest.java @@ -51,7 +51,7 @@ public class IOSupplierTest { @Test public void testAsSupplier() { assertThrows(UncheckedIOException.class, () -> TestConstants.THROWING_IO_SUPPLIER.asSupplier().get()); - assertEquals("new1", getThrowsNone(() -> TestUtils.compareAndSetThrows(ref1, "new1"))); + assertEquals("new1", getThrowsNone(() -> TestUtils.compareAndSetThrowsIO(ref1, "new1"))); assertEquals("new1", ref1.get()); 
assertNotEquals(TestConstants.THROWING_IO_SUPPLIER.asSupplier(), TestConstants.THROWING_IO_SUPPLIER.asSupplier()); } @@ -62,7 +62,7 @@ public class IOSupplierTest { assertThrows(IOException.class, () -> { throw new IOException(); }); - assertEquals("new1", getThrows(() -> TestUtils.compareAndSetThrows(ref1, "new1"))); + assertEquals("new1", getThrows(() -> TestUtils.compareAndSetThrowsIO(ref1, "new1"))); assertEquals("new1", ref1.get()); } diff --git a/src/test/java/org/apache/commons/io/function/PathBaseStream.java b/src/test/java/org/apache/commons/io/function/PathBaseStream.java new file mode 100644 index 00000000..174d1120 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/PathBaseStream.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.nio.file.Path; +import java.util.stream.BaseStream; + +/** + * Test fixture. 
+ */ +interface PathBaseStream extends BaseStream<Path, PathBaseStream> { + // empty +} diff --git a/src/test/java/org/apache/commons/io/function/PathStream.java b/src/test/java/org/apache/commons/io/function/PathStream.java new file mode 100644 index 00000000..ba2b7f33 --- /dev/null +++ b/src/test/java/org/apache/commons/io/function/PathStream.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.commons.io.function; + +import java.nio.file.Path; +import java.util.stream.Stream; + +/** + * Test fixture. 
+ */ +interface PathStream extends Stream<Path> { + // empty +} diff --git a/src/test/java/org/apache/commons/io/function/TestConstants.java b/src/test/java/org/apache/commons/io/function/TestConstants.java index 16c9c95b..5f0581c7 100644 --- a/src/test/java/org/apache/commons/io/function/TestConstants.java +++ b/src/test/java/org/apache/commons/io/function/TestConstants.java @@ -62,8 +62,16 @@ class TestConstants { throw new UncheckedIOException(new IOException("Failure")); }; - private static <T> T throwIOException() throws IOException { - throw new IOException("Failure"); + static <T> T throwIOException() throws IOException { + return throwIOException("Failure"); + } + + static <T> T throwIOException(final String message) throws IOException { + throw new IOException(message); + } + + static <T> T throwRuntimeException(final String message) { + throw new RuntimeException(message); } } diff --git a/src/test/java/org/apache/commons/io/function/TestUtils.java b/src/test/java/org/apache/commons/io/function/TestUtils.java index db22fdd0..9b6f3aa7 100644 --- a/src/test/java/org/apache/commons/io/function/TestUtils.java +++ b/src/test/java/org/apache/commons/io/function/TestUtils.java @@ -22,27 +22,68 @@ import java.util.concurrent.atomic.AtomicReference; class TestUtils { - static <T> T compareAndSetThrows(final AtomicReference<T> ref, final T update) throws IOException { - return compareAndSetThrows(ref, null, update); + static <T> T compareAndSetThrowsIO(final AtomicReference<T> ref, final T update) throws IOException { + return compareAndSetThrowsIO(ref, null, update); } - static <T> T compareAndSetThrows(final AtomicReference<T> ref, final T expected, final T update) throws IOException { + static <T> T compareAndSetThrowsIO(final AtomicReference<T> ref, final T expected, final T update) throws IOException { if (!ref.compareAndSet(expected, update)) { throw new IOException("Unexpected"); } return ref.get(); // same as update } + static <T> T 
compareAndSetThrowsRE(final AtomicReference<T> ref, final T expected, final T update) { + if (!ref.compareAndSet(expected, update)) { + throw new RuntimeException("Unexpected"); + } + return ref.get(); // same as update + } + + @SuppressWarnings("unchecked") + static <T, U> IOBiConsumer<T, U> throwingIOBiConsumer() { + return (IOBiConsumer<T, U>) TestConstants.THROWING_IO_BI_CONSUMER; + } + + @SuppressWarnings("unchecked") + static <T, U, V> IOBiFunction<T, U, V> throwingIOBiFunction() { + return (IOBiFunction<T, U, V>) TestConstants.THROWING_IO_BI_FUNCTION; + } + @SuppressWarnings("unchecked") static <T> IOBinaryOperator<T> throwingIOBinaryOperator() { return (IOBinaryOperator<T>) TestConstants.THROWING_IO_BINARY_OPERATOR; } + @SuppressWarnings("unchecked") + static <T> IOComparator<T> throwingIOComparator() { + return (IOComparator<T>) TestConstants.THROWING_IO_COMPARATOR; + } + @SuppressWarnings("unchecked") static <T> IOConsumer<T> throwingIOConsumer() { return (IOConsumer<T>) TestConstants.THROWING_IO_CONSUMER; } + @SuppressWarnings("unchecked") + static <T, U> IOFunction<T, U> throwingIOFunction() { + return (IOFunction<T, U>) TestConstants.THROWING_IO_FUNCTION; + } + + @SuppressWarnings("unchecked") + static <T> IOPredicate<T> throwingIOPredicate() { + return (IOPredicate<T>) TestConstants.THROWING_IO_PREDICATE; + } + + static IORunnable throwingIORunnable() { + return TestConstants.THROWING_IO_RUNNABLE; + } + + @SuppressWarnings("unchecked") + static <T> IOSupplier<T> throwingIOSupplier() { + return (IOSupplier<T>) TestConstants.THROWING_IO_SUPPLIER; + } + @SuppressWarnings("unchecked") static <T> IOUnaryOperator<T> throwingIOUnaryOperator() { return (IOUnaryOperator<T>) TestConstants.THROWING_IO_UNARY_OPERATOR; diff --git a/src/test/java/org/apache/commons/io/function/UncheckTest.java b/src/test/java/org/apache/commons/io/function/UncheckTest.java index dbb73299..ee73abc6 100644 --- a/src/test/java/org/apache/commons/io/function/UncheckTest.java +++ 
b/src/test/java/org/apache/commons/io/function/UncheckTest.java @@ -54,8 +54,8 @@ public class UncheckTest { }, null, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.accept(TestConstants.THROWING_IO_BI_CONSUMER, null, null)); Uncheck.accept((t, u) -> { - TestUtils.compareAndSetThrows(ref1, t); - TestUtils.compareAndSetThrows(ref2, u); + TestUtils.compareAndSetThrowsIO(ref1, t); + TestUtils.compareAndSetThrowsIO(ref2, u); }, "new1", "new2"); assertEquals("new1", ref1.get()); assertEquals("new2", ref2.get()); @@ -67,7 +67,7 @@ public class UncheckTest { throw new IOException(); }, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.accept(TestUtils.throwingIOConsumer(), null)); - Uncheck.accept(t -> TestUtils.compareAndSetThrows(ref1, t), "new1"); + Uncheck.accept(t -> TestUtils.compareAndSetThrowsIO(ref1, t), "new1"); assertEquals("new1", ref1.get()); } @@ -78,9 +78,9 @@ public class UncheckTest { }, null, null, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.accept(TestConstants.THROWING_IO_TRI_CONSUMER, null, null, null)); Uncheck.accept((t, u, v) -> { - TestUtils.compareAndSetThrows(ref1, t); - TestUtils.compareAndSetThrows(ref2, u); - TestUtils.compareAndSetThrows(ref3, v); + TestUtils.compareAndSetThrowsIO(ref1, t); + TestUtils.compareAndSetThrowsIO(ref2, u); + TestUtils.compareAndSetThrowsIO(ref3, v); }, "new1", "new2", "new3"); assertEquals("new1", ref1.get()); assertEquals("new2", ref2.get()); @@ -94,8 +94,8 @@ public class UncheckTest { }, null, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.apply(TestConstants.THROWING_IO_BI_FUNCTION, null, null)); assertEquals("new0", Uncheck.apply((t, u) -> { - TestUtils.compareAndSetThrows(ref1, t); - TestUtils.compareAndSetThrows(ref2, u); + TestUtils.compareAndSetThrowsIO(ref1, t); + TestUtils.compareAndSetThrowsIO(ref2, u); return "new0"; }, "new1", "new2")); assertEquals("new1", ref1.get()); @@ -108,7 +108,7 @@ public class UncheckTest { throw new 
IOException(); }, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.apply(TestConstants.THROWING_IO_FUNCTION, null)); - Uncheck.apply(t -> TestUtils.compareAndSetThrows(ref1, t), "new1"); + Uncheck.apply(t -> TestUtils.compareAndSetThrowsIO(ref1, t), "new1"); assertEquals("new1", ref1.get()); } @@ -119,10 +119,10 @@ public class UncheckTest { }, null, null, null, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.apply(TestConstants.THROWING_IO_QUAD_FUNCTION, null, null, null, null)); assertEquals("new0", Uncheck.apply((t, u, v, w) -> { - TestUtils.compareAndSetThrows(ref1, t); - TestUtils.compareAndSetThrows(ref2, u); - TestUtils.compareAndSetThrows(ref3, v); - TestUtils.compareAndSetThrows(ref4, w); + TestUtils.compareAndSetThrowsIO(ref1, t); + TestUtils.compareAndSetThrowsIO(ref2, u); + TestUtils.compareAndSetThrowsIO(ref3, v); + TestUtils.compareAndSetThrowsIO(ref4, w); return "new0"; }, "new1", "new2", "new3", "new4")); assertEquals("new1", ref1.get()); @@ -138,9 +138,9 @@ public class UncheckTest { }, null, null, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.apply(TestConstants.THROWING_IO_TRI_FUNCTION, null, null, null)); assertEquals("new0", Uncheck.apply((t, u, v) -> { - TestUtils.compareAndSetThrows(ref1, t); - TestUtils.compareAndSetThrows(ref2, u); - TestUtils.compareAndSetThrows(ref3, v); + TestUtils.compareAndSetThrowsIO(ref1, t); + TestUtils.compareAndSetThrowsIO(ref2, u); + TestUtils.compareAndSetThrowsIO(ref3, v); return "new0"; }, "new1", "new2", "new3")); assertEquals("new1", ref1.get()); @@ -154,7 +154,7 @@ public class UncheckTest { throw new IOException(); })); assertThrows(UncheckedIOException.class, () -> Uncheck.get(TestConstants.THROWING_IO_SUPPLIER)); - assertEquals("new1", Uncheck.get(() -> TestUtils.compareAndSetThrows(ref1, "new1"))); + assertEquals("new1", Uncheck.get(() -> TestUtils.compareAndSetThrowsIO(ref1, "new1"))); assertEquals("new1", ref1.get()); } @@ -164,7 +164,7 @@ public class 
UncheckTest { throw new IOException(); })); assertThrows(UncheckedIOException.class, () -> Uncheck.run(TestConstants.THROWING_IO_RUNNABLE)); - Uncheck.run(() -> TestUtils.compareAndSetThrows(ref1, "new1")); + Uncheck.run(() -> TestUtils.compareAndSetThrowsIO(ref1, "new1")); assertEquals("new1", ref1.get()); } @@ -174,7 +174,7 @@ public class UncheckTest { throw new IOException(); }, null)); assertThrows(UncheckedIOException.class, () -> Uncheck.test(TestConstants.THROWING_IO_PREDICATE, null)); - assertTrue(Uncheck.test(t -> TestUtils.compareAndSetThrows(ref1, t).equals(t), "new1")); + assertTrue(Uncheck.test(t -> TestUtils.compareAndSetThrowsIO(ref1, t).equals(t), "new1")); assertEquals("new1", ref1.get()); }