This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-vfs.git
commit 9fb261e7080db1516c5882c878b5acf07ab8643e Author: Gary Gregory <gardgreg...@gmail.com> AuthorDate: Mon Aug 19 15:07:18 2019 -0700 - [VFS-726] getInputStream(int bufferSize) on SftpFileObject effectively ignores buffer size. - [VFS-704] Some providers wrap their input/output streams twice in a BufferedInputStream. --- .../commons/vfs2/provider/DefaultFileContent.java | 76 ++++++++- .../vfs2/provider/FileContentThreadData.java | 190 +++++++++++---------- .../commons/vfs2/util/RawMonitorInputStream.java | 175 +++++++++++++++++++ src/changes/changes.xml | 3 + 4 files changed, 344 insertions(+), 100 deletions(-) diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java index 3e1f184..d1f3508 100644 --- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java +++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/DefaultFileContent.java @@ -16,6 +16,7 @@ */ package org.apache.commons.vfs2.provider; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -34,6 +35,7 @@ import org.apache.commons.vfs2.util.MonitorInputStream; import org.apache.commons.vfs2.util.MonitorOutputStream; import org.apache.commons.vfs2.util.MonitorRandomAccessContent; import org.apache.commons.vfs2.util.RandomAccessMode; +import org.apache.commons.vfs2.util.RawMonitorInputStream; /** * The content of a file. @@ -444,10 +446,15 @@ public final class DefaultFileContent implements FileContent { // Close the input stream while (fileContentThreadData.getInstrsSize() > 0) { - final FileContentInputStream inputStream = (FileContentInputStream) fileContentThreadData - .removeInstr(0); + final InputStream inputStream = fileContentThreadData.removeInputStream(0); try { - inputStream.close(); + if (inputStream instanceof FileContentInputStream) { + ((FileContentInputStream) inputStream).close(); + } else if (inputStream instanceof RawFileContentInputStream) { + ((RawFileContentInputStream) inputStream).close(); + } else { + caught = new FileSystemException("Unsupported InputStream type: " + inputStream); + } } catch (final FileSystemException ex) { caught = ex; @@ -490,14 +497,29 @@ public final class DefaultFileContent implements FileContent { * if (getThreadData().getState() == STATE_WRITING || getThreadData().getState() == STATE_RANDOM_ACCESS) { throw * new FileSystemException("vfs.provider/read-in-use.error", file); } */ - // Get the raw input stream - final InputStream inputStream = bufferSize == 0 ? fileObject.getInputStream() + // @formatter:off + final InputStream inputStream = bufferSize == 0 + ? fileObject.getInputStream() : fileObject.getInputStream(bufferSize); + // @formatter:on // Double buffering may take place here. - final InputStream wrappedInputStream = bufferSize == 0 +// final InputStream wrappedInputStream = bufferSize == 0 +// ? new FileContentInputStream(fileObject, inputStream) +// : new FileContentInputStream(fileObject, inputStream, bufferSize); + + InputStream wrappedInputStream; + if (inputStream instanceof BufferedInputStream) { + // Don't double buffer. + wrappedInputStream = new RawFileContentInputStream(fileObject, inputStream); + } else + { + // @formatter:off + wrappedInputStream = bufferSize == 0 ? new FileContentInputStream(fileObject, inputStream) : new FileContentInputStream(fileObject, inputStream, bufferSize); + // @formatter:on + } getOrCreateThreadData().addInstr(wrappedInputStream); streamOpened(); @@ -530,7 +552,7 @@ public final class DefaultFileContent implements FileContent { /** * Handles the end of input stream. */ - private void endInput(final FileContentInputStream instr) { + private void endInput(final InputStream instr) { final FileContentThreadData fileContentThreadData = threadLocal.get(); if (fileContentThreadData != null) { fileContentThreadData.removeInstr(instr); @@ -646,6 +668,46 @@ public final class DefaultFileContent implements FileContent { } /** + * An input stream for reading content. Provides buffering, and end-of-stream monitoring. + * <p> + * This is the same as {@link FileContentInputStream} but without the buffering. + * </p> + */ + private final class RawFileContentInputStream extends RawMonitorInputStream { + // avoid gc + private final FileObject file; + + RawFileContentInputStream(final FileObject file, final InputStream instr) { + super(instr); + this.file = file; + } + + /** + * Closes this input stream. + */ + @Override + public void close() throws FileSystemException { + try { + super.close(); + } catch (final IOException e) { + throw new FileSystemException("vfs.provider/close-instr.error", file, e); + } + } + + /** + * Called after the stream has been closed. + */ + @Override + protected void onClose() throws IOException { + try { + super.onClose(); + } finally { + endInput(this); + } + } + } + + /** * An input/output stream for reading/writing content on random positions */ private final class FileRandomAccessContent extends MonitorRandomAccessContent { diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java index d736da5..2f08b0f 100644 --- a/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java +++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/provider/FileContentThreadData.java @@ -1,93 +1,97 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.commons.vfs2.provider; - -import java.io.InputStream; -import java.util.ArrayList; - -import org.apache.commons.vfs2.FileSystemException; -import org.apache.commons.vfs2.RandomAccessContent; - -/** - * Holds the data which needs to be local to the current thread - */ -class FileContentThreadData { - - // private int state = DefaultFileContent.STATE_CLOSED; - - private final ArrayList<InputStream> inputStreamList = new ArrayList<>(); - private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>(); - private DefaultFileContent.FileContentOutputStream outputStream; - - FileContentThreadData() { - } - - /* - * int getState() { return state; } - * - * void setState(int state) { this.state = state; } - */ - - void addInstr(final InputStream inputStream) { - this.inputStreamList.add(inputStream); - } - - void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) { - this.outputStream = outputStream; - } - - DefaultFileContent.FileContentOutputStream getOutstr() { - return this.outputStream; - } - - void addRastr(final RandomAccessContent randomAccessContent) { - this.randomAccessContentList.add(randomAccessContent); - } - - int getInstrsSize() { - return this.inputStreamList.size(); - } - - public Object removeInstr(final int pos) { - return this.inputStreamList.remove(pos); - } - - public void removeInstr(final InputStream inputStream) { - this.inputStreamList.remove(inputStream); - } - - public Object removeRastr(final int pos) { - return this.randomAccessContentList.remove(pos); - } - - public void removeRastr(final RandomAccessContent randomAccessContent) { - this.randomAccessContentList.remove(randomAccessContent); - } - - public boolean hasStreams() { - return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0; - } - - public void closeOutstr() throws FileSystemException { - outputStream.close(); - outputStream = null; - } - - int getRastrsSize() { - return randomAccessContentList.size(); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.vfs2.provider; + +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.RandomAccessContent; + +/** + * Holds the data which needs to be local to the current thread + */ +class FileContentThreadData { + + // private int state = DefaultFileContent.STATE_CLOSED; + + private final ArrayList<InputStream> inputStreamList = new ArrayList<>(); + private final ArrayList<RandomAccessContent> randomAccessContentList = new ArrayList<>(); + private DefaultFileContent.FileContentOutputStream outputStream; + + FileContentThreadData() { + } + + /* + * int getState() { return state; } + * + * void setState(int state) { this.state = state; } + */ + + void addInstr(final InputStream inputStream) { + this.inputStreamList.add(inputStream); + } + + void setOutstr(final DefaultFileContent.FileContentOutputStream outputStream) { + this.outputStream = outputStream; + } + + DefaultFileContent.FileContentOutputStream getOutstr() { + return this.outputStream; + } + + void addRastr(final RandomAccessContent randomAccessContent) { + this.randomAccessContentList.add(randomAccessContent); + } + + int getInstrsSize() { + return this.inputStreamList.size(); + } + + public Object removeInstr(final int pos) { + return this.inputStreamList.remove(pos); + } + + InputStream removeInputStream(final int pos) { + return this.inputStreamList.remove(pos); + } + + public void removeInstr(final InputStream inputStream) { + this.inputStreamList.remove(inputStream); + } + + public Object removeRastr(final int pos) { + return this.randomAccessContentList.remove(pos); + } + + public void removeRastr(final RandomAccessContent randomAccessContent) { + this.randomAccessContentList.remove(randomAccessContent); + } + + public boolean hasStreams() { + return inputStreamList.size() > 0 || outputStream != null || randomAccessContentList.size() > 0; + } + + public void closeOutstr() throws FileSystemException { + outputStream.close(); + outputStream = null; + } + + int getRastrsSize() { + return randomAccessContentList.size(); + } +} diff --git a/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java new file mode 100644 index 0000000..42b682a --- /dev/null +++ b/commons-vfs2/src/main/java/org/apache/commons/vfs2/util/RawMonitorInputStream.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.vfs2.util; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * An InputStream that provides end-of-stream monitoring. + * <p> + * This is the same as {@link MonitorInputStream} but without the buffering. + * </p> + * + * @since 2.5 + */ +public class RawMonitorInputStream extends FilterInputStream { + + private static final int EOF_CHAR = -1; + private final AtomicBoolean finished = new AtomicBoolean(false); + private final AtomicLong atomicCount = new AtomicLong(0); + +// @Override +// public synchronized void reset() throws IOException { +// if (!finished.get()) { +// super.reset(); +// } +// } +// +// @Override +// public synchronized long skip(long n) throws IOException { +// if (finished.get()) { +// return 0; +// } +// return super.skip(n); +// } + + /** + * Constructs a MonitorInputStream from the passed InputStream + * + * @param inputStream The input stream to wrap. + */ + public RawMonitorInputStream(final InputStream inputStream) { + super(inputStream); + } + + /** + * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried. + * + * @return The number of bytes that are available. + * @throws IOException if an error occurs. + */ + @Override + public synchronized int available() throws IOException { + if (finished.get()) { + return 0; + } + + return super.available(); + } + + /** + * Reads a character. + * + * @return The character that was read as an integer. + * @throws IOException if an error occurs. + */ + @Override + public int read() throws IOException { // lgtm [java/non-sync-override] + if (finished.get()) { + return EOF_CHAR; + } + + final int ch = super.read(); + if (ch != EOF_CHAR) { + atomicCount.incrementAndGet(); + } + + return ch; + } + + /** + * Reads bytes from this input stream. + * + * @param buffer A byte array in which to place the characters read. + * @param offset The offset at which to start reading. + * @param length The maximum number of bytes to read. + * @return The number of bytes read. + * @throws IOException if an error occurs. + */ + @Override + public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override] + if (finished.get()) { + return EOF_CHAR; + } + + final int nread = super.read(buffer, offset, length); + if (nread != EOF_CHAR) { + atomicCount.addAndGet(nread); + } + return nread; + } + + /** + * Closes this input stream and releases any system resources associated with the stream. + * + * @throws IOException if an error occurs. + */ + @Override + public void close() throws IOException { + final boolean closed = finished.getAndSet(true); + if (closed) { + return; + } + + // Close the stream + IOException exc = null; + try { + super.close(); + } catch (final IOException ioe) { + exc = ioe; + } + + // Notify that the stream has been closed + try { + onClose(); + } catch (final IOException ioe) { + exc = ioe; + } + + if (exc != null) { + throw exc; + } + } + + /** + * Called after the stream has been closed. This implementation does nothing. + * + * @throws IOException if an error occurs. + */ + protected void onClose() throws IOException { + // noop + } + + /** + * Gets the number of bytes read by this input stream. + * + * @return The number of bytes read by this input stream. + */ + public long getCount() { + return atomicCount.get(); + } + + @Override + public synchronized void mark(int readlimit) { + // TODO Auto-generated method stub + super.mark(readlimit); + } +} diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 5cfa369..91d6614 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -53,6 +53,9 @@ The <action> type attribute can be add,update,fix,remove. <action issue="VFS-726" dev="ggregory" type="fix" due-to="Cornelius Höfig, Gary Gregory"> getInputStream(int bufferSize) on SftpFileObject effectively ignores buffer size. </action> + <action issue="VFS-704" dev="ggregory" type="fix" due-to="Boris Petrov, Gary Gregory"> + Some providers wrap their input/output streams twice in a BufferedInputStream. + </action> </release> <release version="2.4.1" date="2019-08-10" description="Bug fix release."> <action issue="VFS-725" dev="ggregory" type="fix" due-to="Gary Gregory">