This is an automated email from the ASF dual-hosted git repository. michaelo pushed a commit to branch WAGON-537 in repository https://gitbox.apache.org/repos/asf/maven-wagon.git
commit 7644e86c981213f8facd89a2ba123442db03a9d1 Author: olaf-otto <olaf.o...@unic.com> AuthorDate: Fri Oct 26 00:49:31 2018 +0200 [WAGON-537] Maven transfer speed of large artifacts is slow due to unsuitable buffer strategy Implemented a buffer strategy which fills the buffer to at least 50% and has priority over frequent writes. Added dynamic NIO buffer capacity allocation based on the expected number of bytes to transfer. --- .../java/org/apache/maven/wagon/AbstractWagon.java | 85 +++++++++++++++++++--- .../wagon/shared/http/AbstractHttpClientWagon.java | 76 ++++++++++--------- 2 files changed, 116 insertions(+), 45 deletions(-) diff --git a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java index 4cbf37d..8995559 100644 --- a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java +++ b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java @@ -42,8 +42,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.util.List; +import static java.lang.Math.max; +import static java.lang.Math.min; + /** * Implementation of common facilities for Wagon providers. * @@ -52,7 +58,24 @@ import java.util.List; public abstract class AbstractWagon implements Wagon { - protected static final int DEFAULT_BUFFER_SIZE = 1024 * 4; + protected static final int DEFAULT_BUFFER_SIZE = 4 * 1024; + protected static final int MAXIMUM_BUFFER_SIZE = 512 * 1024; + + /** + * To efficiently buffer data, use a multiple of 4 KiB as this is likely to match the hardware + * buffer size of certain storage devices.
+ */ + protected static final int BUFFER_SEGMENT_SIZE = 4 * 1024; + + /** + * The desired minimum amount of chunks in which a {@link Resource} shall be + * {@link #transfer(Resource, InputStream, OutputStream, int, long) transferred}. + * This corresponds to the minimum times {@link #fireTransferProgress(TransferEvent, byte[], int)} + * is executed. 100 notifications is a conservative value that will lead to small chunks for + * any artifact less than {@link #BUFFER_SEGMENT_SIZE} * {@link #MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS} + * in size. + */ + protected static final int MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS = 100; protected Repository repository; @@ -560,31 +583,73 @@ public abstract class AbstractWagon protected void transfer( Resource resource, InputStream input, OutputStream output, int requestType, long maxSize ) throws IOException { - byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + + ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransfer( resource.getContentLength() ) ); + int halfBufferCapacity = buffer.capacity() / 2; TransferEvent transferEvent = new TransferEvent( this, resource, TransferEvent.TRANSFER_PROGRESS, requestType ); transferEvent.setTimestamp( System.currentTimeMillis() ); + ReadableByteChannel in = Channels.newChannel( input ); + long remaining = maxSize; - while ( remaining > 0 ) + while ( remaining > 0L ) { - // let's safely cast to int because the min value will be lower than the buffer size. - int n = input.read( buffer, 0, (int) Math.min( buffer.length, remaining ) ); + int read = in.read( (ByteBuffer) buffer.limit( (int) min( buffer.capacity(), remaining ) ) ); - if ( n == -1 ) + if ( read == -1 ) { + // EOF: write out whatever is still held in the buffer.
+ if ( buffer.position() != 0 ) + { + buffer.flip(); + fireTransferProgress( transferEvent, buffer.array(), buffer.limit() ); + output.write( buffer.array(), 0, buffer.limit() ); + } + break; } - fireTransferProgress( transferEvent, buffer, n ); - - output.write( buffer, 0, n ); + // Prevent minichunking / fragmentation: while less than half the buffer is utilized and it can + // still take more bytes (within maxSize), read some more before writing and firing progress. + if ( buffer.position() < halfBufferCapacity && buffer.hasRemaining() ) + { + continue; + } - remaining -= n; + buffer.flip(); + fireTransferProgress( transferEvent, buffer.array(), buffer.limit() ); + output.write( buffer.array(), 0, buffer.limit() ); + remaining -= buffer.limit(); + buffer.clear(); } output.flush(); } + /** + * Provides a buffer size for efficiently transferring the given amount of bytes such that + * it is not fragmented into too many chunks. For larger files larger buffers are provided such that downstream + * {@link #fireTransferProgress(TransferEvent, byte[], int) listeners} are not notified too frequently. + * For instance, transferring gigabyte-sized resources would result in millions of notifications when using + * only a few kibibytes of buffer, drastically slowing down transfer since transfer progress listeners and + * notifications are synchronous and may block, e.g., when writing download progress status to console. + * + * @param numberOfBytes can be 0 or less, in which case a default buffer size is used. + * @return a buffer capacity in bytes suitable for transferring the given amount of bytes without too many chunks.
+ */ + protected int getBufferCapacityForTransfer( long numberOfBytes ) + { + if ( numberOfBytes <= 0L ) + { + return DEFAULT_BUFFER_SIZE; + } + // Stay in long arithmetic: resources larger than 2 GiB do not fit into an int. + final long numberOfBufferSegments = + numberOfBytes / ( BUFFER_SEGMENT_SIZE * MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS ); + final long potentialBufferSize = numberOfBufferSegments * BUFFER_SEGMENT_SIZE; + return (int) min( MAXIMUM_BUFFER_SIZE, max( DEFAULT_BUFFER_SIZE, potentialBufferSize ) ); + } + // ---------------------------------------------------------------------- // // ---------------------------------------------------------------------- diff --git a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java index 9f294f7..b0740f5 100755 --- a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java +++ b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java @@ -84,6 +84,10 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; @@ -105,9 +109,6 @@ public abstract class AbstractHttpClientWagon private final class RequestEntityImplementation extends AbstractHttpEntity { - - private static final int BUFFER_SIZE = 2048; - private final Resource resource; private final Wagon wagon; @@ -160,52 +161,57 @@ public abstract class AbstractHttpClientWagon return repeatable; } - public void writeTo( final OutputStream outputStream ) + public void writeTo( final OutputStream output ) throws IOException { - if ( outputStream == null ) + if ( output == null ) { -
throw new NullPointerException( "outputStream cannot be null" ); + throw new NullPointerException( "output cannot be null" ); } TransferEvent transferEvent = new TransferEvent( wagon, resource, TransferEvent.TRANSFER_PROGRESS, TransferEvent.REQUEST_PUT ); transferEvent.setTimestamp( System.currentTimeMillis() ); - InputStream instream = ( this.source != null ) - ? new FileInputStream( this.source ) - : stream; - try + + try ( ReadableByteChannel input = ( this.source != null ) + ? new RandomAccessFile( this.source, "r" ).getChannel() + : Channels.newChannel( stream ) ) { - byte[] buffer = new byte[BUFFER_SIZE]; - int l; - if ( this.length < 0 ) - { - // until EOF - while ( ( l = instream.read( buffer ) ) != -1 ) - { - fireTransferProgress( transferEvent, buffer, -1 ); - outputStream.write( buffer, 0, l ); - } - } - else + ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransfer( this.length ) ); + int halfBufferCapacity = buffer.capacity() / 2; + + long remaining = this.length < 0L ? Long.MAX_VALUE : this.length; + while ( remaining > 0L ) { - // no need to consume more than length - long remaining = this.length; - while ( remaining > 0 ) + int read = input.read( (ByteBuffer) buffer.limit( (int) Math.min( buffer.capacity(), remaining ) ) ); + if ( read == -1 ) { - l = instream.read( buffer, 0, (int) Math.min( BUFFER_SIZE, remaining ) ); - if ( l == -1 ) + // EOF: write out whatever is still held in the buffer. + if ( buffer.position() != 0 ) { - break; + buffer.flip(); + fireTransferProgress( transferEvent, buffer.array(), buffer.limit() ); + output.write( buffer.array(), 0, buffer.limit() ); + buffer.clear(); } - fireTransferProgress( transferEvent, buffer, (int) Math.min( BUFFER_SIZE, remaining ) ); - outputStream.write( buffer, 0, l ); - remaining -= l; + + break; } + + // Prevent minichunking / fragmentation: while less than half the buffer is utilized and it can + // still take more bytes (within the content length), read some more before writing and firing progress.
+ if ( buffer.position() < halfBufferCapacity && buffer.hasRemaining() ) + { + continue; + } + + buffer.flip(); + fireTransferProgress( transferEvent, buffer.array(), buffer.limit() ); + output.write( buffer.array(), 0, buffer.limit() ); + remaining -= buffer.limit(); + buffer.clear(); + } - } - finally - { - instream.close(); + output.flush(); } }