Author: krosenvold Date: Mon Dec 22 15:24:02 2014 New Revision: 1647329 URL: http://svn.apache.org/r1647329 Log: COMPRESS-296 Parallel compression. Added StreamCompressor and ScatterZipOutputStream.
StreamCompressor is an extract of the deflation algorithm from ZipArchiveOutputStream, which unfortunately was too conflated with writing a file in a particular structure. Using the actual zip file format as an intermediate format for scatter-streams turned out to be fairly inefficient. ScatterZipOutputStream is 2-3x faster than using a zip file as intermediate format. It would be possible to refactor ZipArchiveOutputStream to use StreamCompressor, but there would be a slight break in backward compatibility regarding the protected writeOut method, which would move to the StreamCompressor class. Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/StreamCompressor.java commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/StreamCompressorTest.java Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveOutputStream.java commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/ZipTestCase.java Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java?rev=1647329&view=auto ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java (added) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java Mon Dec 22 15:24:02 2014 @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF)
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.commons.compress.archivers.zip; + + +import org.apache.commons.compress.utils.BoundedInputStream; + +import java.io.*; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.zip.Deflater; + +/** + * A zip output stream that is optimized for multi-threaded scatter/gather construction of zip files. + * <p/> + * The internal data format of the entries used by this class are entirely private to this class + * and are not part of any public api whatsoever. + * <p/> + * It is possible to extend this class to support different kinds of backing storage, the default + * implementation only supports file-based backing. + * <p/> + * Thread safety: This class supports multiple threads. But the "writeTo" method must be called + * by the thread that originally created the ZipArchiveEntry. 
+ * + * @since 1.10 + */ +public abstract class ScatterZipOutputStream { + private final Queue<CompressedEntry> items = new ConcurrentLinkedQueue<CompressedEntry>(); + + /** + * Compression results for one entry; writeTo relies on these being queued in the same + * order as the compressed data was written to the backing storage. + */ + private static class CompressedEntry { + final ZipArchiveEntry entry; + final long crc; + final long compressedSize; + final int method; + final long size; + + public CompressedEntry(ZipArchiveEntry entry, long crc, long compressedSize, int method, long size) { + this.entry = entry; + this.crc = crc; + this.compressedSize = compressedSize; + this.method = method; + this.size = size; + } + + public ZipArchiveEntry transferToArchiveEntry(){ + entry.setCompressedSize(compressedSize); + entry.setSize(size); + entry.setCrc(crc); + entry.setMethod(method); + return entry; + } + } + + /** + * Add an archive entry to this scatter stream. + * <p/> + * The payload stream is always closed by this method, also when compression fails. + * + * @param zipArchiveEntry The entry to write + * @param payload The content to write for the entry; closed by this method + * @param method The compression method + * @throws IOException If writing fails + */ + public void addArchiveEntry(ZipArchiveEntry zipArchiveEntry, InputStream payload, int method) throws IOException { + StreamCompressor sc = getStreamCompressor(); + try { + sc.deflate(payload, method); + } finally { + payload.close(); + } + items.add(new CompressedEntry(zipArchiveEntry, sc.getCrc32(), sc.getBytesWritten(), method, sc.getBytesRead())); + } + + /** + * Write the contents of this scatter stream to a target archive.
+ * <p/> + * The backing storage is closed first; the entries are then copied in the order they were added, + * each one bounded to its compressed size within the scattered payload. + * + * @param target The archive to receive the contents of this {@link ScatterZipOutputStream} + * @throws IOException If writing fails + */ + public void writeTo(ZipArchiveOutputStream target) throws IOException { + closeBackingStorage(); + InputStream data = getInputStream(); + try { + for (CompressedEntry compressedEntry : items) { + final BoundedInputStream rawStream = new BoundedInputStream(data, compressedEntry.compressedSize); + target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); + rawStream.close(); + } + } finally { + data.close(); + } + } + + /** + * Returns a stream compressor that can be used to compress the data. + * <p/> + * This method is expected to return the same instance every time. + * + * @return The stream compressor + * @throws FileNotFoundException If the backing storage is not available + */ + protected abstract StreamCompressor getStreamCompressor() throws FileNotFoundException; + + /** + * An input stream that contains the scattered payload + * + * @return An InputStream, should be closed by the caller of this method. + * @throws IOException when something fails + */ + protected abstract InputStream getInputStream() throws IOException; + + /** + * Closes whatever storage is backing this scatter stream + * + * @throws IOException If closing the backing storage fails + */ + protected abstract void closeBackingStorage() throws IOException; + + /** + * Create a ScatterZipOutputStream with default compression level that is backed by a file + * + * @param file The file to offload compressed data into. + * @return A ScatterZipOutputStream that is ready for use. + * @throws FileNotFoundException If the file cannot be opened for writing + */ + public static ScatterZipOutputStream fileBased(File file) throws FileNotFoundException { + return fileBased(file, Deflater.DEFAULT_COMPRESSION); + } + + /** + * Create a ScatterZipOutputStream that is backed by a file + * + * @param file The file to offload compressed data into. + * @param compressionLevel The compression level to use, see {@link Deflater} + * @return A ScatterZipOutputStream that is ready for use.
+ * @throws FileNotFoundException If the file cannot be opened for writing + */ + public static ScatterZipOutputStream fileBased(File file, int compressionLevel) throws FileNotFoundException { + return new FileScatterOutputStream(file, compressionLevel); + } + + private static class FileScatterOutputStream extends ScatterZipOutputStream { + final File target; + private final StreamCompressor streamDeflater; + final FileOutputStream os; + + FileScatterOutputStream(File target, int compressionLevel) throws FileNotFoundException { + this.target = target; + os = new FileOutputStream(target); + streamDeflater = StreamCompressor.create(compressionLevel, os); + } + + @Override + protected StreamCompressor getStreamCompressor() throws FileNotFoundException { + return streamDeflater; + } + + @Override + protected InputStream getInputStream() throws IOException { + return new FileInputStream(target); + } + + @Override + public void closeBackingStorage() throws IOException { + os.close(); + } + } +} Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/StreamCompressor.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/StreamCompressor.java?rev=1647329&view=auto ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/StreamCompressor.java (added) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/StreamCompressor.java Mon Dec 22 15:24:02 2014 @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.commons.compress.archivers.zip; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.CRC32; +import java.util.zip.Deflater; + +/** + * Encapsulates a Deflater and crc calculator, handling multiple types of output streams. + * Currently {@code ZipEntry.DEFLATED} and {@code ZipEntry.STORED} are the only supported compression methods. + * + * @since 1.10 + */ +public abstract class StreamCompressor { + + /* + * Apparently Deflater.setInput gets slowed down a lot on Sun JVMs + * when it gets handed a really big buffer. See + * https://issues.apache.org/bugzilla/show_bug.cgi?id=45396 + * + * Using a buffer size of 8 kB proved to be a good compromise + */ + private static final int DEFLATER_BLOCK_SIZE = 8192; + + private final Deflater def; + + private final CRC32 crc = new CRC32(); + + long writtenToOutputStream = 0; /* long, not int: payloads of 2 GiB or more must not overflow */ + long sourcePayloadLength = 0; + long actualCrc; + + private final int bufferSize = 4096; + private final byte[] outputBuffer = new byte[bufferSize]; + private final byte[] readerBuf = new byte[bufferSize]; + + protected StreamCompressor(Deflater deflater) { + this.def = deflater; + } + + /** + * Create a stream compressor with the given compression level.
+ * + * @param compressionLevel The #Deflater compression level + * @param os The #OutputStream stream to receive output + * @return A stream compressor + */ + public static StreamCompressor create(int compressionLevel, OutputStream os) { + final Deflater deflater = new Deflater(compressionLevel, true); + return new OutputStreamCompressor(deflater, os); + } + + /** + * Create a stream compressor with the default compression level. + * + * @param os The #OutputStream stream to receive output + * @return A stream compressor + */ + public static StreamCompressor create( OutputStream os) { + return create(Deflater.DEFAULT_COMPRESSION, os); + } + + /** + * Create a stream compressor with the given compression level. + * + * @param compressionLevel The #Deflater compression level + * @param os The #DataOutput to receive output + * @return A stream compressor + */ + public static StreamCompressor create(int compressionLevel, DataOutput os) { + final Deflater deflater = new Deflater(compressionLevel, true); + return new DataOutputCompressor(deflater, os); + } + + /** + * The crc32 of the last deflated file + * @return the crc32 + */ + + public long getCrc32() { + return actualCrc; + } + + /** + * Return the number of bytes read from the source stream by the last deflate call + * @return The number of bytes read, never negative (a long, so sources of 2 GiB or more are counted correctly) + */ + public long getBytesRead() { + return sourcePayloadLength; + } + + /** + * The number of bytes written to the output by the last deflate call + * @return The number of bytes, never negative (a long, so outputs of 2 GiB or more are counted correctly) + */ + public long getBytesWritten() { + return writtenToOutputStream; + } + + /** + * Deflate the given source using the supplied compression method + * @param source The source to compress + * @param method The {@code ZipArchiveEntry} compression method; any value other than {@code DEFLATED} is written uncompressed + * @throws IOException When failures happen + */ + + public void deflate(InputStream source, int method) throws IOException { + reset(); + int length; + + while(( length = source.read(readerBuf, 0, readerBuf.length)) >= 0){ + crc.update(readerBuf, 0, length); +
if (method == ZipArchiveEntry.DEFLATED) { + writeDeflated(readerBuf, 0, length); + } else { + writeOut(readerBuf, 0, length); + writtenToOutputStream += length; + } + sourcePayloadLength += length; + } + if (method == ZipArchiveEntry.DEFLATED) { + flushDeflater(); + } + actualCrc = crc.getValue(); + + + } + + private void reset(){ + crc.reset(); + def.reset(); + sourcePayloadLength = 0; + writtenToOutputStream = 0; + } + + private void flushDeflater() throws IOException { + def.finish(); + while (!def.finished()) { + deflate(); + } + } + + private void writeDeflated(byte[]b, int offset, int length) + throws IOException { + if (length > 0 && !def.finished()) { + if (length <= DEFLATER_BLOCK_SIZE) { + def.setInput(b, offset, length); + deflateUntilInputIsNeeded(); + } else { + final int fullblocks = length / DEFLATER_BLOCK_SIZE; + for (int i = 0; i < fullblocks; i++) { + def.setInput(b, offset + i * DEFLATER_BLOCK_SIZE, + DEFLATER_BLOCK_SIZE); + deflateUntilInputIsNeeded(); + } + final int done = fullblocks * DEFLATER_BLOCK_SIZE; + if (done < length) { + def.setInput(b, offset + done, length - done); + deflateUntilInputIsNeeded(); + } + } + } + } + + private void deflateUntilInputIsNeeded() throws IOException { + while (!def.needsInput()) { + deflate(); + } + } + + private void deflate() throws IOException { + int len = def.deflate(outputBuffer, 0, outputBuffer.length); + if (len > 0) { + writeOut(outputBuffer, 0, len); + writtenToOutputStream += len; + } + } + + protected abstract void writeOut(byte[] data, int offset, int length) throws IOException ; + + private static final class OutputStreamCompressor extends StreamCompressor { + private final OutputStream os; + + public OutputStreamCompressor(Deflater deflater, OutputStream os) { + super(deflater); + this.os = os; + } + + protected final void writeOut(byte[] data, int offset, int length) + throws IOException { + os.write(data, offset, length); + } + } + + private static final class DataOutputCompressor extends
StreamCompressor { + private final DataOutput raf; + public DataOutputCompressor(Deflater deflater, DataOutput raf) { + super(deflater); + this.raf = raf; + } + + protected final void writeOut(byte[] data, int offset, int length) + throws IOException { + raf.write(data, offset, length); + } + } +} Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveOutputStream.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveOutputStream.java?rev=1647329&r1=1647328&r2=1647329&view=diff ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveOutputStream.java (original) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveOutputStream.java Mon Dec 22 15:24:02 2014 @@ -505,24 +505,6 @@ public class ZipArchiveOutputStream exte } /** - * Make a copy of this stream with all its settings, but point to a new file. - * Used for scatter/gather operations to make several streams from a user-supplied master. - * - * @param newFile The file to use for the copy of this stream - * @return A copy of this stream - */ - public ZipArchiveOutputStream cloneWith(File newFile) throws IOException { - ZipArchiveOutputStream zos = new ZipArchiveOutputStream(newFile); - zos.setCreateUnicodeExtraFields(createUnicodeExtraFields); - zos.setMethod(method); - zos.setEncoding(encoding); - zos.setFallbackToUTF8(fallbackToUTF8); - zos.setUseLanguageEncodingFlag(useUTF8Flag); - zos.setUseZip64(zip64Mode); - return zos; - } - - /** * Ensures all bytes sent to the deflater are written to the stream.
*/ private void flushDeflater() throws IOException { Modified: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/ZipTestCase.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/ZipTestCase.java?rev=1647329&r1=1647328&r2=1647329&view=diff ============================================================================== --- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/ZipTestCase.java (original) +++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/ZipTestCase.java Mon Dec 22 15:24:02 2014 @@ -293,19 +293,6 @@ public final class ZipTestCase extends A } }; - - public void testCloneZipOutputStream( ) throws IOException { - File tempDir = createTempDir(); - File fred = new File(tempDir, "fred"); - ZipArchiveOutputStream zipArchiveOutputStream = new ZipArchiveOutputStream(fred); - File frank = new File(tempDir, "frank"); - ZipArchiveOutputStream actual = zipArchiveOutputStream.cloneWith(frank); - zipArchiveOutputStream.close(); - actual.close(); - assertTrue( fred.exists()); - assertTrue( frank.exists()); - } - public void testCopyRawEntriesFromFile () throws IOException { Added: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java?rev=1647329&view=auto ============================================================================== --- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java (added) +++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java Mon Dec 22 15:24:02 2014 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.commons.compress.archivers.zip; + +import org.apache.commons.compress.utils.IOUtils; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class ScatterZipOutputStreamTest { + + @Test + public void putArchiveEntry() throws Exception { + File scatterFile = File.createTempFile("scattertest", ".notzip"); + scatterFile.deleteOnExit(); + ScatterZipOutputStream scatterZipOutputStream = ScatterZipOutputStream.fileBased(scatterFile); + final byte[] B_PAYLOAD = "RBBBBBBS".getBytes(); + final byte[] A_PAYLOAD = "XAAY".getBytes(); + + ZipArchiveEntry zab = new ZipArchiveEntry("b.txt"); + scatterZipOutputStream.addArchiveEntry(zab, new ByteArrayInputStream(B_PAYLOAD), ZipArchiveEntry.DEFLATED); + + ZipArchiveEntry zae = new ZipArchiveEntry("a.txt"); + scatterZipOutputStream.addArchiveEntry(zae, new ByteArrayInputStream(A_PAYLOAD), ZipArchiveEntry.DEFLATED); + + File target = File.createTempFile("scattertest", ".zip"); + target.deleteOnExit(); + ZipArchiveOutputStream outputStream = new ZipArchiveOutputStream(target); + scatterZipOutputStream.writeTo( outputStream); + outputStream.close(); + + /* the ZipFile must be closed so the archive's file handle is released */ + ZipFile zf = new ZipFile(target); + try { + final ZipArchiveEntry
b_entry = zf.getEntries("b.txt").iterator().next(); + assertEquals(8, b_entry.getSize()); + assertArrayEquals(B_PAYLOAD, IOUtils.toByteArray(zf.getInputStream(b_entry))); + + final ZipArchiveEntry a_entry = zf.getEntries("a.txt").iterator().next(); + assertEquals(4, a_entry.getSize()); + assertArrayEquals(A_PAYLOAD, IOUtils.toByteArray(zf.getInputStream(a_entry))); + } finally { + zf.close(); + } + } +} \ No newline at end of file Added: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/StreamCompressorTest.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/StreamCompressorTest.java?rev=1647329&view=auto ============================================================================== --- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/StreamCompressorTest.java (added) +++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/StreamCompressorTest.java Mon Dec 22 15:24:02 2014 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ +package org.apache.commons.compress.archivers.zip; + +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.zip.ZipEntry; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class StreamCompressorTest { + + @Test + public void storedEntries() throws Exception { + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + StreamCompressor compressor = StreamCompressor.create(sink); + compressor.deflate(new ByteArrayInputStream("A".getBytes()), ZipEntry.STORED); + compressor.deflate(new ByteArrayInputStream("BAD".getBytes()), ZipEntry.STORED); + /* byte counters and CRC are reset by every deflate call, so they describe "BAD" only */ + assertEquals(3, compressor.getBytesRead()); + assertEquals(3, compressor.getBytesWritten()); + assertEquals(344750961, compressor.getCrc32()); + compressor.deflate(new ByteArrayInputStream("CAFE".getBytes()), ZipEntry.STORED); + /* stored payloads are appended verbatim to the same sink */ + assertEquals("ABADCAFE", sink.toString()); + } + + @Test + public void deflatedEntries() throws Exception { + ByteArrayOutputStream sink = new ByteArrayOutputStream(); + StreamCompressor compressor = StreamCompressor.create(sink); + compressor.deflate(new ByteArrayInputStream("AAAAAABBBBBB".getBytes()), ZipEntry.DEFLATED); + assertEquals(12, compressor.getBytesRead()); + assertEquals(8, compressor.getBytesWritten()); + assertEquals(3299542, compressor.getCrc32()); + + byte[] expected = new byte[]{115,116,4,1,39,48,0,0}; + final byte[] compressed = sink.toByteArray(); + /* the exact bytes depend on the JDK's Deflater implementation, so this check may be brittle */ + assertArrayEquals(expected, compressed); + } +} \ No newline at end of file