COMPRESS-388: Fix concurrent reads performance

Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo
Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/7e89c9cc
Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/7e89c9cc
Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/7e89c9cc

Branch: refs/heads/master
Commit: 7e89c9cc80bc3fd2aab64f25dd816c6d14790988
Parents: 13a0390
Author: Zbynek Vyskovsky <kvr...@gmail.com>
Authored: Sat Apr 22 23:45:46 2017 -0700
Committer: Stefan Bodewig <bode...@apache.org>
Committed: Tue Apr 25 20:02:18 2017 +0200

----------------------------------------------------------------------
 .../commons/compress/archivers/zip/ZipFile.java | 100 ++++++++++----
 .../compress/archivers/zip/ZipFileTest.java     | 130 +++++++++++++++++++
 src/test/resources/mixed.zip                    | Bin 0 -> 71817 bytes
 3 files changed, 204 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java b/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java
index 34c08b2..1bfd753 100644
--- a/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java
+++ b/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -441,7 +442,7 @@ public class ZipFile implements Closeable {
         }
         final OffsetEntry offsetEntry = ((Entry) ze).getOffsetEntry();
         final long start = offsetEntry.dataOffset;
-        return new BoundedInputStream(start, ze.getCompressedSize());
+        return createBoundedInputStream(start, ze.getCompressedSize());
     }
 
 
@@ -484,7 +485,7 @@ public class ZipFile implements Closeable {
         final long start = offsetEntry.dataOffset;
         // doesn't get closed if the method is not supported, but doesn't hold 
any resources either
         final BoundedInputStream bis =
-            new BoundedInputStream(start, ze.getCompressedSize()); //NOSONAR
+            createBoundedInputStream(start, ze.getCompressedSize()); //NOSONAR
         switch (ZipMethod.getMethodByCode(ze.getMethod())) {
             case STORED:
                 return bis;
@@ -1081,16 +1082,26 @@ public class ZipFile implements Closeable {
     }
 
     /**
+     * Creates new BoundedInputStream, according to implementation of
+     * underlying archive channel.
+     */
+    private BoundedInputStream createBoundedInputStream(long start, long 
remaining) {
+        return archive instanceof FileChannel ?
+            new BoundedFileChannelInputStream(start, remaining) :
+            new BoundedInputStream(start, remaining);
+    }
+
+    /**
      * InputStream that delegates requests to the underlying
      * SeekableByteChannel, making sure that only bytes from a certain
      * range can be read.
      */
     private class BoundedInputStream extends InputStream {
-        private static final int MAX_BUF_LEN = 8192;
-        private final ByteBuffer buffer;
-        private long remaining;
-        private long loc;
-        private boolean addDummyByte = false;
+        protected static final int MAX_BUF_LEN = 8192;
+        protected final ByteBuffer buffer;
+        protected long remaining;
+        protected long loc;
+        protected boolean addDummyByte = false;
 
         BoundedInputStream(final long start, final long remaining) {
             this.remaining = remaining;
@@ -1111,14 +1122,11 @@ public class ZipFile implements Closeable {
                 }
                 return -1;
             }
-            synchronized (archive) {
-                archive.position(loc++);
-                int read = read(1);
-                if (read < 0) {
-                    return read;
-                }
-                return buffer.get() & 0xff;
+            int read = read(loc++, 1);
+            if (read < 0) {
+                return read;
             }
+            return buffer.get() & 0xff;
         }
 
         @Override
@@ -1141,16 +1149,12 @@ public class ZipFile implements Closeable {
             }
             ByteBuffer buf;
             int ret = -1;
-            synchronized (archive) {
-                archive.position(loc);
-                if (len <= buffer.capacity()) {
-                    buf = buffer;
-                    ret = read(len);
-                } else {
-                    buf = ByteBuffer.allocate(len);
-                    ret = archive.read(buf);
-                    buf.flip();
-                }
+            if (len <= buffer.capacity()) {
+                buf = buffer;
+                ret = read(loc, len);
+            } else {
+                buf = ByteBuffer.allocate(len);
+                ret = read(loc, buf);
             }
             if (ret > 0) {
                 buf.get(b, off, ret);
@@ -1160,9 +1164,23 @@ public class ZipFile implements Closeable {
             return ret;
         }
 
-        private int read(int len) throws IOException {
+        protected int read(long pos, ByteBuffer buf) throws IOException {
+            int read;
+            synchronized (archive) {
+                archive.position(pos);
+                read = archive.read(buf);
+            }
+            buf.flip();
+            return read;
+        }
+
+        /**
+         * Reads up to {@code len} bytes starting at archive offset {@code pos}
+         * into this stream's reusable {@code buffer}, then flips the buffer so
+         * the caller can drain it. The channel is positioned and read while
+         * holding its monitor, because position() followed by read() on the
+         * shared archive channel must not interleave with other readers.
+         *
+         * @param pos archive offset to read from
+         * @param len maximum number of bytes to read; the visible callers pass
+         *            1 or a value no larger than {@code buffer.capacity()}
+         * @return number of bytes read, or -1 at end of channel
+         * @throws IOException if the channel cannot be positioned or read
+         */
+        protected int read(long pos, int len) throws IOException {
+            int read;
             buffer.rewind().limit(len);
-            int read = archive.read(buffer);
+            synchronized (archive) {
+                archive.position(pos);
+                read = archive.read(buffer);
+            }
             buffer.flip();
             return read;
         }
@@ -1176,6 +1194,36 @@ public class ZipFile implements Closeable {
         }
     }
 
+    /**
+     * BoundedInputStream variant for archives backed by a FileChannel.
+     * It does not position the shared channel under a lock. Instead it issues
+     * positioned reads ({@link FileChannel#read(ByteBuffer, long)}), which do
+     * not modify the channel's own position. This lets streams for different
+     * entries read concurrently without contending on a monitor.
+     */
+    private class BoundedFileChannelInputStream extends BoundedInputStream {
+        /** The enclosing ZipFile's archive channel, viewed as a FileChannel. */
+        private final FileChannel channel;
+
+        BoundedFileChannelInputStream(final long start, final long remaining) {
+            super(start, remaining);
+            this.channel = (FileChannel) ZipFile.this.archive;
+        }
+
+        @Override
+        protected int read(final long pos, final ByteBuffer buf) throws IOException {
+            final int count = channel.read(buf, pos);
+            buf.flip();
+            return count;
+        }
+
+        @Override
+        protected int read(final long pos, final int len) throws IOException {
+            // reuse the inherited buffer, limited to len, and read into it at pos
+            buffer.rewind().limit(len);
+            return read(pos, buffer);
+        }
+    }
+
     private static final class NameAndComment {
         private final byte[] name;
         private final byte[] comment;

http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java
index ee7b26f..3e83675 100644
--- a/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java
+++ b/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java
@@ -24,13 +24,17 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.ZipEntry;
 
 import org.apache.commons.compress.utils.IOUtils;
@@ -331,6 +335,132 @@ public class ZipFileTest {
         assertArrayEquals(expected, 
IOUtils.toByteArray(zf.getInputStream(ze)));
     }
 
+    @Test
+    public void testConcurrentReadSeekable() throws Exception {
+        // mixed.zip contains both inflated and stored files
+        byte[] data = null;
+        try (FileInputStream fis = new FileInputStream(getFile("mixed.zip"))) {
+            data = IOUtils.toByteArray(fis);
+        }
+        zf = new ZipFile(new SeekableInMemoryByteChannel(data), ZipEncodingHelper.UTF8);
+        assertConcurrentReadsMatchSequentialReads();
+    }
+
+    @Test
+    public void testConcurrentReadFile() throws Exception {
+        // mixed.zip contains both inflated and stored files
+        final File archive = getFile("mixed.zip");
+        zf = new ZipFile(archive);
+        assertConcurrentReadsMatchSequentialReads();
+    }
+
+    /**
+     * Records the content of every entry of {@code zf} by reading it on the
+     * calling thread. Then two threads concurrently re-read all entries via
+     * {@link #assertAllReadMethods} and compare against that content.
+     * Anything thrown on a worker thread is collected and rethrown here as the
+     * cause of the test failure. Otherwise it would only reach the thread's
+     * uncaught-exception handler, and the test would report a bare count mismatch.
+     */
+    private void assertConcurrentReadsMatchSequentialReads() throws Exception {
+        final Map<String, byte[]> content = new HashMap<String, byte[]>();
+        for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) {
+            content.put(entry.getName(), IOUtils.toByteArray(zf.getInputStream(entry)));
+        }
+
+        final AtomicInteger passedCount = new AtomicInteger();
+        final Map<String, Throwable> failures =
+            Collections.synchronizedMap(new HashMap<String, Throwable>());
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) {
+                        assertAllReadMethods(content.get(entry.getName()), zf, entry);
+                    }
+                    passedCount.incrementAndGet();
+                } catch (Throwable t) {
+                    failures.put(Thread.currentThread().getName(), t);
+                }
+            }
+        };
+        Thread t0 = new Thread(run);
+        Thread t1 = new Thread(run);
+        t0.start();
+        t1.start();
+        t0.join();
+        t1.join();
+        // join() makes the workers' writes visible here, so no extra locking is needed
+        if (!failures.isEmpty()) {
+            final Map.Entry<String, Throwable> failure = failures.entrySet().iterator().next();
+            throw new AssertionError("concurrent read failed on " + failure.getKey(), failure.getValue());
+        }
+        assertEquals(2, passedCount.get());
+    }
+
+    /**
+     * Reads {@code entry} from {@code zipFile} with three access patterns and
+     * asserts that each one yields exactly {@code expected}:
+     * <ol>
+     * <li>a single {@code IOUtils.toByteArray} call;</li>
+     * <li>one large {@code read(byte[])} followed by IOUtils for the remainder;</li>
+     * <li>a single-byte {@code read()}, then a large offset read, then IOUtils
+     * for the remainder.</li>
+     * </ol>
+     * IOExceptions are wrapped in RuntimeException so the method can be called
+     * from a {@link Runnable}.
+     */
+    private void assertAllReadMethods(byte[] expected, ZipFile zipFile, ZipArchiveEntry entry) {
+        // simple IOUtil read
+        try (InputStream stream = zipFile.getInputStream(entry)) {
+            byte[] full = IOUtils.toByteArray(stream);
+            assertArrayEquals(expected, full);
+        }
+        catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+
+        // big buffer at the beginning and then chunks by IOUtils read
+        try (InputStream stream = zipFile.getInputStream(entry)) {
+            byte[] full;
+            byte[] bytes = new byte[0x40000];
+            int read = stream.read(bytes);
+            if (read < 0) {
+                full = new byte[0];
+            }
+            else {
+                full = readStreamRest(bytes, read, stream);
+            }
+            assertArrayEquals(expected, full);
+        }
+        catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+
+        // small chunk / single byte and big buffer then
+        try (InputStream stream = zipFile.getInputStream(entry)) {
+            byte[] full;
+            int single = stream.read();
+            if (single < 0) {
+                full = new byte[0];
+            }
+            else {
+                byte[] big = new byte[0x40000];
+                big[0] = (byte)single;
+                int read = stream.read(big, 1, big.length-1);
+                if (read < 0) {
+                    full = new byte[]{ (byte)single };
+                }
+                else {
+                    full = readStreamRest(big, read+1, stream);
+                }
+            }
+            assertArrayEquals(expected, full);
+        }
+        catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Utility to append the rest of the stream to already read data.
+     */
+    private byte[] readStreamRest(byte[] beginning, int length, InputStream 
stream) throws IOException {
+        byte[] rest = IOUtils.toByteArray(stream);
+        byte[] full = new byte[length+rest.length];
+        System.arraycopy(beginning, 0, full, 0, length);
+        System.arraycopy(rest, 0, full, length, rest.length);
+        return full;
+    }
+
     /*
      * ordertest.zip has been handcrafted.
      *

http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/test/resources/mixed.zip
----------------------------------------------------------------------
diff --git a/src/test/resources/mixed.zip b/src/test/resources/mixed.zip
new file mode 100644
index 0000000..a36f2af
Binary files /dev/null and b/src/test/resources/mixed.zip differ

Reply via email to