Author: krosenvold Date: Tue Jan 13 06:59:24 2015 New Revision: 1651285 URL: http://svn.apache.org/r1651285 Log: Changed from nThreads to receiving an ExecutorService
There are a lot of different models/versions of executor services, also varying according to client JDK level. Give client full control of how the executor service is created and also possibly how to schedule tasks through a slightly lower-level createCallable/submit API. Termination of ExecutorService is still controlled by ParallelScatterZipCreator, as it must be. Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java Modified: commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java?rev=1651285&r1=1651284&r2=1651285&view=diff ============================================================================== --- commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java (original) +++ commons/proper/compress/trunk/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java Tue Jan 13 06:59:24 2015 @@ -40,18 +40,22 @@ import static java.util.Collections.sync * the output file. Things that need to come in a specific order (manifests, directories) * must be handled by the client of this class, usually by writing these things to the * #ZipArchiveOutputStream *before* calling #writeTo on this class.</p> + * <p> + * The client can supply an ExecutorService, but for reasons of memory model consistency, + * this will be shut down by this class prior to completion. 
+ * </p> */ public class ParallelScatterZipCreator { private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>()); private final ExecutorService es; - private final ScatterGatherBackingStoreSupplier supplier; + private final ScatterGatherBackingStoreSupplier backingStoreSupplier; private final List<Future> futures = new ArrayList<Future>(); private final long startedAt = System.currentTimeMillis(); private long compressionDoneAt = 0; private long scatterDoneAt; - private static class DefaultSupplier implements ScatterGatherBackingStoreSupplier { + private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier { final AtomicInteger storeNum = new AtomicInteger(0); public ScatterGatherBackingStore get() throws IOException { @@ -71,7 +75,7 @@ public class ParallelScatterZipCreator { @Override protected ScatterZipOutputStream initialValue() { try { - ScatterZipOutputStream scatterStream = createDeferred(supplier); + ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier); streams.add(scatterStream); return scatterStream; } catch (IOException e) { @@ -84,27 +88,30 @@ public class ParallelScatterZipCreator { * Create a ParallelScatterZipCreator with default threads */ public ParallelScatterZipCreator() { - this(Runtime.getRuntime().availableProcessors()); + this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); } /** * Create a ParallelScatterZipCreator * - * @param nThreads the number of threads to use in parallel. + * @param executorService The executorService to use for parallel scheduling. For technical reasons, + * this will be shut down by this class. 
*/ - public ParallelScatterZipCreator(int nThreads) { - this( nThreads, new DefaultSupplier()); + public ParallelScatterZipCreator(ExecutorService executorService) { + this(executorService, new DefaultBackingStoreSupplier()); } /** * Create a ParallelScatterZipCreator * - * @param nThreads the number of threads to use in parallel. + * @param executorService The executorService to use. For technical reasons, this will be shut down + * by this class. * @param backingStoreSupplier The supplier of backing store which shall be used */ - public ParallelScatterZipCreator(int nThreads, ScatterGatherBackingStoreSupplier backingStoreSupplier) { - supplier = backingStoreSupplier; - es = Executors.newFixedThreadPool(nThreads); + public ParallelScatterZipCreator(ExecutorService executorService, + ScatterGatherBackingStoreSupplier backingStoreSupplier) { + this.backingStoreSupplier = backingStoreSupplier; + es = executorService; } /** @@ -113,19 +120,43 @@ public class ParallelScatterZipCreator { * This method is expected to be called from a single client thread * </p> * - * @param zipArchiveEntry The entry to add. Compression method + * @param zipArchiveEntry The entry to add. * @param source The source input stream supplier */ public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { + submit(createCallable(zipArchiveEntry, source)); + } + + /** + * Submit a callable for compression + * @param callable The callable to run + */ + public void submit(Callable<Object> callable) { + futures.add(es.submit(callable)); + } + + /** + * Create a callable that will compress the given archive entry. + * + * <p>This method is expected to be called from a single client thread.</p> + * <p> + * This method is used by clients that want finer grained control over how the callable is + * created, possibly wanting to wrap this callable in a different callable</p> + * + * @param zipArchiveEntry The entry to add. 
+ * @param source The source input stream supplier + * @return A callable that will be used to check for errors + */ + + public Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) { final int method = zipArchiveEntry.getMethod(); if (method == ZipMethod.UNKNOWN_CODE) { throw new IllegalArgumentException("Method must be set on the supplied zipArchiveEntry"); } - // Consider if we want to constrain the number of items that can enqueue here. - Future<Object> future = es.submit(new Callable<Object>() { - public Void call() throws Exception { - ScatterZipOutputStream streamToUse = tlScatterStreams.get(); + return new Callable<Object>() { + public Object call() throws Exception { + final ScatterZipOutputStream streamToUse = tlScatterStreams.get(); InputStream payload = source.get(); try { streamToUse.addArchiveEntry(zipArchiveEntry, payload, method); @@ -134,9 +165,7 @@ public class ParallelScatterZipCreator { } return null; } - - }); - futures.add( future); + }; } @@ -161,7 +190,7 @@ public class ParallelScatterZipCreator { } es.shutdown(); - es.awaitTermination(1000 * 60, TimeUnit.SECONDS); + es.awaitTermination(1000 * 60, TimeUnit.SECONDS); // == Infinity. 
We really *must* wait for this to complete // It is important that all threads terminate before we go on, ensure happens-before relationship compressionDoneAt = System.currentTimeMillis(); Modified: commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java URL: http://svn.apache.org/viewvc/commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java?rev=1651285&r1=1651284&r2=1651285&view=diff ============================================================================== --- commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java (original) +++ commons/proper/compress/trunk/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java Tue Jan 13 06:59:24 2015 @@ -27,12 +27,17 @@ import java.io.InputStream; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.junit.Assert.*; @SuppressWarnings("OctalInteger") public class ParallelScatterZipCreatorTest { + private final int NUMITEMS = 5000; + @Test public void concurrent() throws Exception { @@ -44,12 +49,36 @@ public class ParallelScatterZipCreatorTe Map<String, byte[]> entries = writeEntries(zipCreator); zipCreator.writeTo(zos); zos.close(); - removeEntriesFoundInZipFile(result, entries); assertTrue(entries.size() == 0); assertNotNull( zipCreator.getStatisticsMessage()); } + @Test + public void callableApi() + throws Exception { + File result = File.createTempFile("parallelScatterGather2", ""); + ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); + zos.setEncoding("UTF-8"); + ExecutorService es = Executors.newFixedThreadPool(1); + + ScatterGatherBackingStoreSupplier supp = new ScatterGatherBackingStoreSupplier() { + public 
ScatterGatherBackingStore get() throws IOException { + return new FileBasedScatterGatherBackingStore(File.createTempFile("parallelscatter", "n1")); + } + }; + + ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp); + Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator); + zipCreator.writeTo(zos); + zos.close(); + + + removeEntriesFoundInZipFile(result, entries); + assertTrue(entries.size() == 0); + assertNotNull(zipCreator.getStatisticsMessage()); + } + private void removeEntriesFoundInZipFile(File result, Map<String, byte[]> entries) throws IOException { ZipFile zf = new ZipFile(result); Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); @@ -58,21 +87,16 @@ public class ParallelScatterZipCreatorTe InputStream inputStream = zf.getInputStream(zipArchiveEntry); byte[] actual = IOUtils.toByteArray(inputStream); byte[] expected = entries.remove(zipArchiveEntry.getName()); - assertArrayEquals( expected, actual); + assertArrayEquals( "For " + zipArchiveEntry.getName(), expected, actual); } zf.close(); } private Map<String, byte[]> writeEntries(ParallelScatterZipCreator zipCreator) { Map<String, byte[]> entries = new HashMap<String, byte[]>(); - for (int i = 0; i < 10000; i++){ - ZipArchiveEntry za = new ZipArchiveEntry( "file" + i); - final String payload = "content" + i; - final byte[] payloadBytes = payload.getBytes(); - entries.put( za.getName(), payloadBytes); - za.setMethod(ZipArchiveEntry.DEFLATED); - za.setSize(payload.length()); - za.setUnixMode(UnixStat.FILE_FLAG | 0664); + for (int i = 0; i < NUMITEMS; i++){ + final byte[] payloadBytes = ("content" + i).getBytes(); + ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes); zipCreator.addArchiveEntry(za, new InputStreamSupplier() { public InputStream get() { return new ByteArrayInputStream(payloadBytes); @@ -81,4 +105,28 @@ public class ParallelScatterZipCreatorTe } return entries; } + + private Map<String, byte[]> 
writeEntriesAsCallable(ParallelScatterZipCreator zipCreator) { + Map<String, byte[]> entries = new HashMap<String, byte[]>(); + for (int i = 0; i < NUMITEMS; i++){ + final byte[] payloadBytes = ("content" + i).getBytes(); + ZipArchiveEntry za = createZipArchiveEntry(entries, i, payloadBytes); + final Callable<Object> callable = zipCreator.createCallable(za, new InputStreamSupplier() { + public InputStream get() { + return new ByteArrayInputStream(payloadBytes); + } + }); + zipCreator.submit(callable); + } + return entries; + } + + private ZipArchiveEntry createZipArchiveEntry(Map<String, byte[]> entries, int i, byte[] payloadBytes) { + ZipArchiveEntry za = new ZipArchiveEntry( "file" + i); + entries.put( za.getName(), payloadBytes); + za.setMethod(ZipArchiveEntry.DEFLATED); + za.setSize(payloadBytes.length); + za.setUnixMode(UnixStat.FILE_FLAG | 0664); + return za; + } } \ No newline at end of file