Repository: commons-compress Updated Branches: refs/heads/COMPRESS-207 [created] 3cddfaaff
Proposed change for implementing COMPRESS-207 This is an incomplete proposal since only reading bzip is currently firing any events. Writing doesn't even count bytes as of Compress 1.11. Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/3cddfaaf Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/3cddfaaf Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/3cddfaaf Branch: refs/heads/COMPRESS-207 Commit: 3cddfaaff0ea4fbacc6aceef941efa7ca681c84f Parents: bee2612 Author: Stefan Bodewig <bode...@apache.org> Authored: Sun Mar 27 18:22:56 2016 +0200 Committer: Stefan Bodewig <bode...@apache.org> Committed: Sun Mar 27 18:22:56 2016 +0200 ---------------------------------------------------------------------- .../compressors/CompressionProgressEvent.java | 78 +++++++++++++++++++ .../CompressionProgressListener.java | 34 ++++++++ .../compressors/CompressorInputStream.java | 38 +++++++++ .../compressors/CompressorOutputStream.java | 58 +++++++++++++- .../bzip2/BZip2CompressorInputStream.java | 9 ++- .../compress/compressors/BZip2TestCase.java | 37 +++++++++ src/test/resources/COMPRESS-207.bz2 | Bin 0 -> 1883486 bytes 7 files changed, 251 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/main/java/org/apache/commons/compress/compressors/CompressionProgressEvent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/CompressionProgressEvent.java b/src/main/java/org/apache/commons/compress/compressors/CompressionProgressEvent.java new file mode 100644 index 0000000..122d1ca --- /dev/null +++ b/src/main/java/org/apache/commons/compress/compressors/CompressionProgressEvent.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.commons.compress.compressors; + +import java.util.EventObject; + +/** + * Notification of (de)compression progress. + * @Immutable + */ +public class CompressionProgressEvent extends EventObject { + + private final int blockNumber, streamNumber; + private final long bytesProcessed; + + /** + * Creates a new event. + * + * @param source the stream creating the event + * @param blockNumber number of the block that is getting processed now + * @param streamNumer number of the stream that is getting + * processed now + * @param bytesProcessed number of bytes read or written when the event is triggered + */ + public CompressionProgressEvent(Object source, int blockNumber, int streamNumber, + long bytesProcessed) { + super(source); + this.blockNumber = blockNumber; + this.streamNumber = streamNumber; + this.bytesProcessed = bytesProcessed; + } + + /** + * The current block number. + * + * <p>Will always be 0 if the stream doesn't use blocks.</p> + * + * @return number of the block that is getting processed now + */ + public int getBlockNumber() { + return blockNumber; + } + + /** + * The current stream number. 
+ * + * <p>Will always be 0 unless concatenated streams are used.</p> + * + * @return number of the stream that is getting processed now + */ + public int getStreamNumber() { + return streamNumber; + } + + /** + * The number of bytes processed so far. + * @return number of bytes read or written when the event is triggered + */ + public long getBytesProcessed() { + return bytesProcessed; + } +} http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/main/java/org/apache/commons/compress/compressors/CompressionProgressListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/CompressionProgressListener.java b/src/main/java/org/apache/commons/compress/compressors/CompressionProgressListener.java new file mode 100644 index 0000000..22396aa --- /dev/null +++ b/src/main/java/org/apache/commons/compress/compressors/CompressionProgressListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.commons.compress.compressors; + +/** + * Is notified of (de)compression progress. + * + * <p>Not all stream implementations support tracking of + * progress. 
Those that don't simply never invoke the {@link #notify} + * method.</p> + */ +public interface CompressionProgressListener { + /** + * Is notified of (de)compression progress. + * @param event encapsulates the progress + */ + void notify(CompressionProgressEvent event); +} http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/main/java/org/apache/commons/compress/compressors/CompressorInputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/CompressorInputStream.java b/src/main/java/org/apache/commons/compress/compressors/CompressorInputStream.java index 52b161b..45ceafe 100644 --- a/src/main/java/org/apache/commons/compress/compressors/CompressorInputStream.java +++ b/src/main/java/org/apache/commons/compress/compressors/CompressorInputStream.java @@ -19,9 +19,13 @@ package org.apache.commons.compress.compressors; import java.io.InputStream; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; public abstract class CompressorInputStream extends InputStream { private long bytesRead = 0; + private final List<CompressionProgressListener> listeners = + new CopyOnWriteArrayList<CompressionProgressListener>(); /** * Increments the counter of already read bytes. @@ -77,4 +81,38 @@ public abstract class CompressorInputStream extends InputStream { public long getBytesRead() { return bytesRead; } + + /** + * Adds a listener that is notified of decompression progress. + * + * <p>Not all streams support progress notifications.</p> + * + * @param l the listener to add + */ + public void addCompressionProgressListener(CompressionProgressListener l) { + listeners.add(l); + } + + /** + * Removes a listener that is notified of decompression progress. 
+ * + * @param l the listener to remove + */ + public void removeCompressionProgressListener(CompressionProgressListener l) { + listeners.remove(l); + } + + /** + * Notifies all listeners of progress. + * + * @param blockNumber number of the block that is getting processed now + * @param streamNumer number of the stream that is getting + * processed now + */ + protected void fireProgress(int blockNumber, int streamNumber) { + CompressionProgressEvent e = new CompressionProgressEvent(this, blockNumber, streamNumber, getBytesRead()); + for (CompressionProgressListener l : listeners) { + l.notify(e); + } + } } http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/main/java/org/apache/commons/compress/compressors/CompressorOutputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/CompressorOutputStream.java b/src/main/java/org/apache/commons/compress/compressors/CompressorOutputStream.java index 51eee9c..b0f7bf9 100644 --- a/src/main/java/org/apache/commons/compress/compressors/CompressorOutputStream.java +++ b/src/main/java/org/apache/commons/compress/compressors/CompressorOutputStream.java @@ -19,7 +19,63 @@ package org.apache.commons.compress.compressors; import java.io.OutputStream; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; public abstract class CompressorOutputStream extends OutputStream { - // TODO + + private long bytesWritten = 0; + private final List<CompressionProgressListener> listeners = + new CopyOnWriteArrayList<CompressionProgressListener>(); + + /** + * Adds a listener that is notified of compression progress. + * + * <p>Not all streams support progress notifications.</p> + * + * @param l the listener to add + */ + public void addCompressionProgressListener(CompressionProgressListener l) { + listeners.add(l); + } + + /** + * Removes a listener that is notified of compression progress. 
+ * + * @param l the listener to remove + */ + public void removeCompressionProgressListener(CompressionProgressListener l) { + listeners.remove(l); + } + + /** + * Notifies all listeners of progress. + * + * @param blockNumber number of the block that is getting processed now + * @param streamNumer number of the stream that is getting + * processed now + */ + protected void fireProgress(int blockNumber, int streamNumber) { + CompressionProgressEvent e = new CompressionProgressEvent(this, blockNumber, streamNumber, getBytesWritten()); + for (CompressionProgressListener l : listeners) { + l.notify(e); + } + } + + /** + * Increments the counter of already written bytes. + * + * @param written the number of bytes written + */ + protected void count(long written) { + bytesWritten += written; + } + + /** + * Returns the current number of bytes written from this stream. + * @return the number of written bytes + */ + public long getBytesWritten() { + return bytesWritten; + } } http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/main/java/org/apache/commons/compress/compressors/bzip2/BZip2CompressorInputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/compressors/bzip2/BZip2CompressorInputStream.java b/src/main/java/org/apache/commons/compress/compressors/bzip2/BZip2CompressorInputStream.java index 4456860..bcfe577 100644 --- a/src/main/java/org/apache/commons/compress/compressors/bzip2/BZip2CompressorInputStream.java +++ b/src/main/java/org/apache/commons/compress/compressors/bzip2/BZip2CompressorInputStream.java @@ -90,6 +90,8 @@ public class BZip2CompressorInputStream extends CompressorInputStream implements private int su_tPos; private char su_z; + private int currentBlock, currentStream; + /** * All memory intensive stuff. This field is initialized by initBlock(). 
*/ @@ -175,10 +177,10 @@ public class BZip2CompressorInputStream extends CompressorInputStream implements int b; while (destOffs < hi && ((b = read0()) >= 0)) { dest[destOffs++] = (byte) b; - count(1); } int c = (destOffs == offs) ? -1 : (destOffs - offs); + count(c); return c; } @@ -311,7 +313,7 @@ public class BZip2CompressorInputStream extends CompressorInputStream implements this.data = new Data(this.blockSize100k); } - // currBlockNo++; + fireProgress(currentBlock++, currentStream); getAndMoveToFrontDecode(); this.crc.initialiseCRC(); @@ -347,6 +349,9 @@ public class BZip2CompressorInputStream extends CompressorInputStream implements throw new IOException("BZip2 CRC error"); } + currentStream++; + currentBlock = 0; + // Look for the next .bz2 stream if decompressing // concatenated files. return !decompressConcatenated || !init(false); http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/test/java/org/apache/commons/compress/compressors/BZip2TestCase.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/compress/compressors/BZip2TestCase.java b/src/test/java/org/apache/commons/compress/compressors/BZip2TestCase.java index 5132166..e736125 100644 --- a/src/test/java/org/apache/commons/compress/compressors/BZip2TestCase.java +++ b/src/test/java/org/apache/commons/compress/compressors/BZip2TestCase.java @@ -25,6 +25,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.compress.AbstractTestCase; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; @@ -133,4 +135,39 @@ public final class BZip2TestCase extends AbstractTestCase { } } + @Test + public void testCOMPRESS207Listeners() throws Exception { + File inputFile = getFile("COMPRESS-207.bz2"); + FileInputStream fInputStream = new 
FileInputStream(inputFile); + final List<Integer> blockNumbers = new ArrayList<Integer>(); + final List<Long> readPositions = new ArrayList<Long>(); + final BZip2CompressorInputStream in = new BZip2CompressorInputStream(fInputStream); + + CompressionProgressListener blockListener = new CompressionProgressListener() { + + public void notify(CompressionProgressEvent e) { + assertSame(in, e.getSource()); + blockNumbers.add(e.getBlockNumber()); + readPositions.add(e.getBytesProcessed()); + } + }; + in.addCompressionProgressListener(blockListener); + + while(in.read() >= 0); + in.close(); + + // we miss the initial block event which is triggered by the constructor + assertEquals(4, blockNumbers.size()); + System.err.println(blockNumbers); + for (int i = 0; i < 4; i++) { + assertEquals(i + 1, blockNumbers.get(i).intValue()); + } + + assertEquals(4, readPositions.size()); + assertEquals(Long.valueOf(899907), readPositions.get(0)); + assertEquals(Long.valueOf(1799817), readPositions.get(1)); + assertEquals(Long.valueOf(2699710), readPositions.get(2)); + assertEquals(Long.valueOf(3599604), readPositions.get(3)); + } + } http://git-wip-us.apache.org/repos/asf/commons-compress/blob/3cddfaaf/src/test/resources/COMPRESS-207.bz2 ---------------------------------------------------------------------- diff --git a/src/test/resources/COMPRESS-207.bz2 b/src/test/resources/COMPRESS-207.bz2 new file mode 100644 index 0000000..7a60553 Binary files /dev/null and b/src/test/resources/COMPRESS-207.bz2 differ