Author: krosenvold Date: Wed Jan 14 07:06:22 2015 New Revision: 1651575 URL: http://svn.apache.org/r1651575 Log: Added ZipArchiveEntryRequest class
ZipArchiveEntry is not thread safe, and the hand-off between the creating thread and the executorService actually doing the compression has been somewhat of a tightrope-walking effort, since we cannot reliably read fields off the ZipArchiveEntry. Furthermore, to achieve true maximum IO performance in the gather-phase it would be required that Zip headers be created in the parallel part of the compression run, which was not possible prior to this commit. The ZipArchiveEntryRequest has clear and well-defined thread semantics and can cater for any future algorithmic improvements that may want to try to take performance to the very edge of what is achievable. To my understanding this will not be for this next release :) Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1651575&r1=1651574&r2=1651575&view=diff ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java (original) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java Wed Jan 14 07:06:22 2015 @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.Atomi import java.util.zip.Deflater; import static 
java.util.Collections.synchronizedList; +import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; /** * Creates a zip in parallel by using multiple threadlocal #ScatterZipOutputStream instances. @@ -132,7 +133,7 @@ public class ParallelScatterZipCreator { * Submit a callable for compression * @param callable The callable to run */ - public void submit(Callable<Object> callable) { + public final void submit(Callable<Object> callable) { futures.add(es.submit(callable)); } @@ -149,20 +150,15 @@ public class ParallelScatterZipCreator { * @return A callable that will be used to check for errors */ - public Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { + public final Callable<Object> createCallable(ZipArchiveEntry zipArchiveEntry, InputStreamSupplier source) { final int method = zipArchiveEntry.getMethod(); if (method == ZipMethod.UNKNOWN_CODE) { throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry"); } + final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); return new Callable<Object>() { public Object call() throws Exception { - final ScatterZipOutputStream streamToUse = tlScatterStreams.get(); - InputStream payload = source.get(); - try { - streamToUse.addArchiveEntry(zipArchiveEntry, payload, method); - } finally { - payload.close(); - } + tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest); return null; } }; Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java?rev=1651575&r1=1651574&r2=1651575&view=diff ============================================================================== --- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java (original) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java Wed Jan 14 07:06:22 2015 @@ -49,25 +49,30 @@ public class ScatterZipOutputStream impl private final StreamCompressor streamCompressor; private static class CompressedEntry { - final ZipArchiveEntry entry; + final ZipArchiveEntryRequest zipArchiveEntryRequest; final long crc; final long compressedSize; - final int method; final long size; - public CompressedEntry(ZipArchiveEntry entry, long crc, long compressedSize, int method, long size) { - this.entry = entry; + public CompressedEntry(ZipArchiveEntryRequest zipArchiveEntryRequest, long crc, long compressedSize, long size) { + this.zipArchiveEntryRequest = zipArchiveEntryRequest; this.crc = crc; this.compressedSize = compressedSize; - this.method = method; this.size = size; } + /** + * Update the original ZipArchiveEntry witg sizes/crc + * Do not use this methods from threads that did not create the instance itself ! + * @return the zipeArchiveEntry that is basis for this request + */ + public ZipArchiveEntry transferToArchiveEntry(){ + ZipArchiveEntry entry = zipArchiveEntryRequest.getZipArchiveEntry(); entry.setCompressedSize(compressedSize); entry.setSize(size); entry.setCrc(crc); - entry.setMethod(method); + entry.setMethod(zipArchiveEntryRequest.getMethod()); return entry; } } @@ -81,16 +86,18 @@ public class ScatterZipOutputStream impl /** * Add an archive entry to this scatter stream. * - * @param zipArchiveEntry The entry to write - * @param payload The content to write for the entry. The caller is responsible for closing this. - * @param method The compression method + * @param zipArchiveEntryRequest The entry to write. 
* @throws IOException If writing fails */ - public void addArchiveEntry(ZipArchiveEntry zipArchiveEntry, InputStream payload, int method) throws IOException { - streamCompressor.deflate(payload, method); - items.add(new CompressedEntry(zipArchiveEntry, streamCompressor.getCrc32(), - streamCompressor.getBytesWrittenForLastEntry(), method, - streamCompressor.getBytesRead())); + public void addArchiveEntry(ZipArchiveEntryRequest zipArchiveEntryRequest) throws IOException { + final InputStream payloadStream = zipArchiveEntryRequest.getPayloadStream(); + try { + streamCompressor.deflate(payloadStream, zipArchiveEntryRequest.getMethod()); + } finally { + payloadStream.close(); + } + items.add(new CompressedEntry(zipArchiveEntryRequest, streamCompressor.getCrc32(), + streamCompressor.getBytesWrittenForLastEntry(), streamCompressor.getBytesRead())); } /** Added: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java?rev=1651575&view=auto ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java (added) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java Wed Jan 14 07:06:22 2015 @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.commons.compress.archivers.zip; + +import java.io.InputStream; + +/** + * A Thread-safe representation of a ZipArchiveEntry that is used to add entries to parallel archives. + */ +public class ZipArchiveEntryRequest { + /* + The zipArchiveEntry is not thread safe, and cannot be safely accessed by the getters of this class. + It is safely accessible during the construction part of this class and also after the + thread pools have been shut down. + */ + private final ZipArchiveEntry zipArchiveEntry; + private final InputStreamSupplier payloadSupplier; + private final int method; + + + private ZipArchiveEntryRequest(ZipArchiveEntry zipArchiveEntry, InputStreamSupplier payloadSupplier) { + // this constructor has "safe" access to all member variables on zipArchiveEntry + this.zipArchiveEntry = zipArchiveEntry; + this.payloadSupplier = payloadSupplier; + this.method = zipArchiveEntry.getMethod(); + } + + /** + * Create a ZipArchiveEntryRequest + * @param zipArchiveEntry The entry to use + * @param payloadSupplier The payload that will be added to the zip entry. + * @return The newly created request + */ + public static ZipArchiveEntryRequest createZipArchiveEntryRequest(ZipArchiveEntry zipArchiveEntry, InputStreamSupplier payloadSupplier) { + return new ZipArchiveEntryRequest(zipArchiveEntry, payloadSupplier); + } + + /** + * The paylaod that will be added to this zip entry + * @return The input stream. 
+ */ + public InputStream getPayloadStream() { + return payloadSupplier.get(); + } + + /** + * The compression method to use + * @return The compression method to use + */ + public int getMethod(){ + return method; + } + + + /** + * Gets the underlying entry. Do not use this methods from threads that did not create the instance itself ! + * @return the zipeArchiveEntry that is basis for this request + */ + ZipArchiveEntry getZipArchiveEntry() { + return zipArchiveEntry; + } +} Modified: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java?rev=1651575&r1=1651574&r2=1651575&view=diff ============================================================================== --- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java (original) +++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java Wed Jan 14 07:06:22 2015 @@ -22,7 +22,9 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.InputStream; +import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -36,12 +38,14 @@ public class ScatterZipOutputStreamTest final byte[] A_PAYLOAD = "XAAY".getBytes(); ZipArchiveEntry zab = new ZipArchiveEntry("b.txt"); - ByteArrayInputStream payload = new ByteArrayInputStream(B_PAYLOAD); - scatterZipOutputStream.addArchiveEntry(zab, payload, ZipArchiveEntry.DEFLATED); + zab.setMethod(ZipArchiveEntry.DEFLATED); + final ByteArrayInputStream payload = new ByteArrayInputStream(B_PAYLOAD); + scatterZipOutputStream.addArchiveEntry(createZipArchiveEntryRequest(zab, 
createPayloadSupplier(payload))); ZipArchiveEntry zae = new ZipArchiveEntry("a.txt"); + zae.setMethod(ZipArchiveEntry.DEFLATED); ByteArrayInputStream payload1 = new ByteArrayInputStream(A_PAYLOAD); - scatterZipOutputStream.addArchiveEntry(zae, payload1, ZipArchiveEntry.DEFLATED); + scatterZipOutputStream.addArchiveEntry(createZipArchiveEntryRequest(zae, createPayloadSupplier(payload1))); File target = File.createTempFile("scattertest", ".zip"); ZipArchiveOutputStream outputStream = new ZipArchiveOutputStream(target); @@ -58,4 +62,12 @@ public class ScatterZipOutputStreamTest assertEquals(4, a_entry.getSize()); assertArrayEquals(A_PAYLOAD, IOUtils.toByteArray(zf.getInputStream(a_entry))); } + + private InputStreamSupplier createPayloadSupplier(final ByteArrayInputStream payload) { + return new InputStreamSupplier() { + public InputStream get() { + return payload; + } + }; + } } \ No newline at end of file