This is an automated email from the ASF dual-hosted git repository. bodewig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-compress.git
The following commit(s) were added to refs/heads/master by this push: new e959762 COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood e959762 is described below commit e959762ed200d9bc03fd858b8c96e35f95cada90 Author: Stefan Bodewig <bode...@apache.org> AuthorDate: Sun Aug 18 16:58:21 2019 +0200 COMPRESS-231 add a concatenated SeekableByteChannel based on code by Tim Underwood --- .../utils/MultiReadOnlySeekableByteChannel.java | 219 +++++++++++++++++++++ .../MultiReadOnlySeekableByteChannelTest.java | 217 ++++++++++++++++++++ 2 files changed, 436 insertions(+) diff --git a/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java new file mode 100644 index 0000000..fd78f9d --- /dev/null +++ b/src/main/java/org/apache/commons/compress/utils/MultiReadOnlySeekableByteChannel.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Read-only implementation of {@link SeekableByteChannel} that
 * concatenates a collection of other {@link SeekableByteChannel}s.
 *
 * <p>The wrapped channels are presented as one logical channel whose
 * size is the sum of the individual sizes. {@link #read} and
 * {@link #position(long)} are synchronized on this instance; the
 * remaining methods are not.</p>
 *
 * <p>This is a loose port of <a
 * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">MultiReadOnlySeekableByteChannel</a>
 * by Tim Underwood.</p>
 *
 * @since 1.19
 */
public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {

    /** Immutable snapshot of the channels, in concatenation order. */
    private final List<SeekableByteChannel> channels;
    /** Position within the concatenated view, as maintained by read and position(long). */
    private long globalPosition;
    /** Index into {@link #channels} of the channel the next read starts with. */
    private int currentChannelIdx;

    /**
     * Concatenates the given channels.
     *
     * @param channels the channels to concatenate; the list is copied
     * @throws NullPointerException if channels is null
     */
    public MultiReadOnlySeekableByteChannel(List<SeekableByteChannel> channels) {
        this.channels = Collections.unmodifiableList(new ArrayList<>(
            Objects.requireNonNull(channels, "channels must not be null")));
    }

    /**
     * Reads from the current channel and continues with the following
     * channels until {@code dst} is full or all channels are exhausted.
     *
     * @param dst the buffer to fill
     * @return the number of bytes read, {@code 0} if {@code dst} has no
     *         space remaining, or {@code -1} if no bytes could be read
     *         because the end of the last channel has been reached
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if reading from a wrapped channel fails
     */
    @Override
    public synchronized int read(ByteBuffer dst) throws IOException {
        if (!isOpen()) {
            throw new ClosedChannelException();
        }
        if (!dst.hasRemaining()) {
            // A full buffer is not end-of-stream: report "nothing read"
            // instead of falling through to the -1 (EOF) result below.
            return 0;
        }

        int totalBytesRead = 0;
        while (dst.hasRemaining() && currentChannelIdx < channels.size()) {
            final SeekableByteChannel currentChannel = channels.get(currentChannelIdx);
            final int newBytesRead = currentChannel.read(dst);
            if (newBytesRead == -1) {
                // EOF for this channel -- advance to next channel idx
                currentChannelIdx += 1;
                continue;
            }
            if (currentChannel.position() >= currentChannel.size()) {
                // we are at the end of the current channel
                currentChannelIdx++;
            }
            totalBytesRead += newBytesRead;
        }
        if (totalBytesRead > 0) {
            globalPosition += totalBytesRead;
            return totalBytesRead;
        }
        return -1;
    }

    /**
     * Closes all wrapped channels, even if closing some of them fails.
     *
     * @throws IOException if closing at least one wrapped channel
     *         failed; its cause is the first failure and any further
     *         failures are attached to that cause as suppressed
     *         exceptions
     */
    @Override
    public void close() throws IOException {
        IOException first = null;
        for (final SeekableByteChannel ch : channels) {
            try {
                ch.close();
            } catch (final IOException ex) {
                if (first == null) {
                    first = ex;
                } else {
                    // keep later failures visible instead of dropping them
                    first.addSuppressed(ex);
                }
            }
        }
        if (first != null) {
            throw new IOException("failed to close wrapped channel", first);
        }
    }

    /**
     * @return {@code true} if every wrapped channel reports being open
     *         (which is trivially the case when no channels are wrapped)
     */
    @Override
    public boolean isOpen() {
        for (final SeekableByteChannel ch : channels) {
            if (!ch.isOpen()) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return the position within the concatenated view
     */
    @Override
    public long position() {
        return globalPosition;
    }

    /**
     * @return the sum of the sizes of all wrapped channels
     * @throws IOException if a wrapped channel fails to report its size
     */
    @Override
    public long size() throws IOException {
        long acc = 0;
        for (final SeekableByteChannel ch : channels) {
            acc += ch.size();
        }
        return acc;
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public SeekableByteChannel truncate(long size) {
        throw new NonWritableChannelException();
    }

    /**
     * @throws NonWritableChannelException since this implementation is read-only.
     */
    @Override
    public int write(ByteBuffer src) {
        throw new NonWritableChannelException();
    }

    /**
     * Moves to the given position of the concatenated view and
     * repositions every wrapped channel accordingly.
     *
     * @param newPosition the new position, must not be negative
     * @return this channel
     * @throws IllegalArgumentException if newPosition is negative
     * @throws ClosedChannelException if this channel is not open
     * @throws IOException if querying or positioning a wrapped channel fails
     */
    @Override
    public synchronized SeekableByteChannel position(long newPosition) throws IOException {
        if (newPosition < 0) {
            throw new IllegalArgumentException("Negative position: " + newPosition);
        }
        if (!isOpen()) {
            throw new ClosedChannelException();
        }

        globalPosition = newPosition;

        long pos = newPosition;

        for (int i = 0; i < channels.size(); i++) {
            final SeekableByteChannel currentChannel = channels.get(i);
            final long size = currentChannel.size();

            final long newChannelPos;
            if (pos == -1L) {
                // Position is already set for the correct channel,
                // the rest of the channels get reset to 0
                newChannelPos = 0;
            } else if (pos <= size) {
                // This channel is where we want to be
                currentChannelIdx = i;
                newChannelPos = pos;
                pos = -1L; // Mark pos as already being set
            } else {
                // newPosition is past this channel. Set channel
                // position to the end and subtract channel size from
                // pos
                pos -= size;
                newChannelPos = size;
            }

            currentChannel.position(newChannelPos);
        }
        return this;
    }

    /**
     * Concatenates the given channels.
     *
     * @param channels the channels to concatenate
     * @return the single given channel itself if exactly one channel
     *         is passed, otherwise a new concatenating channel
     * @throws NullPointerException if channels is null
     */
    public static SeekableByteChannel forSeekableByteChannels(SeekableByteChannel... channels) {
        if (Objects.requireNonNull(channels, "channels must not be null").length == 1) {
            return channels[0];
        }
        return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
    }

    /**
     * Concatenates the given files.
     *
     * <p>If opening one of the files fails, the channels that have
     * already been opened are closed before the failure is
     * propagated.</p>
     *
     * @param files the files to concatenate
     * @return the channel of the single given file if exactly one
     *         file is passed, otherwise a new concatenating channel
     * @throws NullPointerException if files is null
     * @throws IOException if opening a channel for one of the files fails
     */
    public static SeekableByteChannel forFiles(File... files) throws IOException {
        Objects.requireNonNull(files, "files must not be null");
        final List<SeekableByteChannel> channels = new ArrayList<>(files.length);
        try {
            for (final File f : files) {
                channels.add(Files.newByteChannel(f.toPath(), StandardOpenOption.READ));
            }
        } catch (IOException | RuntimeException ex) {
            // Don't leak the channels opened so far.
            for (final SeekableByteChannel opened : channels) {
                try {
                    opened.close();
                } catch (final IOException closeEx) {
                    ex.addSuppressed(closeEx);
                }
            }
            throw ex;
        }
        if (channels.size() == 1) {
            return channels.get(0);
        }
        return new MultiReadOnlySeekableByteChannel(channels);
    }

}
+ * + */ + +package org.apache.commons.compress.utils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Initially based on <a + * href="https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/test/scala/fm/common/TestMultiReadOnlySeekableByteChannel.scala">TestMultiReadOnlySeekableByteChannel.scala</a> + * by Tim Underwood. + */ +public class MultiReadOnlySeekableByteChannelTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void constructorThrowsOnNullArg() { + thrown.expect(NullPointerException.class); + new MultiReadOnlySeekableByteChannel(null); + } + + @Test + public void forSeekableByteChannelsThrowsOnNullArg() { + thrown.expect(NullPointerException.class); + MultiReadOnlySeekableByteChannel.forSeekableByteChannels(null); + } + + @Test + public void forFilesThrowsOnNullArg() throws IOException { + thrown.expect(NullPointerException.class); + MultiReadOnlySeekableByteChannel.forFiles(null); + } + + @Test + public void forSeekableByteChannelsReturnsIdentityForSingleElement() { + final SeekableByteChannel e = makeEmpty(); + final SeekableByteChannel m = MultiReadOnlySeekableByteChannel.forSeekableByteChannels(e); + Assert.assertSame(e, m); + } + + @Test + public void referenceBehaviorForEmptyChannel() throws IOException { + checkEmpty(makeEmpty()); + } + + @Test + public void twoEmptyChannelsConcatenateAsEmptyChannel() throws IOException { + checkEmpty(MultiReadOnlySeekableByteChannel.forSeekableByteChannels(makeEmpty(), makeEmpty())); + } + + @Test + public void checkForSingleByte() throws IOException { + check(new byte[] { 0 }); + } + + @Test + public void 
checkForString() throws IOException { + check("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + .getBytes(StandardCharsets.UTF_8)); + } + + @Test + public void verifyGrouped() { + Assert.assertArrayEquals(new byte[][] { + new byte[] { 1, 2, 3, }, + new byte[] { 4, 5, 6, }, + new byte[] { 7, }, + }, grouped(new byte[] { 1, 2, 3, 4, 5, 6, 7 }, 3)); + Assert.assertArrayEquals(new byte[][] { + new byte[] { 1, 2, 3, }, + new byte[] { 4, 5, 6, }, + }, grouped(new byte[] { 1, 2, 3, 4, 5, 6 }, 3)); + Assert.assertArrayEquals(new byte[][] { + new byte[] { 1, 2, 3, }, + new byte[] { 4, 5, }, + }, grouped(new byte[] { 1, 2, 3, 4, 5, }, 3)); + } + + private SeekableByteChannel makeEmpty() { + return makeSingle(new byte[0]); + } + + private SeekableByteChannel makeSingle(byte[] arr) { + return new SeekableInMemoryByteChannel(arr); + } + + private SeekableByteChannel makeMulti(byte[][] arr) { + SeekableByteChannel[] s = new SeekableByteChannel[arr.length]; + for (int i = 0; i < s.length; i++) { + s[i] = makeSingle(arr[i]); + } + return MultiReadOnlySeekableByteChannel.forSeekableByteChannels(s); + } + + private void checkEmpty(SeekableByteChannel channel) throws IOException { + ByteBuffer buf = ByteBuffer.allocate(10); + + Assert.assertTrue(channel.isOpen()); + Assert.assertEquals(0, channel.size()); + Assert.assertEquals(0, channel.position()); + Assert.assertEquals(-1, channel.read(buf)); + + channel.position(5); + Assert.assertEquals(-1, channel.read(buf)); + + channel.close(); + Assert.assertFalse(channel.isOpen()); + + try { + channel.read(buf); + Assert.fail("expected a ClosedChannelException"); + } catch (ClosedChannelException expected) { + } + try { + channel.position(100); + Assert.fail("expected a ClosedChannelException"); + } catch (ClosedChannelException expected) { + } + } + + private void check(final byte[] expected) throws IOException { + for (int channelSize = 1; channelSize <= expected.length; channelSize++) { + // Sanity check that all 
operations work for SeekableInMemoryByteChannel + check(expected, makeSingle(expected)); + // Checks against our MultiReadOnlySeekableByteChannel instance + check(expected, makeMulti(grouped(expected, channelSize))); + } + } + + private void check(final byte[] expected, SeekableByteChannel channel) throws IOException { + for (int readBufferSize = 1; readBufferSize <= expected.length + 5; readBufferSize++) { + check(expected, channel, readBufferSize); + } + } + + private void check(final byte[] expected, final SeekableByteChannel channel, final int readBufferSize) + throws IOException { + Assert.assertTrue("readBufferSize " + readBufferSize, channel.isOpen()); + Assert.assertEquals("readBufferSize " + readBufferSize, expected.length, channel.size()); + channel.position(0); + Assert.assertEquals("readBufferSize " + readBufferSize, 0, channel.position()); + + // Will hold the entire result that we read + final ByteBuffer resultBuffer = ByteBuffer.allocate(expected.length + 100); + + // Used for each read() method call + final ByteBuffer buf = ByteBuffer.allocate(readBufferSize); + + int bytesRead = channel.read(buf); + + while (bytesRead != -1) { + int remaining = buf.remaining(); + + buf.flip(); + resultBuffer.put(buf); + buf.clear(); + bytesRead = channel.read(buf); + + // If this isn't the last read() then we expect the buf + // ByteBuffer to be full (i.e. 
have no remaining) + if (resultBuffer.position() < expected.length) { + Assert.assertEquals("readBufferSize " + readBufferSize, 0, remaining); + } + + if (bytesRead == -1) { + Assert.assertEquals("readBufferSize " + readBufferSize, 0, buf.position()); + } else { + Assert.assertEquals("readBufferSize " + readBufferSize, bytesRead, buf.position()); + } + } + + resultBuffer.flip(); + byte[] arr = new byte[resultBuffer.remaining()]; + resultBuffer.get(arr); + Assert.assertArrayEquals("readBufferSize " + readBufferSize, expected, arr); + } + + private byte[][] grouped(final byte[] input, final int chunkSize) { + List<byte[]> groups = new ArrayList<>(); + int idx = 0; + for (; idx + chunkSize <= input.length; idx += chunkSize) { + groups.add(Arrays.copyOfRange(input, idx, idx + chunkSize)); + } + if (idx < input.length) { + groups.add(Arrays.copyOfRange(input, idx, input.length)); + } + return groups.toArray(new byte[0][]); + } +}