Hi Jochen, wouldn't it be good to have a Jira issue for this change?
Regards, Benedikt <joc...@apache.org> schrieb am Do., 30. Juni 2016 um 11:04: > Author: jochen > Date: Thu Jun 30 09:04:21 2016 > New Revision: 1750760 > > URL: http://svn.apache.org/viewvc?rev=1750760&view=rev > Log: > Added the ObservableInputStream, and the > MessageDigestCalculatingInputStream. > > Added: > > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java > (with props) > > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java > (with props) > > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java > (with props) > > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java > (with props) > Modified: > commons/proper/io/trunk/src/changes/changes.xml > > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java > > Modified: commons/proper/io/trunk/src/changes/changes.xml > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/changes/changes.xml?rev=1750760&r1=1750759&r2=1750760&view=diff > > ============================================================================== > --- commons/proper/io/trunk/src/changes/changes.xml (original) > +++ commons/proper/io/trunk/src/changes/changes.xml Thu Jun 30 09:04:21 > 2016 > @@ -46,6 +46,11 @@ The <action> type attribute can be add,u > > <body> > <!-- The release date is the date RC is cut --> > + <release version="2.7" date="Not yet published"> > + <action dev="jochen" type="add"> > + Added the ObservableInputStream, and the > MessageDigestCalculatingInputStream. > + </action> > + </release> > <release version="2.6" date="2016-MM-DD" description="New features > and bug fixes."> > <action issue="IO-511" dev="britter" type="fix" due-to="Ahmet > Celik"> > After a few unit tests, a few newly created directories not > cleaned completely. 
> > Added: > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java?rev=1750760&view=auto > > ============================================================================== > --- > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java > (added) > +++ > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java > Thu Jun 30 09:04:21 2016 > @@ -0,0 +1,84 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.commons.io.input; > + > +import java.io.IOException; > +import java.io.InputStream; > +import java.security.MessageDigest; > +import java.security.NoSuchAlgorithmException; > + > + > +/** > + * This class is an example for using an {@link ObservableInputStream}. It > + * creates its own {@link Observer}, which calculates a checksum using a > + * MessageDigest, for example an MD5 sum. 
> + * <em>Note</em>: Neither {@link ObservableInputStream}, nor {@link > MessageDigest}, > + * are thread safe. Consequently, neither is {@link MessageDigestCalculatingInputStream}. > + */ > +public class MessageDigestCalculatingInputStream extends > ObservableInputStream { > + public static class MessageDigestMaintainingObserver extends Observer > { > + private final MessageDigest md; > + > + public MessageDigestMaintainingObserver(MessageDigest pMd) { > + md = pMd; > + } > + > + @Override > + void data(int pByte) throws IOException { > + md.update((byte) pByte); > + } > + > + @Override > + void data(byte[] pBuffer, int pOffset, int pLength) throws > IOException { > + md.update(pBuffer, pOffset, pLength); > + } > + } > + > + private final MessageDigest messageDigest; > + > + /** Creates a new instance, which calculates a signature on the given > stream, > + * using the given {@link MessageDigest}. > + */ > + public MessageDigestCalculatingInputStream(InputStream pStream, > MessageDigest pDigest) { > + super(pStream); > + messageDigest = pDigest; > + add(new MessageDigestMaintainingObserver(pDigest)); > + } > + /** Creates a new instance, which calculates a signature on the given > stream, > + * using a {@link MessageDigest} with the given algorithm. > + */ > + public MessageDigestCalculatingInputStream(InputStream pStream, > String pAlgorithm) throws NoSuchAlgorithmException { > + this(pStream, MessageDigest.getInstance(pAlgorithm)); > + } > + /** Creates a new instance, which calculates a signature on the given > stream, > + * using a {@link MessageDigest} with the "MD5" algorithm. > + */ > + public MessageDigestCalculatingInputStream(InputStream pStream) > throws NoSuchAlgorithmException { > + this(pStream, MessageDigest.getInstance("MD5")); > + } > + > + /** Returns the {@link MessageDigest}, which is being used for > generating the > + * checksum. > + * <em>Note</em>: The checksum will only reflect the data, which has > been read so far. > + * This is probably not what you expect. 
Make sure, that the > complete data has been > + * read, if that is what you want. The easiest way to do so is by > invoking > + * {@link #consume()}. > + */ > + public MessageDigest getMessageDigest() { > + return messageDigest; > + } > +} > > Propchange: > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java > > ------------------------------------------------------------------------------ > svn:mime-type = text/plain > > Added: > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java?rev=1750760&view=auto > > ============================================================================== > --- > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java > (added) > +++ > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java > Thu Jun 30 09:04:21 2016 > @@ -0,0 +1,238 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.commons.io.input; > + > +import java.io.IOException; > +import java.io.InputStream; > +import java.security.MessageDigest; > +import java.util.ArrayList; > +import java.util.List; > + > + > +/** > + * The {@link ObservableInputStream} allows, that an InputStream may be > consumed > + * by other receivers, apart from the thread, which is reading it. > + * The other consumers are implemented as instances of {@link Observer}. A > + * typical application may be the generation of a {@link MessageDigest} > on the > + * fly. > + * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread > safe, > + * as instances of InputStream usually aren't. > + * If you must access the stream from multiple threads, then > synchronization, locking, > + * or a similar means must be used. > + * @see MessageDigestCalculatingInputStream > + */ > +public class ObservableInputStream extends ProxyInputStream { > + public static abstract class Observer { > + /** Called to indicate, that {@link InputStream#read()} has been > invoked > + * on the {@link ObservableInputStream}, and will return a value. > + * @param pByte The value, which is being returned. This will > never be -1 (EOF), > + * because, in that case, {link #finished()} will be invoked > instead. > + */ > + void data(int pByte) throws IOException {} > + /** Called to indicate, that {@link InputStream#read(byte[])}, or > + * {@link InputStream#read(byte[], int, int)} have been called, > and are about to > + * invoke data. > + * @param pBuffer The byte array, which has been passed to the > read call, and where > + * data has been stored. > + * @param pOffset The offset within the byte array, where data > has been stored. > + * @param pLength The number of bytes, which have been stored in > the byte array. > + */ > + void data(byte[] pBuffer, int pOffset, int pLength) throws > IOException {} > + /** Called to indicate, that EOF has been seen on the underlying > stream. 
> + * This method may be called multiple times, if the reader keeps > invoking > + * either of the read methods, and they will consequently keep > returning > + * EOF. > + */ > + void finished() throws IOException {} > + /** Called to indicate, that the {@link ObservableInputStream} > has been closed. > + */ > + void closed() throws IOException {} > + /** > + * Called to indicate, that an error occurred on the underlying > stream. > + */ > + void error(IOException pException) throws IOException { throw > pException; } > + } > + > + private final List<Observer> observers = new ArrayList<Observer>(); > + > + public ObservableInputStream(InputStream pProxy) { > + super(pProxy); > + } > + > + public void add(Observer pObserver) { > + observers.add(pObserver); > + } > + > + public void remove(Observer pObserver) { > + observers.remove(pObserver); > + } > + > + public void removeAllObservers() { > + observers.clear(); > + } > + > + @Override > + public int read() throws IOException { > + int result = 0; > + IOException ioe = null; > + try { > + result = super.read(); > + } catch (IOException pException) { > + ioe = pException; > + } > + if (ioe != null) { > + noteError(ioe); > + } else if (result == -1) { > + noteFinished(); > + } else { > + noteDataByte(result); > + } > + return result; > + } > + > + @Override > + public int read(byte[] pBuffer) throws IOException { > + int result = 0; > + IOException ioe = null; > + try { > + result = super.read(pBuffer); > + } catch (IOException pException) { > + ioe = pException; > + } > + if (ioe != null) { > + noteError(ioe); > + } else if (result == -1) { > + noteFinished(); > + } else if (result > 0) { > + noteDataBytes(pBuffer, 0, result); > + } > + return result; > + } > + > + @Override > + public int read(byte[] pBuffer, int pOffset, int pLength) throws > IOException { > + int result = 0; > + IOException ioe = null; > + try { > + result = super.read(pBuffer, pOffset, pLength); > + } catch (IOException pException) { > + ioe = 
pException; > + } > + if (ioe != null) { > + noteError(ioe); > + } else if (result == -1) { > + noteFinished(); > + } else if (result > 0) { > + noteDataBytes(pBuffer, pOffset, result); > + } > + return result; > + } > + > + /** Notifies the observers by invoking {@link > Observer#data(byte[],int,int)} > + * with the given arguments. > + * @param pBuffer Passed to the observers. > + * @param pOffset Passed to the observers. > + * @param pLength Passed to the observers. > + * @throws IOException Some observer has thrown an exception, which > is being > + * passed down. > + */ > + protected void noteDataBytes(byte[] pBuffer, int pOffset, int > pLength) throws IOException { > + for (Observer observer : getObservers()) { > + observer.data(pBuffer, pOffset, pLength); > + } > + } > + > + /** Notifies the observers by invoking {@link Observer#finished()}. > + * @throws IOException Some observer has thrown an exception, which > is being > + * passed down. > + */ > + protected void noteFinished() throws IOException { > + for (Observer observer : getObservers()) { > + observer.finished(); > + } > + } > + > + /** Notifies the observers by invoking {@link Observer#data(int)} > + * with the given arguments. > + * @param pDataByte Passed to the observers. > + * @throws IOException Some observer has thrown an exception, which > is being > + * passed down. > + */ > + protected void noteDataByte(int pDataByte) throws IOException { > + for (Observer observer : getObservers()) { > + observer.data(pDataByte); > + } > + } > + > + /** Notifies the observers by invoking {@link > Observer#error(IOException)} > + * with the given argument. > + * @param pException Passed to the observers. > + * @throws IOException Some observer has thrown an exception, which > is being > + * passed down. This may be the same exception, which has been > passed as an > + * argument. 
> + */ > + protected void noteError(IOException pException) throws IOException { > + for (Observer observer : getObservers()) { > + observer.error(pException); > + } > + } > + > + /** Notifies the observers by invoking {@link Observer#closed()}. > + * @throws IOException Some observer has thrown an exception, which > is being > + * passed down. > + */ > + protected void noteClosed() throws IOException { > + for (Observer observer : getObservers()) { > + observer.closed(); > + } > + } > + > + protected List<Observer> getObservers() { > + return observers; > + } > + > + @Override > + public void close() throws IOException { > + IOException ioe = null; > + try { > + super.close(); > + } catch (IOException e) { > + ioe = e; > + } > + if (ioe == null) { > + noteClosed(); > + } else { > + noteError(ioe); > + } > + } > + > + /** Reads all data from the underlying {@link InputStream}, while > notifying the > + * observers. > + * @throws IOException The underlying {@link InputStream}, or any > of the > + * observers has thrown an exception. 
> + */ > + public void consume() throws IOException { > + final byte[] buffer = new byte[8192]; > + for (;;) { > + final int res = read(buffer); > + if (res == -1) { > + return; > + } > + } > + } > + > +} > > Propchange: > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/ObservableInputStream.java > > ------------------------------------------------------------------------------ > svn:mime-type = text/plain > > Modified: > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java?rev=1750760&r1=1750759&r2=1750760&view=diff > > ============================================================================== > --- > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java > (original) > +++ > commons/proper/io/trunk/src/main/java/org/apache/commons/io/input/TeeInputStream.java > Thu Jun 30 09:04:21 2016 > @@ -35,6 +35,7 @@ import java.io.OutputStream; > * > * @version $Id$ > * @since 1.4 > + * @see ObservableInputStream > */ > public class TeeInputStream extends ProxyInputStream { > > > Added: > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java?rev=1750760&view=auto > > ============================================================================== > --- > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java > (added) > +++ > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java > Thu Jun 30 09:04:21 2016 > @@ -0,0 +1,48 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. 
See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.commons.io.input; > + > +import static org.junit.Assert.*; > + > +import java.io.ByteArrayInputStream; > +import java.security.MessageDigest; > +import java.util.Random; > + > +import org.junit.Test; > + > +public class MessageDigestCalculatingInputStreamTest { > + public static byte[] generateRandomByteStream(int pSize) { > + final byte[] buffer = new byte[pSize]; > + final Random rnd = new Random(); > + rnd.nextBytes(buffer); > + return buffer; > + } > + > + @Test > + public void test() throws Exception { > + for (int i = 256; i < 8192; i = i*2) { > + final byte[] buffer = generateRandomByteStream(i); > + final MessageDigest md5Sum = MessageDigest.getInstance("MD5"); > + final byte[] expect = md5Sum.digest(buffer); > + final MessageDigestCalculatingInputStream md5InputStream = > new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer)); > + md5InputStream.consume(); > + final byte[] got = md5InputStream.getMessageDigest().digest(); > + assertArrayEquals(expect, got); > + } > + } > + > +} > > Propchange: > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java > > ------------------------------------------------------------------------------ > svn:mime-type 
= text/plain > > Added: > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java > URL: > http://svn.apache.org/viewvc/commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java?rev=1750760&view=auto > > ============================================================================== > --- > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java > (added) > +++ > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java > Thu Jun 30 09:04:21 2016 > @@ -0,0 +1,134 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.commons.io.input; > + > +import static org.junit.Assert.*; > + > +import java.io.ByteArrayInputStream; > +import java.io.IOException; > + > +import org.apache.commons.io.input.ObservableInputStream; > +import org.apache.commons.io.input.ObservableInputStream.Observer; > +import org.junit.Test; > + > +public class ObservableInputStreamTest { > + private static class LastByteKeepingObserver extends Observer { > + private int lastByteSeen = -1; > + private boolean finished; > + private boolean closed; > + > + @Override > + void data(int pByte) throws IOException { > + super.data(pByte); > + lastByteSeen = pByte; > + } > + > + @Override > + void finished() throws IOException { > + super.finished(); > + finished = true; > + } > + > + @Override > + void closed() throws IOException { > + super.closed(); > + closed = true; > + } > + } > + private static class LastBytesKeepingObserver extends Observer { > + private byte[] buffer = null; > + private int offset = -1; > + private int length = -1; > + > + @Override > + void data(byte[] pBuffer, int pOffset, int pLength) throws > IOException { > + super.data(pBuffer, pOffset, pLength); > + buffer = pBuffer; > + offset = pOffset; > + length = pLength; > + } > + } > + > + /** Tests, that {@link Observer#data(int)} is called. 
> + */ > + @Test > + public void testDataByteCalled() throws Exception { > + final byte[] buffer = > MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096); > + final ObservableInputStream ois = new ObservableInputStream(new > ByteArrayInputStream(buffer)); > + final LastByteKeepingObserver lko = new LastByteKeepingObserver(); > + assertEquals(-1, lko.lastByteSeen); > + ois.read(); > + assertEquals(-1, lko.lastByteSeen); > + assertFalse(lko.finished); > + assertFalse(lko.closed); > + ois.add(lko); > + for (int i = 1; i < buffer.length; i++) { > + final int result = ois.read(); > + assertEquals((byte) result, buffer[i]); > + assertEquals(result, lko.lastByteSeen); > + assertFalse(lko.finished); > + assertFalse(lko.closed); > + } > + final int result = ois.read(); > + assertEquals(-1, result); > + assertTrue(lko.finished); > + assertFalse(lko.closed); > + ois.close(); > + assertTrue(lko.finished); > + assertTrue(lko.closed); > + } > + > + /** Tests, that {@link Observer#data(byte[],int,int)} is called. 
> + */ > + @Test > + public void testDataBytesCalled() throws Exception { > + final byte[] buffer = > MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096); > + ByteArrayInputStream bais = new ByteArrayInputStream(buffer); > + final ObservableInputStream ois = new ObservableInputStream(bais); > + final LastBytesKeepingObserver lko = new > LastBytesKeepingObserver(); > + final byte[] readBuffer = new byte[23]; > + assertEquals(null, lko.buffer); > + ois.read(readBuffer); > + assertEquals(null, lko.buffer); > + ois.add(lko); > + for (;;) { > + if (bais.available() >= 2048) { > + final int result = ois.read(readBuffer); > + if (result == -1) { > + ois.close(); > + break; > + } else { > + assertEquals(readBuffer, lko.buffer); > + assertEquals(0, lko.offset); > + assertEquals(readBuffer.length, lko.length); > + } > + } else { > + final int res = Math.min(11, bais.available()); > + final int result = ois.read(readBuffer, 1, 11); > + if (result == -1) { > + ois.close(); > + break; > + } else { > + assertEquals(readBuffer, lko.buffer); > + assertEquals(1, lko.offset); > + assertEquals(res, lko.length); > + } > + } > + } > + } > + > +} > > Propchange: > commons/proper/io/trunk/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java > > ------------------------------------------------------------------------------ > svn:mime-type = text/plain > > >