COMPRESS-388: Fix concurrent reads performance
Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/7e89c9cc Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/7e89c9cc Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/7e89c9cc Branch: refs/heads/master Commit: 7e89c9cc80bc3fd2aab64f25dd816c6d14790988 Parents: 13a0390 Author: Zbynek Vyskovsky <kvr...@gmail.com> Authored: Sat Apr 22 23:45:46 2017 -0700 Committer: Stefan Bodewig <bode...@apache.org> Committed: Tue Apr 25 20:02:18 2017 +0200 ---------------------------------------------------------------------- .../commons/compress/archivers/zip/ZipFile.java | 100 ++++++++++---- .../compress/archivers/zip/ZipFileTest.java | 130 +++++++++++++++++++ src/test/resources/mixed.zip | Bin 0 -> 71817 bytes 3 files changed, 204 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java b/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java index 34c08b2..1bfd753 100644 --- a/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java +++ b/src/main/java/org/apache/commons/compress/archivers/zip/ZipFile.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.StandardOpenOption; @@ -441,7 +442,7 @@ public class ZipFile implements Closeable { } final OffsetEntry offsetEntry = ((Entry) ze).getOffsetEntry(); final long start = offsetEntry.dataOffset; - return new BoundedInputStream(start, ze.getCompressedSize()); + return createBoundedInputStream(start, ze.getCompressedSize()); } @@ -484,7 +485,7 @@ public class ZipFile implements Closeable { final long start = offsetEntry.dataOffset; // doesn't get closed if the method is not supported, but doesn't hold any resources either final BoundedInputStream bis = - new BoundedInputStream(start, ze.getCompressedSize()); //NOSONAR + createBoundedInputStream(start, ze.getCompressedSize()); //NOSONAR switch (ZipMethod.getMethodByCode(ze.getMethod())) { case STORED: return bis; @@ -1081,16 +1082,26 @@ public class ZipFile implements Closeable { } /** + * Creates new BoundedInputStream, according to implementation of + * underlying archive channel. + */ + private BoundedInputStream createBoundedInputStream(long start, long remaining) { + return archive instanceof FileChannel ? + new BoundedFileChannelInputStream(start, remaining) : + new BoundedInputStream(start, remaining); + } + + /** * InputStream that delegates requests to the underlying * SeekableByteChannel, making sure that only bytes from a certain * range can be read. */ private class BoundedInputStream extends InputStream { - private static final int MAX_BUF_LEN = 8192; - private final ByteBuffer buffer; - private long remaining; - private long loc; - private boolean addDummyByte = false; + protected static final int MAX_BUF_LEN = 8192; + protected final ByteBuffer buffer; + protected long remaining; + protected long loc; + protected boolean addDummyByte = false; BoundedInputStream(final long start, final long remaining) { this.remaining = remaining; @@ -1111,14 +1122,11 @@ public class ZipFile implements Closeable { } return -1; } - synchronized (archive) { - archive.position(loc++); - int read = read(1); - if (read < 0) { - return read; - } - return buffer.get() & 0xff; + int read = read(loc++, 1); + if (read < 0) { + return read; } + return buffer.get() & 0xff; } @Override @@ -1141,16 +1149,12 @@ public class ZipFile implements Closeable { } ByteBuffer buf; int ret = -1; - synchronized (archive) { - archive.position(loc); - if (len <= buffer.capacity()) { - buf = buffer; - ret = read(len); - } else { - buf = ByteBuffer.allocate(len); - ret = archive.read(buf); - buf.flip(); - } + if (len <= buffer.capacity()) { + buf = buffer; + ret = read(loc, len); + } else { + buf = ByteBuffer.allocate(len); + ret = read(loc, buf); } if (ret > 0) { buf.get(b, off, ret); @@ -1160,9 +1164,23 @@ public class ZipFile implements Closeable { return ret; } - private int read(int len) throws IOException { + protected int read(long pos, ByteBuffer buf) throws IOException { + int read; + synchronized (archive) { + archive.position(pos); + read = archive.read(buf); + } + buf.flip(); + return read; + } + + protected int read(long pos, int len) throws IOException { + int read; buffer.rewind().limit(len); - int read = archive.read(buffer); + synchronized (archive) { + archive.position(pos); + read = archive.read(buffer); + } buffer.flip(); return read; } @@ -1176,6 +1194,36 @@ public class ZipFile implements Closeable { } } + /** + * Lock-free implementation of BoundedInputStream. The + * implementation uses positioned reads on the underlying archive + * file channel and therefore performs significantly faster in + * concurrent environment. + */ + private class BoundedFileChannelInputStream extends BoundedInputStream { + private final FileChannel archive; + + BoundedFileChannelInputStream(final long start, final long remaining) { + super(start, remaining); + archive = (FileChannel)ZipFile.this.archive; + } + + @Override + protected int read(long pos, ByteBuffer buf) throws IOException { + int read = archive.read(buf, pos); + buf.flip(); + return read; + } + + @Override + protected int read(long position, int len) throws IOException { + buffer.rewind().limit(len); + int read = archive.read(buffer, position); + buffer.flip(); + return read; + } + } + private static final class NameAndComment { private final byte[] name; private final byte[] comment; http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java index ee7b26f..3e83675 100644 --- a/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java +++ b/src/test/java/org/apache/commons/compress/archivers/zip/ZipFileTest.java @@ -24,13 +24,17 @@ import static org.junit.Assert.*; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.ZipEntry; import org.apache.commons.compress.utils.IOUtils; @@ -331,6 +335,132 @@ public class ZipFileTest { assertArrayEquals(expected, IOUtils.toByteArray(zf.getInputStream(ze))); } + @Test + public void testConcurrentReadSeekable() throws Exception { + // mixed.zip contains both inflated and stored files + byte[] data = null; + try (FileInputStream fis = new FileInputStream(getFile("mixed.zip"))) { + data = IOUtils.toByteArray(fis); + } + zf = new ZipFile(new SeekableInMemoryByteChannel(data), ZipEncodingHelper.UTF8); + + final Map<String, byte[]> content = new HashMap<String, byte[]>(); + for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) { + content.put(entry.getName(), IOUtils.toByteArray(zf.getInputStream(entry))); + } + + final AtomicInteger passedCount = new AtomicInteger(); + Runnable run = new Runnable() { + @Override + public void run() { + for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) { + assertAllReadMethods(content.get(entry.getName()), zf, entry); + } + passedCount.incrementAndGet(); + } + }; + Thread t0 = new Thread(run); + Thread t1 = new Thread(run); + t0.start(); + t1.start(); + t0.join(); + t1.join(); + assertEquals(2, passedCount.get()); + } + + @Test + public void testConcurrentReadFile() throws Exception { + // mixed.zip contains both inflated and stored files + final File archive = getFile("mixed.zip"); + zf = new ZipFile(archive); + + final Map<String, byte[]> content = new HashMap<String, byte[]>(); + for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) { + content.put(entry.getName(), IOUtils.toByteArray(zf.getInputStream(entry))); + } + + final AtomicInteger passedCount = new AtomicInteger(); + Runnable run = new Runnable() { + @Override + public void run() { + for (ZipArchiveEntry entry: Collections.list(zf.getEntries())) { + assertAllReadMethods(content.get(entry.getName()), zf, entry); + } + passedCount.incrementAndGet(); + } + }; + Thread t0 = new Thread(run); + Thread t1 = new Thread(run); + t0.start(); + t1.start(); + t0.join(); + t1.join(); + assertEquals(2, passedCount.get()); + } + + private void assertAllReadMethods(byte[] expected, ZipFile zipFile, ZipArchiveEntry entry) { + // simple IOUtil read + try (InputStream stream = zf.getInputStream(entry)) { + byte[] full = IOUtils.toByteArray(stream); + assertArrayEquals(expected, full); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + + // big buffer at the beginning and then chunks by IOUtils read + try (InputStream stream = zf.getInputStream(entry)) { + byte[] full; + byte[] bytes = new byte[0x40000]; + int read = stream.read(bytes); + if (read < 0) { + full = new byte[0]; + } + else { + full = readStreamRest(bytes, read, stream); + } + assertArrayEquals(expected, full); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + + // small chunk / single byte and big buffer then + try (InputStream stream = zf.getInputStream(entry)) { + byte[] full; + int single = stream.read(); + if (single < 0) { + full = new byte[0]; + } + else { + byte[] big = new byte[0x40000]; + big[0] = (byte)single; + int read = stream.read(big, 1, big.length-1); + if (read < 0) { + full = new byte[]{ (byte)single }; + } + else { + full = readStreamRest(big, read+1, stream); + } + } + assertArrayEquals(expected, full); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Utility to append the rest of the stream to already read data. + */ + private byte[] readStreamRest(byte[] beginning, int length, InputStream stream) throws IOException { + byte[] rest = IOUtils.toByteArray(stream); + byte[] full = new byte[length+rest.length]; + System.arraycopy(beginning, 0, full, 0, length); + System.arraycopy(rest, 0, full, length, rest.length); + return full; + } + /* * ordertest.zip has been handcrafted. * http://git-wip-us.apache.org/repos/asf/commons-compress/blob/7e89c9cc/src/test/resources/mixed.zip ---------------------------------------------------------------------- diff --git a/src/test/resources/mixed.zip b/src/test/resources/mixed.zip new file mode 100644 index 0000000..a36f2af Binary files /dev/null and b/src/test/resources/mixed.zip differ