COMPRESS-405 Create Fixed Length Block OutputStream / WriteableByteChannel

This commit provides a new class that is an OutputStream and 
WritableByteChannel, and which supports writing to a destination output stream 
or byte channel in fixed size blocks. Internally, all writes are made using NIO.
If the destination is a FileOutputStream, its existing channel is used.
Other OutputStreams are wrapped with a custom channel implementation which does 
not attempt to split writes into chunks.

If the target channel fails to write the entire buffer in a single call, an 
exception is thrown.

Incoming data is accumulated in a ByteBuffer until a complete block is ready, 
then written to the target.
If the WritableByteChannel::write(ByteBuffer) method is called, the code will 
attempt to avoid copying data into the buffer if the buffer is empty and a 
complete block is available.

The class and unit test are in compress/utils. This is a MINOR change - thus 
the version number for the package should be increased to 1.15.0.

Signed-off-by: Simon Spero <sesunc...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo
Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/4d00e14e
Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/4d00e14e
Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/4d00e14e

Branch: refs/heads/master
Commit: 4d00e14ec8d1e83533a8aeb788437568b417ca88
Parents: 115a76d
Author: Simon Spero <sesunc...@gmail.com>
Authored: Sun Jun 11 12:21:16 2017 -0400
Committer: Stefan Bodewig <bode...@apache.org>
Committed: Thu Jun 15 18:18:04 2017 +0200

----------------------------------------------------------------------
 .../utils/FixedLengthBlockOutputStream.java     | 209 +++++++++++
 .../utils/FixedLengthBlockOutputStreamTest.java | 365 +++++++++++++++++++
 2 files changed, 574 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/commons-compress/blob/4d00e14e/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java
 
b/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java
new file mode 100644
index 0000000..bffab9e
--- /dev/null
+++ 
b/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java
@@ -0,0 +1,209 @@
+package org.apache.commons.compress.utils;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class FixedLengthBlockOutputStream extends OutputStream implements 
WritableByteChannel,
+    AutoCloseable {
+
+    private final WritableByteChannel out;
+    private final int blockSize;
+    private final ByteBuffer buffer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    /**
+     * Creates a fixed length block output stream that writes to the given stream.
+     *
+     * <p>A {@link FileOutputStream} is written through its own channel using a direct buffer;
+     * any other stream is wrapped in a {@code BufferAtATimeOutputChannel} and fed from a
+     * heap buffer.</p>
+     *
+     * @param os the stream that receives the blocks
+     * @param blockSize the size in bytes of each block written to {@code os}
+     */
+    public FixedLengthBlockOutputStream(OutputStream os, int blockSize) {
+        final boolean fileBacked = os instanceof FileOutputStream;
+        this.out = fileBacked
+            ? ((FileOutputStream) os).getChannel()
+            : new BufferAtATimeOutputChannel(os);
+        this.buffer = fileBacked
+            ? ByteBuffer.allocateDirect(blockSize)
+            : ByteBuffer.allocate(blockSize);
+        this.blockSize = blockSize;
+    }
+
+    /**
+     * Creates a fixed length block output stream that writes to the given channel.
+     *
+     * <p>Blocks are accumulated in a direct buffer of {@code blockSize} bytes.</p>
+     *
+     * @param out the channel that receives the blocks
+     * @param blockSize the size in bytes of each block written to {@code out}
+     */
+    public FixedLengthBlockOutputStream(WritableByteChannel out, int blockSize) {
+        this.buffer = ByteBuffer.allocateDirect(blockSize);
+        this.blockSize = blockSize;
+        this.out = out;
+    }
+
+    /**
+     * Writes the buffered block to the target once the buffer has no room left.
+     *
+     * @throws IOException if writing the block fails
+     */
+    private void maybeFlush() throws IOException {
+        if (buffer.hasRemaining()) {
+            // block not complete yet, keep accumulating
+            return;
+        }
+        writeBlock();
+    }
+
+    /**
+     * Flips the buffer, hands it to the target channel in a single call and clears it
+     * afterwards so that the next block can be accumulated.
+     *
+     * @throws IOException if the target did not consume exactly one block in that call
+     */
+    private void writeBlock() throws IOException {
+        buffer.flip();
+        final int written = out.write(buffer);
+        if (written == blockSize && !buffer.hasRemaining()) {
+            buffer.clear();
+            return;
+        }
+        throw new IOException(String.format(
+            "Failed to write %,d bytes atomically. Only wrote  %,d", blockSize, written));
+    }
+
+    /**
+     * Buffers a single byte, writing the block out when the byte completes it.
+     *
+     * @param b the byte to write; it is narrowed with a {@code (byte)} cast
+     * @throws ClosedChannelException if this stream is no longer open
+     * @throws IOException if a completed block cannot be written
+     */
+    @Override
+    public void write(int b) throws IOException {
+        final boolean open = isOpen();
+        if (!open) {
+            throw new ClosedChannelException();
+        }
+        buffer.put((byte) b);
+        if (!buffer.hasRemaining()) {
+            // the byte filled the block
+            writeBlock();
+        }
+    }
+
+    /**
+     * Copies {@code len} bytes of {@code b}, starting at {@code off}, into the block buffer,
+     * writing a block to the target every time the buffer fills up.
+     *
+     * @param b the source array
+     * @param off index of the first byte to write
+     * @param len number of bytes to write
+     * @throws ClosedChannelException if this stream is no longer open
+     * @throws IOException if a completed block cannot be written
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+        int cursor = off;
+        for (int pending = len; pending > 0; ) {
+            // never copy more than still fits into the current block
+            final int chunk = Math.min(pending, buffer.remaining());
+            buffer.put(b, cursor, chunk);
+            maybeFlush();
+            cursor += chunk;
+            pending -= chunk;
+        }
+    }
+
+    /**
+     * Writes all remaining bytes of {@code src}.
+     *
+     * <p>Bytes are buffered until a complete block is available. Whenever the internal buffer
+     * is empty and {@code src} still holds at least one complete block, that block is handed
+     * to the target directly from {@code src} without being copied into the internal buffer
+     * first.</p>
+     *
+     * @param src the buffer to read from; all of its remaining bytes are consumed
+     * @return the number of bytes that were remaining in {@code src} on entry
+     * @throws ClosedChannelException if this stream is no longer open
+     * @throws IOException if the target does not accept a complete block in a single call
+     */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        if (!isOpen()) {
+            throw new ClosedChannelException();
+        }
+        final int srcRemaining = src.remaining();
+
+        if (srcRemaining < buffer.remaining()) {
+            // not enough bytes in src to fill up a block, so we must buffer
+            buffer.put(src);
+            return srcRemaining;
+        }
+
+        int srcLeft = srcRemaining;
+        final int savedLimit = src.limit();
+        try {
+            // If we're not at the start of buffer, some bytes are already buffered:
+            // fill up the rest of buffer and write the block.
+            if (buffer.position() != 0) {
+                final int n = buffer.remaining();
+                src.limit(src.position() + n);
+                buffer.put(src);
+                writeBlock();
+                srcLeft -= n;
+            }
+            // whilst we have enough bytes in src for complete blocks,
+            // write them directly from src without copying them to buffer
+            while (srcLeft >= blockSize) {
+                src.limit(src.position() + blockSize);
+                final int written = out.write(src);
+                // same guarantee as writeBlock(): a block is written in one call or not at all
+                if (written != blockSize || src.hasRemaining()) {
+                    throw new IOException(String.format(
+                        "Failed to write %,d bytes atomically. Only wrote  %,d",
+                        blockSize, written));
+                }
+                srcLeft -= blockSize;
+            }
+        } finally {
+            // never leave the caller's buffer with a truncated limit, even on failure
+            src.limit(savedLimit);
+        }
+        // copy any remaining bytes (less than one block) into buffer
+        buffer.put(src);
+        return srcRemaining;
+    }
+
+    /**
+     * Reports whether this stream can still be written to.
+     *
+     * <p>If the target channel reports that it is closed, this stream marks itself as closed
+     * as well.</p>
+     *
+     * @return {@code true} if neither this stream nor its target has been closed
+     */
+    @Override
+    public boolean isOpen() {
+        final boolean targetOpen = out.isOpen();
+        if (!targetOpen) {
+            closed.set(true);
+        }
+        return !closed.get();
+    }
+
+    /**
+     * Pads a partially filled last block with zero bytes, writes it and closes the target.
+     *
+     * <p>Only the first call has any effect. The target is closed even if writing the last
+     * block fails, so a failed final write does not leak the underlying channel.</p>
+     *
+     * @throws IOException if the last block cannot be written or the target fails to close
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            try {
+                if (buffer.position() != 0) {
+                    padLastBlock();
+                    writeBlock();
+                }
+            } finally {
+                out.close();
+            }
+        }
+    }
+
+    /**
+     * Fills the unused remainder of the buffer with zero bytes.
+     *
+     * <p>When more than eight bytes are missing, single bytes are written until the position
+     * is a multiple of eight, then zeros are written eight bytes at a time; whatever is left
+     * is filled byte by byte. As a side effect the buffer's byte order is set to the native
+     * order.</p>
+     */
+    private void padLastBlock() {
+        buffer.order(ByteOrder.nativeOrder());
+        int pending = buffer.remaining();
+        if (pending > 8) {
+            final int misaligned = buffer.position() & 7;
+            if (misaligned != 0) {
+                final int lead = 8 - misaligned;
+                for (int k = lead; k > 0; k--) {
+                    buffer.put((byte) 0);
+                }
+                pending -= lead;
+            }
+
+            for (; pending >= 8; pending -= 8) {
+                buffer.putLong(0L);
+            }
+        }
+        while (buffer.hasRemaining()) {
+            buffer.put((byte) 0);
+        }
+    }
+
+    /**
+     * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the
+     * size of writes.  We can't use Channels.newChannel, because for non FileOutputStreams, it
+     * breaks up writes into 8KB max chunks. Since the purpose of this class is to always write
+     * complete blocks, we need to write a simple class to take care of it.
+     */
+    private static class BufferAtATimeOutputChannel implements WritableByteChannel {
+
+        private final OutputStream out;
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        private BufferAtATimeOutputChannel(OutputStream out) {
+            this.out = out;
+        }
+
+        /**
+         * Writes all remaining bytes of {@code buffer} to the stream with a single
+         * {@code OutputStream.write(byte[], int, int)} call.
+         *
+         * <p>Buffers without an accessible backing array (direct or read-only buffers) are
+         * copied into a temporary array first. If the stream fails, this channel is closed
+         * and the original exception is rethrown; {@code buffer}'s position is only advanced
+         * after a successful write.</p>
+         *
+         * @param buffer the bytes to write
+         * @return the number of bytes written, i.e. the bytes remaining in {@code buffer}
+         *     on entry
+         * @throws ClosedChannelException if this channel has been closed
+         * @throws IOException if the underlying stream fails
+         */
+        @Override
+        public int write(ByteBuffer buffer) throws IOException {
+            // real checks rather than asserts: asserts are disabled unless the JVM runs with -ea
+            if (!isOpen()) {
+                throw new ClosedChannelException();
+            }
+            final int pos = buffer.position();
+            final int len = buffer.limit() - pos;
+            try {
+                if (buffer.hasArray()) {
+                    out.write(buffer.array(), buffer.arrayOffset() + pos, len);
+                } else {
+                    // read through a duplicate so buffer's own position stays untouched
+                    // until the write has succeeded
+                    final byte[] copy = new byte[len];
+                    buffer.duplicate().get(copy);
+                    out.write(copy, 0, len);
+                }
+            } catch (IOException e) {
+                try {
+                    close();
+                } catch (IOException closeFailure) {
+                    // keep the write failure as the primary exception
+                    e.addSuppressed(closeFailure);
+                }
+                throw e;
+            }
+            buffer.position(buffer.limit());
+            return len;
+        }
+
+        @Override
+        public boolean isOpen() {
+            return !closed.get();
+        }
+
+        /**
+         * Closes the underlying stream; only the first call has any effect.
+         *
+         * @throws IOException if closing the stream fails
+         */
+        @Override
+        public void close() throws IOException {
+            if (closed.compareAndSet(false, true)) {
+                out.close();
+            }
+        }
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/commons-compress/blob/4d00e14e/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java
 
b/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java
new file mode 100644
index 0000000..8f07acd
--- /dev/null
+++ 
b/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java
@@ -0,0 +1,365 @@
+package org.apache.commons.compress.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.core.IsInstanceOf;
+import org.junit.Test;
+import org.mockito.internal.matchers.GreaterOrEqual;
+
+public class FixedLengthBlockOutputStreamTest {
+
+    /** Writes a 13 byte message to a channel target using block sizes above and below it. */
+    @Test
+    public void testSmallWrite() throws IOException {
+        final int[] blockSizes = {10240, 512, 11, 3};
+        for (final int blockSize : blockSizes) {
+            testWriteAndPad(blockSize, "hello world!\n", false);
+        }
+    }
+
+    /** Same as {@code testSmallWrite}, but with a plain output stream as the target. */
+    @Test
+    public void testSmallWriteToStream() throws IOException {
+        final String text = "hello world!\n";
+        for (final int blockSize : new int[] {10240, 512, 11}) {
+            testWriteAndPadToStream(blockSize, text, false);
+        }
+        testWriteAndPadToStream(3, "hello     world!\n", false);
+    }
+
+    /** Feeds the message one byte at a time through {@code write(int)}. */
+    @Test
+    public void testWriteSingleBytes() throws IOException {
+        int blockSize = 4;
+        MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false);
+        ByteArrayOutputStream bos = mock.bos;
+        String text = "hello world avengers";
+        // explicit charset so the byte count does not depend on the platform default
+        byte[] msg = text.getBytes(StandardCharsets.US_ASCII);
+        try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) {
+            for (byte b : msg) {
+                out.write(b);
+            }
+        }
+        byte[] output = bos.toByteArray();
+
+        validate(blockSize, msg, output);
+    }
+
+
+    /** Writes a whole message as one {@code ByteBuffer} for several block sizes. */
+    @Test
+    public void testWriteBuf() throws IOException {
+        final String hwa = "hello world avengers";
+        for (final int blockSize : new int[] {4, 512, 10240}) {
+            testBuf(blockSize, hwa);
+        }
+        testBuf(11, hwa + hwa + hwa);
+    }
+
+    /**
+     * Writes the same message repeatedly as separate {@code ByteBuffer}s and checks that the
+     * output is the concatenated text followed by zero padding up to a whole block.
+     */
+    @Test
+    public void testMultiWriteBuf() throws IOException {
+        int blockSize = 13;
+        MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false);
+        String testString = "hello world";
+        // explicit charset so encoding and decoding do not depend on the platform default
+        byte[] msg = testString.getBytes(StandardCharsets.US_ASCII);
+        int reps = 17;
+
+        try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) {
+            for (int i = 0; i < reps; i++) {
+                ByteBuffer buf = getByteBuffer(msg);
+                out.write(buf);
+            }
+        }
+        ByteArrayOutputStream bos = mock.bos;
+        double v = Math.ceil((reps * msg.length) / (double) blockSize) * blockSize;
+        assertEquals("wrong size", (long) v, bos.size());
+        int strLen = msg.length * reps;
+        byte[] output = bos.toByteArray();
+        String l = new String(output, 0, strLen, StandardCharsets.US_ASCII);
+        StringBuilder buf = new StringBuilder(strLen);
+        for (int i = 0; i < reps; i++) {
+            buf.append(testString);
+        }
+        assertEquals(buf.toString(), l);
+        for (int i = strLen; i < output.length; i++) {
+            assertEquals(0, output[i]);
+        }
+    }
+
+    /** A target that accepts one byte less than a block must make the stream fail. */
+    @Test
+    public void testPartialWritingThrowsException() {
+        IOException caught = null;
+        try {
+            testWriteAndPad(512, "hello world!\n", true);
+        } catch (IOException e) {
+            caught = e;
+        }
+        if (caught == null) {
+            fail("Exception for partial write not thrown");
+        }
+        assertEquals("exception message",
+            "Failed to write 512 bytes atomically. Only wrote  511", caught.getMessage());
+    }
+
+    /** Every write variant must raise {@code ClosedChannelException} on a closed stream. */
+    @Test
+    public void testWriteFailsAfterFLClosedThrowsException() {
+        // write(int)
+        try {
+            final FixedLengthBlockOutputStream closedStream = getClosedFLBOS();
+            closedStream.write(1);
+            fail("expected Closed Channel Exception");
+        } catch (IOException expected) {
+            assertThat(expected, IsInstanceOf.instanceOf(ClosedChannelException.class));
+        }
+
+        // write(byte[])
+        try {
+            final FixedLengthBlockOutputStream closedStream = getClosedFLBOS();
+            final byte[] payload = {0, 1, 2, 3};
+            closedStream.write(payload);
+            fail("expected Closed Channel Exception");
+        } catch (IOException expected) {
+            assertThat(expected, IsInstanceOf.instanceOf(ClosedChannelException.class));
+        }
+
+        // write(ByteBuffer)
+        try {
+            final FixedLengthBlockOutputStream closedStream = getClosedFLBOS();
+            final ByteBuffer payload = ByteBuffer.wrap(new byte[] {0, 1, 2, 3});
+            closedStream.write(payload);
+            fail("expected Closed Channel Exception");
+        } catch (IOException expected) {
+            assertThat(expected, IsInstanceOf.instanceOf(ClosedChannelException.class));
+        }
+    }
+
+    /**
+     * Builds a stream over a mock output stream, writes one byte to it and closes it,
+     * asserting the open state before and after.
+     *
+     * @return the closed stream
+     */
+    private FixedLengthBlockOutputStream getClosedFLBOS() throws IOException {
+        final int blockSize = 512;
+        final MockOutputStream target = new MockOutputStream(blockSize, false);
+        final FixedLengthBlockOutputStream stream =
+            new FixedLengthBlockOutputStream(target, blockSize);
+        stream.write(1);
+        assertTrue(stream.isOpen());
+        stream.close();
+        assertFalse(stream.isOpen());
+        return stream;
+    }
+
+    /** Closing the destination behind the stream's back must surface as an IOException. */
+    @Test
+    public void testWriteFailsAfterDestClosedThrowsException() {
+        final int blockSize = 2;
+        final MockOutputStream destination = new MockOutputStream(blockSize, false);
+        final FixedLengthBlockOutputStream stream =
+            new FixedLengthBlockOutputStream(destination, blockSize);
+        try {
+            stream.write(1);
+            assertTrue(stream.isOpen());
+            destination.close();
+            // the second byte completes the block and forces a write to the closed destination
+            stream.write(1);
+            fail("expected IO Exception");
+        } catch (IOException expected) {
+            // expected
+        }
+        assertFalse(stream.isOpen());
+    }
+
+    /**
+     * Writes 1000 ints through a {@code FileOutputStream} target and verifies that the file
+     * holds the ints followed by zero padding up to a multiple of the block size.
+     */
+    @Test
+    public void testWithFileOutputStream() throws IOException {
+        final Path tempFile = Files.createTempFile("xxx", "yyy");
+        try {
+            int blockSize = 512;
+            int reps = 1000;
+            OutputStream os = new FileOutputStream(tempFile.toFile());
+            try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(
+                os, blockSize)) {
+                DataOutputStream dos = new DataOutputStream(out);
+                for (int i = 0; i < reps; i++) {
+                    dos.writeInt(i);
+                }
+            }
+            long expectedDataSize = reps * 4L;
+            long expectedFileSize =
+                (long) Math.ceil(expectedDataSize / (double) blockSize) * blockSize;
+            assertEquals("file size", expectedFileSize, Files.size(tempFile));
+            // close the reader so the file handle is released before the file is deleted
+            try (DataInputStream din = new DataInputStream(Files.newInputStream(tempFile))) {
+                for (int i = 0; i < reps; i++) {
+                    assertEquals("file int", i, din.readInt());
+                }
+                for (int i = 0; i < expectedFileSize - expectedDataSize; i++) {
+                    assertEquals(0, din.read());
+                }
+                assertEquals(-1, din.read());
+            }
+        } finally {
+            // delete deterministically instead of relying on a JVM shutdown hook
+            Files.deleteIfExists(tempFile);
+        }
+    }
+
+    /**
+     * Writes {@code text} as a single {@code ByteBuffer} and checks that the output is the
+     * text followed by zero padding up to a whole number of blocks.
+     *
+     * @param blockSize block size of the stream under test
+     * @param text the text to write
+     */
+    private void testBuf(int blockSize, String text) throws IOException {
+        MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false);
+
+        ByteArrayOutputStream bos = mock.bos;
+        // explicit charset so encoding and decoding do not depend on the platform default
+        byte[] msg = text.getBytes(StandardCharsets.US_ASCII);
+        ByteBuffer buf = getByteBuffer(msg);
+        try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) {
+            out.write(buf);
+        }
+        double v = Math.ceil(msg.length / (double) blockSize) * blockSize;
+        assertEquals("wrong size", (long) v, bos.size());
+        byte[] output = bos.toByteArray();
+        String l = new String(output, 0, msg.length, StandardCharsets.US_ASCII);
+        assertEquals(text, l);
+        for (int i = msg.length; i < bos.size(); i++) {
+            assertEquals(String.format("output[%d]", i), 0, output[i]);
+        }
+    }
+
+    /**
+     * Copies {@code msg} into a freshly allocated heap buffer that is ready for reading.
+     *
+     * @param msg the bytes to copy
+     * @return a flipped buffer holding a copy of {@code msg}
+     */
+    private ByteBuffer getByteBuffer(byte[] msg) {
+        final ByteBuffer copy = ByteBuffer.allocate(msg.length);
+        copy.put(msg);
+        copy.flip();
+        return copy;
+    }
+
+
+    /**
+     * Writes {@code text} to a stream backed by a mock channel, checks that only complete
+     * blocks have reached the target before close, and validates the padded result after it.
+     *
+     * @param blockSize block size of the stream under test
+     * @param text the ASCII text to write
+     * @param doPartialWrite whether the mock channel should drop the last byte of each write
+     */
+    private void testWriteAndPad(int blockSize, String text, boolean doPartialWrite)
+        throws IOException {
+        final byte[] payload = text.getBytes(StandardCharsets.US_ASCII);
+        final MockWritableByteChannel channel =
+            new MockWritableByteChannel(blockSize, doPartialWrite);
+        final ByteArrayOutputStream sink = channel.bos;
+
+        try (FixedLengthBlockOutputStream out =
+                 new FixedLengthBlockOutputStream(channel, blockSize)) {
+            out.write(payload);
+            final int completeBlockBytes = (payload.length / blockSize) * blockSize;
+            assertEquals("no partial write", completeBlockBytes, sink.size());
+        }
+        validate(blockSize, payload, sink.toByteArray());
+    }
+
+    /**
+     * Writes {@code text} to a stream backed by a mock output stream, checks that only
+     * complete blocks have reached the target before close, and validates the padded result
+     * after it.
+     *
+     * @param blockSize block size of the stream under test
+     * @param text the ASCII text to write
+     * @param doPartialWrite whether the mock stream should drop the last byte of each write
+     */
+    private void testWriteAndPadToStream(int blockSize, String text, boolean doPartialWrite)
+        throws IOException {
+        final byte[] payload = text.getBytes(StandardCharsets.US_ASCII);
+        final MockOutputStream target = new MockOutputStream(blockSize, doPartialWrite);
+        final ByteArrayOutputStream sink = target.bos;
+
+        try (FixedLengthBlockOutputStream out =
+                 new FixedLengthBlockOutputStream(target, blockSize)) {
+            out.write(payload);
+            final int completeBlockBytes = (payload.length / blockSize) * blockSize;
+            assertEquals("no partial write", completeBlockBytes, sink.size());
+        }
+        validate(blockSize, payload, sink.toByteArray());
+    }
+
+
+    /**
+     * Asserts that {@code actualBytes} is {@code expectedBytes} followed by zero bytes up to
+     * the next multiple of {@code blockSize}.
+     *
+     * @param blockSize block size used for the write
+     * @param expectedBytes the bytes that were written
+     * @param actualBytes the bytes that reached the target
+     */
+    private void validate(int blockSize, byte[] expectedBytes, byte[] actualBytes) {
+        final double paddedSize =
+            Math.ceil(expectedBytes.length / (double) blockSize) * blockSize;
+        assertEquals("wrong size", (long) paddedSize, actualBytes.length);
+        assertContainsAtOffset("output", expectedBytes, 0, actualBytes);
+        int index = expectedBytes.length;
+        while (index < actualBytes.length) {
+            assertEquals(String.format("output[%d]", index), 0, actualBytes[index]);
+            index++;
+        }
+    }
+
+    /**
+     * Asserts that {@code actual} contains {@code expected} starting at {@code offset}.
+     *
+     * @param msg prefix for the assertion messages
+     * @param expected the bytes that must be present
+     * @param offset index in {@code actual} where {@code expected} must start
+     * @param actual the array to inspect
+     */
+    private static void assertContainsAtOffset(String msg, byte[] expected, int offset,
+        byte[] actual) {
+        // plain JUnit check instead of a matcher from Mockito's internal package
+        assertTrue(
+            String.format("%s: length %d is too short for %d bytes at offset %d",
+                msg, actual.length, expected.length, offset),
+            actual.length >= offset + expected.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(String.format("%s ([%d])", msg, i), expected[i], actual[i + offset]);
+        }
+    }
+
+    /**
+     * Output stream double that records everything in {@code bos}, insists that every write
+     * has exactly {@code requiredWriteSize} bytes and can optionally drop the last byte of
+     * each array write.
+     */
+    private static class MockOutputStream extends OutputStream {
+
+        private final int requiredWriteSize;
+        private final boolean doPartialWrite;
+        private final AtomicBoolean closed = new AtomicBoolean();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        private MockOutputStream(int requiredWriteSize, boolean doPartialWrite) {
+            this.requiredWriteSize = requiredWriteSize;
+            this.doPartialWrite = doPartialWrite;
+        }
+
+        /** Fails with an {@code IOException} once this stream has been closed. */
+        private void checkIsOpen() throws IOException {
+            if (closed.get()) {
+                throw new IOException("Closed");
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            checkIsOpen();
+            assertEquals("write size", requiredWriteSize, 1);
+            bos.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            checkIsOpen();
+            assertEquals("write size", requiredWriteSize, len);
+            // a partial write silently loses the final byte
+            final int accepted = doPartialWrite ? len - 1 : len;
+            bos.write(b, off, accepted);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (closed.compareAndSet(false, true)) {
+                bos.close();
+            }
+        }
+    }
+
+    /**
+     * Channel double that records everything in {@code bos}, insists that every write offers
+     * exactly {@code requiredWriteSize} bytes and can optionally leave the last byte of each
+     * write unconsumed.
+     */
+    private static class MockWritableByteChannel implements WritableByteChannel {
+
+        private final int requiredWriteSize;
+        private final boolean doPartialWrite;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        AtomicBoolean closed = new AtomicBoolean();
+
+        private MockWritableByteChannel(int requiredWriteSize, boolean doPartialWrite) {
+            this.requiredWriteSize = requiredWriteSize;
+            this.doPartialWrite = doPartialWrite;
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException {
+            assertEquals("write size", requiredWriteSize, src.remaining());
+            if (doPartialWrite) {
+                // shrink the limit so the final byte is never read
+                src.limit(src.limit() - 1);
+            }
+            int bytesOut = 0;
+            for (; src.hasRemaining(); bytesOut++) {
+                bos.write(src.get());
+            }
+            return bytesOut;
+        }
+
+        @Override
+        public boolean isOpen() {
+            return !closed.get();
+        }
+
+        @Override
+        public void close() throws IOException {
+            closed.compareAndSet(false, true);
+        }
+    }
+}

Reply via email to