COMPRESS-405 Create Fixed Length Block OutputStream / WritableByteChannel This commit provides a new class that is both an OutputStream and a WritableByteChannel, and which supports writing to a destination output stream or byte channel in fixed-size blocks. Internally, all writes are made using NIO. If the destination is a FileOutputStream, its existing channel is used. Other OutputStreams are wrapped with a custom channel implementation that does not split writes into chunks.
If the target channel fails to write the entire buffer in a single call, an exception is thrown. Incoming data is accumulated in a ByteBuffer until a complete block is ready, then written to the target. If the WritableByteChannel::write(ByteBuffer) method is called, the code will attempt to avoid copying data into the buffer if the buffer is empty and a complete block is available. The class and unit test are in compress/utils. This is a MINOR change - thus the version number for the package should be increased to 1.15.0. Signed-off-by: Simon Spero <sesunc...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/commons-compress/repo Commit: http://git-wip-us.apache.org/repos/asf/commons-compress/commit/4d00e14e Tree: http://git-wip-us.apache.org/repos/asf/commons-compress/tree/4d00e14e Diff: http://git-wip-us.apache.org/repos/asf/commons-compress/diff/4d00e14e Branch: refs/heads/master Commit: 4d00e14ec8d1e83533a8aeb788437568b417ca88 Parents: 115a76d Author: Simon Spero <sesunc...@gmail.com> Authored: Sun Jun 11 12:21:16 2017 -0400 Committer: Stefan Bodewig <bode...@apache.org> Committed: Thu Jun 15 18:18:04 2017 +0200 ---------------------------------------------------------------------- .../utils/FixedLengthBlockOutputStream.java | 209 +++++++++++ .../utils/FixedLengthBlockOutputStreamTest.java | 365 +++++++++++++++++++ 2 files changed, 574 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/commons-compress/blob/4d00e14e/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java b/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java new file mode 100644 index 0000000..bffab9e --- /dev/null +++
b/src/main/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStream.java @@ -0,0 +1,209 @@ +package org.apache.commons.compress.utils; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FixedLengthBlockOutputStream extends OutputStream implements WritableByteChannel, + AutoCloseable { + + private final WritableByteChannel out; + private final int blockSize; + private final ByteBuffer buffer; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public FixedLengthBlockOutputStream(OutputStream os, int blockSize) { + if (os instanceof FileOutputStream) { + FileOutputStream fileOutputStream = (FileOutputStream) os; + out = fileOutputStream.getChannel(); + buffer = ByteBuffer.allocateDirect(blockSize); + } else { + out = new BufferAtATimeOutputChannel(os); + buffer = ByteBuffer.allocate(blockSize); + } + this.blockSize = blockSize; + } + + public FixedLengthBlockOutputStream(WritableByteChannel out, int blockSize) { + this.out = out; + this.blockSize = blockSize; + this.buffer = ByteBuffer.allocateDirect(blockSize); + } + + private void maybeFlush() throws IOException { + if (!buffer.hasRemaining()) { + writeBlock(); + } + } + + private void writeBlock() throws IOException { + buffer.flip(); + int i = out.write(buffer); + boolean hasRemaining = buffer.hasRemaining(); + if (i != blockSize || hasRemaining) { + String msg = String + .format("Failed to write %,d bytes atomically. 
Only wrote %,d", + blockSize, i); + throw new IOException(msg); + } + buffer.clear(); + } + + @Override + public void write(int b) throws IOException { + if(!isOpen()) { + throw new ClosedChannelException(); + } + buffer.put((byte) b); + maybeFlush(); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if(!isOpen()) { + throw new ClosedChannelException(); + } + while (len > 0) { + int n = Math.min(len, buffer.remaining()); + buffer.put(b, off, n); + maybeFlush(); + len -= n; + off += n; + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + if(!isOpen()) { + throw new ClosedChannelException(); + } + int srcRemaining = src.remaining(); + + if (srcRemaining < buffer.remaining()) { + // if don't have enough bytes in src to fill up a block we must buffer + buffer.put(src); + } else { + int srcLeft = srcRemaining; + int savedLimit = src.limit(); + // If we're not at the start of buffer, we have some bytes already buffered + // fill up the reset of buffer and write the block. 
+ if (buffer.position() != 0) { + int n = buffer.remaining(); + src.limit(src.position() + n); + buffer.put(src); + writeBlock(); + srcLeft -= n; + } + // whilst we have enough bytes in src for complete blocks, + // write them directly from src without copying them to buffer + while (srcLeft >= blockSize) { + src.limit(src.position() + blockSize); + out.write(src); + srcLeft -= blockSize; + } + // copy any remaining bytes into buffer + src.limit(savedLimit); + buffer.put(src); + } + return srcRemaining; + } + + @Override + public boolean isOpen() { + if(!out.isOpen()) { + closed.set(true); + } + return !closed.get(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (buffer.position() != 0) { + padLastBlock(); + writeBlock(); + } + out.close(); + } + } + + private void padLastBlock() { + buffer.order(ByteOrder.nativeOrder()); + int bytesToWrite = buffer.remaining(); + if (bytesToWrite > 8) { + int align = (buffer.position() & 7); + if (align != 0) { + int limit = 8 - align; + for (int i = 0; i < limit; i++) { + buffer.put((byte) 0); + } + bytesToWrite -= limit; + } + + while (bytesToWrite >= 8) { + buffer.putLong(0L); + bytesToWrite -= 8; + } + } + while (buffer.hasRemaining()) { + buffer.put((byte) 0); + } + } + + /** + * Helper class to provide channel wrapper for arbitrary output stream that doesn't alter the + * size of writes. We can't use Channels.newChannel, because for non FileOutputStreams, it + * breaks up writes into 8KB max chunks. Since the purpose of this class is to always write + * complete blocks, we need to write a simple class to take care of it. 
+ */ + private static class BufferAtATimeOutputChannel implements WritableByteChannel { + + private final OutputStream out; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private BufferAtATimeOutputChannel(OutputStream out) { + this.out = out; + } + + @Override + public int write(ByteBuffer buffer) throws IOException { + assert isOpen() : "somehow trying to write to closed BufferAtATimeOutputChannel"; + assert buffer.hasArray() : + "direct buffer somehow written to BufferAtATimeOutputChannel"; + + try { + int pos = buffer.position(); + int len = buffer.limit() - pos; + out.write(buffer.array(), buffer.arrayOffset() + pos, len); + buffer.position(buffer.limit()); + return len; + } catch (IOException e) { + try { + close(); + } finally { + throw e; + } + } + } + + @Override + public boolean isOpen() { + return !closed.get(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + out.close(); + } + } + + } + + +} http://git-wip-us.apache.org/repos/asf/commons-compress/blob/4d00e14e/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java b/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java new file mode 100644 index 0000000..8f07acd --- /dev/null +++ b/src/test/java/org/apache/commons/compress/utils/FixedLengthBlockOutputStreamTest.java @@ -0,0 +1,365 @@ +package org.apache.commons.compress.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import 
java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; +import org.hamcrest.core.IsInstanceOf; +import org.junit.Test; +import org.mockito.internal.matchers.GreaterOrEqual; + +public class FixedLengthBlockOutputStreamTest { + + @Test + public void testSmallWrite() throws IOException { + testWriteAndPad(10240, "hello world!\n", false); + testWriteAndPad(512, "hello world!\n", false); + testWriteAndPad(11, "hello world!\n", false); + testWriteAndPad(3, "hello world!\n", false); + } + + @Test + public void testSmallWriteToStream() throws IOException { + testWriteAndPadToStream(10240, "hello world!\n", false); + testWriteAndPadToStream(512, "hello world!\n", false); + testWriteAndPadToStream(11, "hello world!\n", false); + testWriteAndPadToStream(3, "hello world!\n", false); + } + + @Test + public void testWriteSingleBytes() throws IOException { + int blockSize = 4; + MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false); + ByteArrayOutputStream bos = mock.bos; + String text = "hello world avengers"; + byte msg[] = text.getBytes(); + int len = msg.length; + try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) { + for (int i = 0; i < len; i++) { + out.write(msg[i]); + } + } + byte[] output = bos.toByteArray(); + + validate(blockSize, msg, output); + } + + + @Test + public void testWriteBuf() throws IOException { + String hwa = "hello world avengers"; + testBuf(4, hwa); + testBuf(512, hwa); + testBuf(10240, hwa); + testBuf(11, hwa + hwa + hwa); + } + + @Test + public void testMultiWriteBuf() throws IOException { + int blockSize = 13; + MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false); + String testString = "hello 
world"; + byte msg[] = testString.getBytes(); + int reps = 17; + + try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) { + for (int i = 0; i < reps; i++) { + ByteBuffer buf = getByteBuffer(msg); + out.write(buf); + } + } + ByteArrayOutputStream bos = mock.bos; + double v = Math.ceil((reps * msg.length) / (double) blockSize) * blockSize; + assertEquals("wrong size", (long) v, bos.size()); + int strLen = msg.length * reps; + byte[] output = bos.toByteArray(); + String l = new String(output, 0, strLen); + StringBuilder buf = new StringBuilder(strLen); + for (int i = 0; i < reps; i++) { + buf.append(testString); + } + assertEquals(buf.toString(), l); + for (int i = strLen; i < output.length; i++) { + assertEquals(0, output[i]); + } + } + + @Test + public void testPartialWritingThrowsException() { + try { + testWriteAndPad(512, "hello world!\n", true); + fail("Exception for partial write not thrown"); + } catch (IOException e) { + String msg = e.getMessage(); + assertEquals("exception message", + "Failed to write 512 bytes atomically. 
Only wrote 511", msg); + } + + } + + @Test + public void testWriteFailsAfterFLClosedThrowsException() { + try { + FixedLengthBlockOutputStream out = getClosedFLBOS(); + out.write(1); + fail("expected Closed Channel Exception"); + } catch (IOException e) { + assertThat(e, IsInstanceOf.instanceOf(ClosedChannelException.class)); + // expected + } + try { + FixedLengthBlockOutputStream out = getClosedFLBOS(); + out.write(new byte[] {0,1,2,3}); + fail("expected Closed Channel Exception"); + } catch (IOException e) { + assertThat(e, IsInstanceOf.instanceOf(ClosedChannelException.class)); + // expected + } + + try { + FixedLengthBlockOutputStream out = getClosedFLBOS(); + out.write(ByteBuffer.wrap(new byte[] {0,1,2,3})); + fail("expected Closed Channel Exception"); + } catch (IOException e) { + assertThat(e, IsInstanceOf.instanceOf(ClosedChannelException.class)); + // expected + } + + } + + private FixedLengthBlockOutputStream getClosedFLBOS() throws IOException { + int blockSize = 512; + FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream( + new MockOutputStream(blockSize, false), blockSize); + out.write(1); + assertTrue(out.isOpen()); + out.close(); + assertFalse(out.isOpen()); + return out; + } + + @Test + public void testWriteFailsAfterDestClosedThrowsException() { + int blockSize = 2; + MockOutputStream mock = new MockOutputStream(blockSize, false); + FixedLengthBlockOutputStream out = + new FixedLengthBlockOutputStream(mock, blockSize); + try { + out.write(1); + assertTrue(out.isOpen()); + mock.close(); + out.write(1); + fail("expected IO Exception"); + } catch (IOException e) { + // expected + } + assertFalse(out.isOpen()); + } + + @Test + public void testWithFileOutputStream() throws IOException { + final Path tempFile = Files.createTempFile("xxx", "yyy"); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + Files.deleteIfExists(tempFile); + } catch (IOException e) { + } + } + }); + int blockSize = 512; 
+ int reps = 1000; + OutputStream os = new FileOutputStream(tempFile.toFile()); + try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream( + os, blockSize)) { + DataOutputStream dos = new DataOutputStream(out); + for (int i = 0; i < reps; i++) { + dos.writeInt(i); + } + } + long expectedDataSize = reps * 4L; + long expectedFileSize = (long)Math.ceil(expectedDataSize/(double)blockSize)*blockSize; + assertEquals("file size",expectedFileSize, Files.size(tempFile)); + DataInputStream din = new DataInputStream(Files.newInputStream(tempFile)); + for(int i=0;i<reps;i++) { + assertEquals("file int",i,din.readInt()); + } + for(int i=0;i<expectedFileSize - expectedDataSize;i++) { + assertEquals(0,din.read()); + } + assertEquals(-1,din.read()); + } + + private void testBuf(int blockSize, String text) throws IOException { + MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, false); + + ByteArrayOutputStream bos = mock.bos; + byte msg[] = text.getBytes(); + ByteBuffer buf = getByteBuffer(msg); + try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) { + out.write(buf); + } + double v = Math.ceil(msg.length / (double) blockSize) * blockSize; + assertEquals("wrong size", (long) v, bos.size()); + byte[] output = bos.toByteArray(); + String l = new String(output, 0, msg.length); + assertEquals(text, l); + for (int i = msg.length; i < bos.size(); i++) { + assertEquals(String.format("output[%d]", i), 0, output[i]); + + } + } + + private ByteBuffer getByteBuffer(byte[] msg) { + int len = msg.length; + ByteBuffer buf = ByteBuffer.allocate(len); + buf.put(msg); + buf.flip(); + return buf; + } + + + private void testWriteAndPad(int blockSize, String text, boolean doPartialWrite) + throws IOException { + MockWritableByteChannel mock = new MockWritableByteChannel(blockSize, doPartialWrite); + byte[] msg = text.getBytes(StandardCharsets.US_ASCII); + + ByteArrayOutputStream bos = mock.bos; + try 
(FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) { + + out.write(msg); + assertEquals("no partial write", (msg.length / blockSize) * blockSize, bos.size()); + } + validate(blockSize, msg, bos.toByteArray()); + } + + private void testWriteAndPadToStream(int blockSize, String text, boolean doPartialWrite) + throws IOException { + MockOutputStream mock = new MockOutputStream(blockSize, doPartialWrite); + byte[] msg = text.getBytes(StandardCharsets.US_ASCII); + + ByteArrayOutputStream bos = mock.bos; + try (FixedLengthBlockOutputStream out = new FixedLengthBlockOutputStream(mock, blockSize)) { + out.write(msg); + assertEquals("no partial write", (msg.length / blockSize) * blockSize, bos.size()); + } + validate(blockSize, msg, bos.toByteArray()); + + } + + + private void validate(int blockSize, byte[] expectedBytes, byte[] actualBytes) { + double v = Math.ceil(expectedBytes.length / (double) blockSize) * blockSize; + assertEquals("wrong size", (long) v, actualBytes.length); + assertContainsAtOffset("output", expectedBytes, 0, actualBytes); + for (int i = expectedBytes.length; i < actualBytes.length; i++) { + assertEquals(String.format("output[%d]", i), 0, actualBytes[i]); + + } + } + + private static void assertContainsAtOffset(String msg, byte[] expected, int offset, + byte[] actual) { + assertThat(actual.length, new GreaterOrEqual<>(offset + expected.length)); + for (int i = 0; i < expected.length; i++) { + assertEquals(String.format("%s ([%d])", msg, i), expected[i], actual[i + offset]); + } + } + + private static class MockOutputStream extends OutputStream { + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + private final int requiredWriteSize; + private final boolean doPartialWrite; + private AtomicBoolean closed = new AtomicBoolean(); + + private MockOutputStream(int requiredWriteSize, boolean doPartialWrite) { + this.requiredWriteSize = requiredWriteSize; + this.doPartialWrite = doPartialWrite; + } + + @Override + 
public void write(byte[] b, int off, int len) throws IOException { + checkIsOpen(); + assertEquals("write size", requiredWriteSize, len); + if (doPartialWrite) { + len--; + } + bos.write(b, off, len); + } + + private void checkIsOpen() throws IOException { + if (closed.get()) { + IOException e = new IOException("Closed"); + throw e; + } + } + + @Override + public void write(int b) throws IOException { + checkIsOpen(); + assertEquals("write size", requiredWriteSize, 1); + bos.write(b); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + bos.close(); + } + } + } + + private static class MockWritableByteChannel implements WritableByteChannel { + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + private final int requiredWriteSize; + private final boolean doPartialWrite; + + private MockWritableByteChannel(int requiredWriteSize, boolean doPartialWrite) { + this.requiredWriteSize = requiredWriteSize; + this.doPartialWrite = doPartialWrite; + } + + @Override + public int write(ByteBuffer src) throws IOException { + assertEquals("write size", requiredWriteSize, src.remaining()); + if (doPartialWrite) { + src.limit(src.limit() - 1); + } + int bytesOut = src.remaining(); + while (src.hasRemaining()) { + bos.write(src.get()); + } + return bytesOut; + } + + AtomicBoolean closed = new AtomicBoolean(); + + @Override + public boolean isOpen() { + return !closed.get(); + } + + @Override + public void close() throws IOException { + closed.compareAndSet(false, true); + } + } +}