Author: krosenvold
Date: Wed Jan 14 07:06:22 2015
New Revision: 1651575

URL: http://svn.apache.org/r1651575
Log:
Added ZipArchiveEntryRequest class

ZipArchiveEntry is not thread safe, and the hand-off between the creating thread
and the executorService actually doing the compression has been somewhat of a
tightrope-walking effort, since we cannot reliably read fields off the 
ZipArchiveEntry

Furthermore, to achieve true maximum IO performance in the gather-phase it would
be required that Zip headers be created in the parallel part of the compression 
run,
which was not possible prior to this commit.

The ZipArchiveEntryRequest has clear and well-defined thread semantics and can 
cater
for any future algorithmic improvements that may want to try to take 
performance to the
very edge of what is achievable. To my understanding this will not be for this 
next
relasease :)

Added:
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java
Modified:
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java
    
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java

Modified: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1651575&r1=1651574&r2=1651575&view=diff
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 (original)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 Wed Jan 14 07:06:22 2015
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.zip.Deflater;
 
 import static java.util.Collections.synchronizedList;
+import static 
org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
 
 /**
  * Creates a zip in parallel by using multiple threadlocal 
#ScatterZipOutputStream instances.
@@ -132,7 +133,7 @@ public class ParallelScatterZipCreator {
      * Submit a callable for compression
      * @param callable The callable to run
      */
-    public void submit(Callable<Object> callable) {
+    public final void submit(Callable<Object> callable) {
         futures.add(es.submit(callable));
     }
 
@@ -149,20 +150,15 @@ public class ParallelScatterZipCreator {
      * @return   A callable that will be used to check for errors
      */
 
-    public Callable<Object> createCallable(final ZipArchiveEntry 
zipArchiveEntry, final InputStreamSupplier source) {
+    public final Callable<Object> createCallable(ZipArchiveEntry 
zipArchiveEntry, InputStreamSupplier source) {
         final int method = zipArchiveEntry.getMethod();
         if (method == ZipMethod.UNKNOWN_CODE) {
             throw new IllegalArgumentException("Method must be set on the 
supplied zipArchiveEntry");
         }
+        final ZipArchiveEntryRequest zipArchiveEntryRequest = 
createZipArchiveEntryRequest(zipArchiveEntry, source);
         return new Callable<Object>() {
             public Object call() throws Exception {
-                final ScatterZipOutputStream streamToUse = 
tlScatterStreams.get();
-                InputStream payload = source.get();
-                try {
-                    streamToUse.addArchiveEntry(zipArchiveEntry, payload, 
method);
-                } finally {
-                    payload.close();
-                }
+                tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest);
                 return null;
             }
         };

Modified: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java?rev=1651575&r1=1651574&r2=1651575&view=diff
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java
 (original)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStream.java
 Wed Jan 14 07:06:22 2015
@@ -49,25 +49,30 @@ public class ScatterZipOutputStream impl
     private final StreamCompressor streamCompressor;
 
     private static class CompressedEntry {
-        final ZipArchiveEntry entry;
+        final ZipArchiveEntryRequest zipArchiveEntryRequest;
         final long crc;
         final long compressedSize;
-        final int method;
         final long size;
 
-        public CompressedEntry(ZipArchiveEntry entry, long crc, long 
compressedSize, int method, long size) {
-            this.entry = entry;
+        public CompressedEntry(ZipArchiveEntryRequest zipArchiveEntryRequest, 
long crc, long compressedSize, long size) {
+            this.zipArchiveEntryRequest = zipArchiveEntryRequest;
             this.crc = crc;
             this.compressedSize = compressedSize;
-            this.method = method;
             this.size = size;
         }
 
+        /**
+         * Update the original ZipArchiveEntry witg sizes/crc
+         * Do not use this methods from threads that did not create the 
instance itself !
+         * @return the zipeArchiveEntry that is basis for this request
+         */
+
         public ZipArchiveEntry transferToArchiveEntry(){
+            ZipArchiveEntry entry = 
zipArchiveEntryRequest.getZipArchiveEntry();
             entry.setCompressedSize(compressedSize);
             entry.setSize(size);
             entry.setCrc(crc);
-            entry.setMethod(method);
+            entry.setMethod(zipArchiveEntryRequest.getMethod());
             return entry;
         }
     }
@@ -81,16 +86,18 @@ public class ScatterZipOutputStream impl
     /**
      * Add an archive entry to this scatter stream.
      *
-     * @param zipArchiveEntry The entry to write
-     * @param payload         The content to write for the entry. The caller 
is responsible for closing this.
-     * @param method          The compression method
+     * @param zipArchiveEntryRequest The entry to write.
      * @throws IOException    If writing fails
      */
-    public void addArchiveEntry(ZipArchiveEntry zipArchiveEntry, InputStream 
payload, int method) throws IOException {
-        streamCompressor.deflate(payload, method);
-        items.add(new CompressedEntry(zipArchiveEntry, 
streamCompressor.getCrc32(),
-                                      
streamCompressor.getBytesWrittenForLastEntry(), method,
-                                      streamCompressor.getBytesRead()));
+    public void addArchiveEntry(ZipArchiveEntryRequest zipArchiveEntryRequest) 
throws IOException {
+        final InputStream payloadStream = 
zipArchiveEntryRequest.getPayloadStream();
+        try {
+            streamCompressor.deflate(payloadStream, 
zipArchiveEntryRequest.getMethod());
+        } finally {
+            payloadStream.close();
+        }
+        items.add(new CompressedEntry(zipArchiveEntryRequest, 
streamCompressor.getCrc32(),
+                                      
streamCompressor.getBytesWrittenForLastEntry(), 
streamCompressor.getBytesRead()));
     }
 
     /**

Added: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java?rev=1651575&view=auto
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java
 (added)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ZipArchiveEntryRequest.java
 Wed Jan 14 07:06:22 2015
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.commons.compress.archivers.zip;
+
+import java.io.InputStream;
+
+/**
+ * A Thread-safe representation of a ZipArchiveEntry that is used to add 
entries to parallel archives.
+ */
+public class ZipArchiveEntryRequest {
+    /*
+     The zipArchiveEntry is not thread safe, and cannot be safely accessed by 
the getters of this class.
+     It is safely accessible during the construction part of this class and 
also after the
+     thread pools have been shut down.
+     */
+    private final ZipArchiveEntry zipArchiveEntry;
+    private final InputStreamSupplier payloadSupplier;
+    private final int method;
+
+
+    private ZipArchiveEntryRequest(ZipArchiveEntry zipArchiveEntry, 
InputStreamSupplier payloadSupplier) {
+        // this constructor has "safe" access to all member variables on 
zipArchiveEntry
+        this.zipArchiveEntry = zipArchiveEntry;
+        this.payloadSupplier = payloadSupplier;
+        this.method = zipArchiveEntry.getMethod();
+    }
+
+    /**
+     * Create a ZipArchiveEntryRequest
+     * @param zipArchiveEntry The entry to use
+     * @param payloadSupplier The payload that will be added to the zip entry.
+     * @return The newly created request
+     */
+    public static ZipArchiveEntryRequest 
createZipArchiveEntryRequest(ZipArchiveEntry zipArchiveEntry, 
InputStreamSupplier payloadSupplier) {
+        return new ZipArchiveEntryRequest(zipArchiveEntry, payloadSupplier);
+    }
+
+    /**
+     * The paylaod that will be added to this zip entry
+     * @return The input stream.
+     */
+    public InputStream getPayloadStream() {
+        return payloadSupplier.get();
+    }
+
+    /**
+     * The compression method to use
+     * @return The compression method to use
+     */
+    public int getMethod(){
+       return method;
+    }
+
+
+    /**
+     * Gets the underlying entry. Do not use this methods from threads that 
did not create the instance itself !
+     * @return the zipeArchiveEntry that is basis for this request
+     */
+    ZipArchiveEntry getZipArchiveEntry() {
+        return zipArchiveEntry;
+    }
+}

Modified: 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java?rev=1651575&r1=1651574&r2=1651575&view=diff
==============================================================================
--- 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java
 (original)
+++ 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ScatterZipOutputStreamTest.java
 Wed Jan 14 07:06:22 2015
@@ -22,7 +22,9 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.InputStream;
 
+import static 
org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -36,12 +38,14 @@ public class ScatterZipOutputStreamTest
         final byte[] A_PAYLOAD = "XAAY".getBytes();
 
         ZipArchiveEntry zab = new ZipArchiveEntry("b.txt");
-        ByteArrayInputStream payload = new ByteArrayInputStream(B_PAYLOAD);
-        scatterZipOutputStream.addArchiveEntry(zab, payload, 
ZipArchiveEntry.DEFLATED);
+        zab.setMethod(ZipArchiveEntry.DEFLATED);
+        final ByteArrayInputStream payload = new 
ByteArrayInputStream(B_PAYLOAD);
+        
scatterZipOutputStream.addArchiveEntry(createZipArchiveEntryRequest(zab, 
createPayloadSupplier(payload)));
 
         ZipArchiveEntry zae = new ZipArchiveEntry("a.txt");
+        zae.setMethod(ZipArchiveEntry.DEFLATED);
         ByteArrayInputStream payload1 = new ByteArrayInputStream(A_PAYLOAD);
-        scatterZipOutputStream.addArchiveEntry(zae, payload1, 
ZipArchiveEntry.DEFLATED);
+        
scatterZipOutputStream.addArchiveEntry(createZipArchiveEntryRequest(zae, 
createPayloadSupplier(payload1)));
 
         File target = File.createTempFile("scattertest", ".zip");
         ZipArchiveOutputStream outputStream = new 
ZipArchiveOutputStream(target);
@@ -58,4 +62,12 @@ public class ScatterZipOutputStreamTest
         assertEquals(4, a_entry.getSize());
         assertArrayEquals(A_PAYLOAD, 
IOUtils.toByteArray(zf.getInputStream(a_entry)));
     }
+
+    private InputStreamSupplier createPayloadSupplier(final 
ByteArrayInputStream payload) {
+        return new InputStreamSupplier() {
+            public InputStream get() {
+                return payload;
+            }
+        };
+    }
 }
\ No newline at end of file


Reply via email to