This is an automated email from the ASF dual-hosted git repository. bodewig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-compress.git
The following commit(s) were added to refs/heads/master by this push: new 38342b8 COMPRESS-485 keep zip entries order in parallel zip creation 38342b8 is described below commit 38342b8446e9c03ee758511c309f98e2a85ff599 Author: Hervé Boutemy <hbout...@apache.org> AuthorDate: Mon May 6 08:22:38 2019 +0200 COMPRESS-485 keep zip entries order in parallel zip creation this will ease Reproducible Builds when creating zip or jar archives thanks to Arnaud Nauwynck for the great help --- .../archivers/zip/ParallelScatterZipCreator.java | 39 +++++++++++--------- .../archivers/zip/ScatterZipOutputStream.java | 42 ++++++++++++++++++++++ src/site/xdoc/zip.xml | 19 +++++----- .../zip/ParallelScatterZipCreatorTest.java | 5 ++- 4 files changed, 80 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java index 820cd58..c5010c0 100644 --- a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java +++ b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java @@ -41,9 +41,9 @@ import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.c /** * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances. * <p> - * Note that this class generally makes no guarantees about the order of things written to - * the output file. Things that need to come in a specific order (manifests, directories) - * must be handled by the client of this class, usually by writing these things to the + * Note that until 1.18, this class generally made no guarantees about the order of things written to + * the output file. Things that needed to come in a specific order (manifests, directories) + * had to be handled by the client of this class, usually by writing these things to the * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p> * <p> * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of @@ -55,7 +55,7 @@ public class ParallelScatterZipCreator { private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>()); private final ExecutorService es; private final ScatterGatherBackingStoreSupplier backingStoreSupplier; - private final List<Future<Object>> futures = new ArrayList<>(); + private final List<Future<ScatterZipOutputStream>> futures = new ArrayList<>(); private final long startedAt = System.currentTimeMillis(); private long compressionDoneAt = 0; @@ -157,7 +157,7 @@ public class ParallelScatterZipCreator { * * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller. */ - public final void submit(final Callable<Object> callable) { + public final void submit(final Callable<ScatterZipOutputStream> callable) { futures.add(es.submit(callable)); } @@ -179,17 +179,18 @@ public class ParallelScatterZipCreator { * will be propagated through the callable. */ - public final Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { + public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { final int method = zipArchiveEntry.getMethod(); if (method == ZipMethod.UNKNOWN_CODE) { throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry); } final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source); - return new Callable<Object>() { + return new Callable<ScatterZipOutputStream>() { @Override - public Object call() throws Exception { - tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest); - return null; + public ScatterZipOutputStream call() throws Exception { + ScatterZipOutputStream scatterStream = tlScatterStreams.get(); + scatterStream.addArchiveEntry(zipArchiveEntryRequest); + return scatterStream; } }; } @@ -210,12 +211,13 @@ public class ParallelScatterZipCreator { * will be propagated through the callable. * @since 1.13 */ - public final Callable<Object> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { - return new Callable<Object>() { + public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) { + return new Callable<ScatterZipOutputStream>() { @Override - public Object call() throws Exception { - tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequestSupplier.get()); - return null; + public ScatterZipOutputStream call() throws Exception { + ScatterZipOutputStream scatterStream = tlScatterStreams.get(); + scatterStream.addArchiveEntry(zipArchiveEntryRequestSupplier.get()); + return scatterStream; } }; } @@ -255,8 +257,13 @@ public class ParallelScatterZipCreator { compressionDoneAt = System.currentTimeMillis(); synchronized (streams) { + // write zip entries in the order they were added (kept as futures) + for (final Future<ScatterZipOutputStream> future : futures) { + ScatterZipOutputStream scatterStream = future.get(); + scatterStream.zipEntryWriter().writeNextZipEntry(targetStream); + } + for (final ScatterZipOutputStream scatterStream : streams) { - scatterStream.writeTo(targetStream); scatterStream.close(); } } diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java b/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java index 006c531..deed65e 100644 --- a/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java +++ b/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java @@ -27,6 +27,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +52,7 @@ public class ScatterZipOutputStream implements Closeable { private final ScatterGatherBackingStore backingStore; private final StreamCompressor streamCompressor; private AtomicBoolean isClosed = new AtomicBoolean(); + private ZipEntryWriter zipEntryWriter = null; private static class CompressedEntry { final ZipArchiveEntryRequest zipArchiveEntryRequest; @@ -106,6 +108,7 @@ public class ScatterZipOutputStream implements Closeable { * * @param target The archive to receive the contents of this {@link ScatterZipOutputStream}. * @throws IOException If writing fails + * @see #zipEntryWriter() */ public void writeTo(final ZipArchiveOutputStream target) throws IOException { backingStore.closeForWriting(); @@ -119,6 +122,42 @@ public class ScatterZipOutputStream implements Closeable { } } + public static class ZipEntryWriter implements Closeable { + private final Iterator<CompressedEntry> itemsIterator; + private final InputStream itemsIteratorData; + + public ZipEntryWriter(ScatterZipOutputStream scatter) throws IOException { + scatter.backingStore.closeForWriting(); + itemsIterator = scatter.items.iterator(); + itemsIteratorData = scatter.backingStore.getInputStream(); + } + + @Override + public void close() throws IOException { + if (itemsIteratorData != null) { + itemsIteratorData.close(); + } + } + + public void writeNextZipEntry(final ZipArchiveOutputStream target) throws IOException { + CompressedEntry compressedEntry = itemsIterator.next(); + try (final BoundedInputStream rawStream = new BoundedInputStream(itemsIteratorData, compressedEntry.compressedSize)) { + target.addRawArchiveEntry(compressedEntry.transferToArchiveEntry(), rawStream); + } + } + } + + /** + * Get a zip entry writer for this scatter stream. + * @throws IOException If getting scatter stream input stream + * @return the ZipEntryWriter created on first call of the method + */ + public ZipEntryWriter zipEntryWriter() throws IOException { + if (zipEntryWriter == null) { + zipEntryWriter = new ZipEntryWriter(this); + } + return zipEntryWriter; + } /** * Closes this stream, freeing all resources involved in the creation of this stream. @@ -130,6 +169,9 @@ public class ScatterZipOutputStream implements Closeable { return; } try { + if (zipEntryWriter != null) { + zipEntryWriter.close(); + } backingStore.close(); } finally { streamCompressor.close(); diff --git a/src/site/xdoc/zip.xml b/src/site/xdoc/zip.xml index 59ff5dc..1629ef5 100644 --- a/src/site/xdoc/zip.xml +++ b/src/site/xdoc/zip.xml @@ -551,14 +551,17 @@ <p>To assist this process, clients can use <code>ParallelScatterZipCreator</code> that will handle threads pools and correct memory model consistency so the client - can avoid these issues. Please note that when writing well-formed - Zip files this way, it is usually necessary to keep a - separate <code>ScatterZipOutputStream</code> that receives all directories - and writes this to the target <code>ZipArchiveOutputStream</code> before - the ones created through <code>ParallelScatterZipCreator</code>. This is the responsibility of the client.</p> - - <p>There is no guarantee of order of the entries when writing a Zip - file with <code>ParallelScatterZipCreator</code>.</p> + can avoid these issues.</p> + + <p>Until version 1.18, there was no guarantee of order of the entries when writing a Zip + file with <code>ParallelScatterZipCreator</code>. In consequence, when writing well-formed + Zip files this way, it was usually necessary to keep a + separate <code>ScatterZipOutputStream</code> that received all directories + and wrote this to the target <code>ZipArchiveOutputStream</code> before + the ones created through <code>ParallelScatterZipCreator</code>. This was the responsibility of the client.</p> + + <p>Starting with version 1.19, entries order is kept, then this specific handling of directories is not + necessary any more.</p> See the examples section for a code sample demonstrating how to make a zip file. </subsection> diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java index 19a8fe9..7a46ede 100644 --- a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java +++ b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java @@ -98,12 +98,15 @@ public class ParallelScatterZipCreatorTest { private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException { final ZipFile zf = new ZipFile(result); final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); + int i = 0; while (entriesInPhysicalOrder.hasMoreElements()){ final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement(); final InputStream inputStream = zf.getInputStream(zipArchiveEntry); final byte[] actual = IOUtils.toByteArray(inputStream); final byte[] expected = entries.remove(zipArchiveEntry.getName()); assertArrayEquals( "For " + zipArchiveEntry.getName(), expected, actual); + // check order of zip entries vs order of order of addition to the parallel zip creator + assertEquals( "For " + zipArchiveEntry.getName(), "file" + i++, zipArchiveEntry.getName()); } zf.close(); } @@ -145,7 +148,7 @@ public class ParallelScatterZipCreatorTest { return new ByteArrayInputStream(payloadBytes); } }; - final Callable<Object> callable; + final Callable<ScatterZipOutputStream> callable; if (i % 2 == 0) { callable = zipCreator.createCallable(za, iss); } else {