Author: krosenvold
Date: Tue Jan 13 06:59:24 2015
New Revision: 1651285

URL: http://svn.apache.org/r1651285
Log:
Changed from nThreads to receiving an ExecutorService

There are a lot of different models/versions of ExecutorServices, also
varying according to client JDK level. Give the client full control of how the
executor service is created, and also possibly of how tasks are scheduled, through
a slightly lower-level createCallable/submit API.

Termination of the ExecutorService is still controlled by ParallelScatterZipCreator,
as it must be.

Modified:
    
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
    
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java

Modified: 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1651285&r1=1651284&r2=1651285&view=diff
==============================================================================
--- 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 (original)
+++ 
commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 Tue Jan 13 06:59:24 2015
@@ -40,18 +40,22 @@ import static java.util.Collections.sync
  * the output file. Things that need to come in a specific order (manifests, 
directories)
  * must be handled by the client of this class, usually by writing these 
things to the
  * #ZipArchiveOutputStream *before* calling #writeTo on this class.</p>
+ * <p>
+ * The client can supply an ExecutorService, but for reasons of memory model 
consistency,
+ * this will be shut down by this class prior to completion.
+ * </p>
  */
 public class ParallelScatterZipCreator {
     private final List<ScatterZipOutputStream> streams = synchronizedList(new 
ArrayList<ScatterZipOutputStream>());
     private final ExecutorService es;
-    private final ScatterGatherBackingStoreSupplier supplier;
+    private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
     private final List<Future> futures = new ArrayList<Future>();
 
     private final long startedAt = System.currentTimeMillis();
     private long compressionDoneAt = 0;
     private long scatterDoneAt;
 
-    private static class DefaultSupplier implements 
ScatterGatherBackingStoreSupplier {
+    private static class DefaultBackingStoreSupplier implements 
ScatterGatherBackingStoreSupplier {
         final AtomicInteger storeNum = new AtomicInteger(0);
 
         public ScatterGatherBackingStore get() throws IOException {
@@ -71,7 +75,7 @@ public class ParallelScatterZipCreator {
         @Override
         protected ScatterZipOutputStream initialValue() {
             try {
-                ScatterZipOutputStream scatterStream = 
createDeferred(supplier);
+                ScatterZipOutputStream scatterStream = 
createDeferred(backingStoreSupplier);
                 streams.add(scatterStream);
                 return scatterStream;
             } catch (IOException e) {
@@ -84,27 +88,30 @@ public class ParallelScatterZipCreator {
      * Create a ParallelScatterZipCreator with default threads
      */
     public ParallelScatterZipCreator() {
-        this(Runtime.getRuntime().availableProcessors());
+        
this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
     }
 
     /**
      * Create a ParallelScatterZipCreator
      *
-     * @param nThreads the number of threads to use in parallel.
+     * @param executorService The executorService to use for parallel 
scheduling. For technical reasons,
+     *                        this will be shut down by this class.
      */
-    public ParallelScatterZipCreator(int nThreads) {
-        this( nThreads, new DefaultSupplier());
+    public ParallelScatterZipCreator(ExecutorService executorService) {
+        this(executorService, new DefaultBackingStoreSupplier());
     }
 
     /**
      * Create a ParallelScatterZipCreator
      *
-     * @param nThreads the number of threads to use in parallel.
+     * @param executorService The executorService to use. For technical 
reasons, this will be shut down
+     *                        by this class.
      * @param backingStoreSupplier The supplier of backing store which shall 
be used
      */
-    public ParallelScatterZipCreator(int nThreads, 
ScatterGatherBackingStoreSupplier backingStoreSupplier) {
-        supplier = backingStoreSupplier;
-        es = Executors.newFixedThreadPool(nThreads);
+    public ParallelScatterZipCreator(ExecutorService executorService,
+                                     ScatterGatherBackingStoreSupplier 
backingStoreSupplier) {
+        this.backingStoreSupplier = backingStoreSupplier;
+        es = executorService;
     }
 
     /**
@@ -113,19 +120,43 @@ public class ParallelScatterZipCreator {
      * This method is expected to be called from a single client thread
      * </p>
      *
-     * @param zipArchiveEntry The entry to add. Compression method
+     * @param zipArchiveEntry The entry to add.
      * @param source          The source input stream supplier
      */
 
     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final 
InputStreamSupplier source) {
+        submit(createCallable(zipArchiveEntry, source));
+    }
+
+    /**
+     * Submit a callable for compression
+     * @param callable The callable to run
+     */
+    public void submit(Callable<Object> callable) {
+        futures.add(es.submit(callable));
+    }
+
+    /**
+     * Create a callable that will compress the given archive entry.
+     *
+     * <p>This method is expected to be called from a single client thread.</p>
+     * <p>
+     * This method is used by clients that want finer grained control over how 
the callable is
+     * created, possibly wanting to wrap this callable in a different 
callable</p>
+     *
+     * @param zipArchiveEntry The entry to add.
+     * @param source    The source input stream supplier
+     * @return   A callable that will be used to check for errors
+     */
+
+    public Callable<Object> createCallable(final ZipArchiveEntry 
zipArchiveEntry, final InputStreamSupplier source) {
         final int method = zipArchiveEntry.getMethod();
         if (method == ZipMethod.UNKNOWN_CODE) {
             throw new IllegalArgumentException("Method must be set on the 
supplied zipArchiveEntry");
         }
-        // Consider if we want to constrain the number of items that can 
enqueue here.
-        Future<Object> future = es.submit(new Callable<Object>() {
-            public Void call() throws Exception {
-                ScatterZipOutputStream streamToUse = tlScatterStreams.get();
+        return new Callable<Object>() {
+            public Object call() throws Exception {
+                final ScatterZipOutputStream streamToUse = 
tlScatterStreams.get();
                 InputStream payload = source.get();
                 try {
                     streamToUse.addArchiveEntry(zipArchiveEntry, payload, 
method);
@@ -134,9 +165,7 @@ public class ParallelScatterZipCreator {
                 }
                 return null;
             }
-
-        });
-        futures.add( future);
+        };
     }
 
 
@@ -161,7 +190,7 @@ public class ParallelScatterZipCreator {
         }
 
         es.shutdown();
-        es.awaitTermination(1000 * 60, TimeUnit.SECONDS);
+        es.awaitTermination(1000 * 60, TimeUnit.SECONDS);  // == Infinity. We 
really *must* wait for this to complete
 
         // It is important that all threads terminate before we go on, ensure 
happens-before relationship
         compressionDoneAt = System.currentTimeMillis();

Modified: 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
URL: 
http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java?rev=1651285&r1=1651284&r2=1651285&view=diff
==============================================================================
--- 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
 (original)
+++ 
commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
 Tue Jan 13 06:59:24 2015
@@ -27,12 +27,17 @@ import java.io.InputStream;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
 
 @SuppressWarnings("OctalInteger")
 public class ParallelScatterZipCreatorTest {
 
+    private final int NUMITEMS = 5000;
+
     @Test
     public void concurrent()
             throws Exception {
@@ -44,12 +49,36 @@ public class ParallelScatterZipCreatorTe
         Map<String, byte[]> entries = writeEntries(zipCreator);
         zipCreator.writeTo(zos);
         zos.close();
-
         removeEntriesFoundInZipFile(result, entries);
         assertTrue(entries.size() == 0);
         assertNotNull( zipCreator.getStatisticsMessage());
     }
 
+    @Test
+    public void callableApi()
+            throws Exception {
+        File result = File.createTempFile("parallelScatterGather2", "");
+        ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
+        zos.setEncoding("UTF-8");
+        ExecutorService es = Executors.newFixedThreadPool(1);
+
+        ScatterGatherBackingStoreSupplier supp = new 
ScatterGatherBackingStoreSupplier() {
+            public ScatterGatherBackingStore get() throws IOException {
+                return new 
FileBasedScatterGatherBackingStore(File.createTempFile("parallelscatter", 
"n1"));
+            }
+        };
+
+        ParallelScatterZipCreator zipCreator = new 
ParallelScatterZipCreator(es, supp);
+        Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator);
+        zipCreator.writeTo(zos);
+        zos.close();
+
+
+        removeEntriesFoundInZipFile(result, entries);
+        assertTrue(entries.size() == 0);
+        assertNotNull(zipCreator.getStatisticsMessage());
+    }
+
     private void removeEntriesFoundInZipFile(File result, Map<String, byte[]> 
entries) throws IOException {
         ZipFile zf = new ZipFile(result);
         Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = 
zf.getEntriesInPhysicalOrder();
@@ -58,21 +87,16 @@ public class ParallelScatterZipCreatorTe
             InputStream inputStream = zf.getInputStream(zipArchiveEntry);
             byte[] actual = IOUtils.toByteArray(inputStream);
             byte[] expected = entries.remove(zipArchiveEntry.getName());
-            assertArrayEquals( expected, actual);
+            assertArrayEquals( "For " + zipArchiveEntry.getName(),  expected, 
actual);
         }
         zf.close();
     }
 
     private Map<String, byte[]> writeEntries(ParallelScatterZipCreator 
zipCreator) {
         Map<String, byte[]> entries = new HashMap<String, byte[]>();
-        for (int i = 0; i < 10000; i++){
-            ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
-            final String payload = "content" + i;
-            final byte[] payloadBytes = payload.getBytes();
-            entries.put( za.getName(), payloadBytes);
-            za.setMethod(ZipArchiveEntry.DEFLATED);
-            za.setSize(payload.length());
-            za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+        for (int i = 0; i < NUMITEMS; i++){
+            final byte[] payloadBytes = ("content" + i).getBytes();
+            ZipArchiveEntry za = createZipArchiveEntry(entries, i, 
payloadBytes);
             zipCreator.addArchiveEntry(za, new InputStreamSupplier() {
                 public InputStream get() {
                     return new ByteArrayInputStream(payloadBytes);
@@ -81,4 +105,28 @@ public class ParallelScatterZipCreatorTe
         }
         return entries;
     }
+
+    private Map<String, byte[]> 
writeEntriesAsCallable(ParallelScatterZipCreator zipCreator) {
+        Map<String, byte[]> entries = new HashMap<String, byte[]>();
+        for (int i = 0; i < NUMITEMS; i++){
+            final byte[] payloadBytes = ("content" + i).getBytes();
+            ZipArchiveEntry za = createZipArchiveEntry(entries, i, 
payloadBytes);
+            final Callable<Object> callable = zipCreator.createCallable(za, 
new InputStreamSupplier() {
+                public InputStream get() {
+                    return new ByteArrayInputStream(payloadBytes);
+                }
+            });
+            zipCreator.submit(callable);
+        }
+        return entries;
+    }
+
+    private ZipArchiveEntry createZipArchiveEntry(Map<String, byte[]> entries, 
int i, byte[] payloadBytes) {
+        ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
+        entries.put( za.getName(), payloadBytes);
+        za.setMethod(ZipArchiveEntry.DEFLATED);
+        za.setSize(payloadBytes.length);
+        za.setUnixMode(UnixStat.FILE_FLAG | 0664);
+        return za;
+    }
 }
\ No newline at end of file


Reply via email to