This is an automated email from the ASF dual-hosted git repository.

michaelo pushed a commit to branch WAGON-537
in repository https://gitbox.apache.org/repos/asf/maven-wagon.git

commit 7644e86c981213f8facd89a2ba123442db03a9d1
Author: olaf-otto <olaf.o...@unic.com>
AuthorDate: Fri Oct 26 00:49:31 2018 +0200

    [WAGON-537] Maven transfer speed of large artifacts is slow due to unsuitable buffer strategy
    
    Implemented a buffer strategy that fills the buffer to at least 50% before
    writing, favoring fewer, larger writes over frequent small ones. Added dynamic
    NIO buffer capacity allocation based on the expected number of bytes to transfer.
---
 .../java/org/apache/maven/wagon/AbstractWagon.java | 85 +++++++++++++++++++---
 .../wagon/shared/http/AbstractHttpClientWagon.java | 76 ++++++++++---------
 2 files changed, 116 insertions(+), 45 deletions(-)

diff --git a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
index 4cbf37d..8995559 100644
--- a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
+++ b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
@@ -42,8 +42,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
 /**
  * Implementation of common facilities for Wagon providers.
  *
@@ -52,7 +58,24 @@ import java.util.List;
 public abstract class AbstractWagon
     implements Wagon
 {
-    protected static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+    protected static final int DEFAULT_BUFFER_SIZE = 4 * 1024; // lower bound of getBufferCapacityForTransfer(long)
+    protected static final int MAXIMUM_BUFFER_SIZE = 512 * 1024; // upper bound of getBufferCapacityForTransfer(long)
+
+    /**
+     * To buffer data efficiently, use a multiple of 4 KiB, as this is likely to match the hardware
+     * buffer size of certain storage devices.
+     */
+    protected static final int BUFFER_SEGMENT_SIZE = 4 * 1024;
+
+    /**
+     * The desired minimum number of chunks in which a {@link Resource} shall be
+     * {@link #transfer(Resource, InputStream, OutputStream, int, long) transferred}.
+     * This corresponds to the minimum number of times {@link #fireTransferProgress(TransferEvent, byte[], int)}
+     * is executed. 100 notifications is a conservative value that leads to small chunks for
+     * any artifact smaller than {@link #BUFFER_SEGMENT_SIZE} * {@link #MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS}
+     * (i.e. 400 KiB) in size.
+     */
+    protected static final int MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS = 100;
 
     protected Repository repository;
 
@@ -560,31 +583,78 @@ public abstract class AbstractWagon
     protected void transfer( Resource resource, InputStream input, OutputStream output, int requestType, long maxSize )
         throws IOException
     {
-        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+        ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransfer( resource.getContentLength() ) );
+        int halfBufferCapacity = buffer.capacity() / 2;
 
         TransferEvent transferEvent = new TransferEvent( this, resource, TransferEvent.TRANSFER_PROGRESS, requestType );
         transferEvent.setTimestamp( System.currentTimeMillis() );
 
+        ReadableByteChannel in = Channels.newChannel( input );
+
         long remaining = maxSize;
-        while ( remaining > 0 )
+        while ( remaining > 0L )
         {
-            // let's safely cast to int because the min value will be lower than the buffer size.
-            int n = input.read( buffer, 0, (int) Math.min( buffer.length, remaining ) );
+            // Never read past maxSize: buffered bytes count against "remaining", so cap the readable window there.
+            buffer.limit( (int) min( buffer.capacity(), remaining ) );
+            int read = in.read( buffer );
 
-            if ( n == -1 )
+            if ( read == -1 )
             {
+                // EOF, but some data has not been written yet.
+                if ( buffer.position() != 0 )
+                {
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                    output.write( buffer.array(), 0, buffer.limit() );
+                }
+
                 break;
             }
 
-            fireTransferProgress( transferEvent, buffer, n );
-
-            output.write( buffer, 0, n );
+            // Prevent minichunking / fragmentation: when less than half the buffer is utilized,
+            // read some more bytes before writing and firing progress, unless maxSize has been reached.
+            if ( buffer.position() < halfBufferCapacity && buffer.position() < remaining )
+            {
+                continue;
+            }
 
-            remaining -= n;
+            buffer.flip();
+            fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+            output.write( buffer.array(), 0, buffer.limit() );
+            remaining -= buffer.limit();
+            buffer.clear();
         }
         output.flush();
     }
+
+    /**
+     * Provides a buffer capacity for efficiently transferring the given number of bytes such that
+     * the transfer is not fragmented into too many chunks. Larger resources get larger buffers so that downstream
+     * {@link #fireTransferProgress(TransferEvent, byte[], int) listeners} are not notified too frequently.
+     * For instance, transferring gigabyte-sized resources would result in millions of notifications when using
+     * only a few kibibytes of buffer, drastically slowing down transfer since transfer progress listeners and
+     * notifications are synchronous and may block, e.g., when writing download progress status to console.
+     *
+     * @param numberOfBytes the expected number of bytes to transfer; can be 0 or less, in which case
+     *                      {@link #DEFAULT_BUFFER_SIZE} is returned.
+     * @return a buffer capacity in bytes: a multiple of {@link #BUFFER_SEGMENT_SIZE} between
+     *         {@link #DEFAULT_BUFFER_SIZE} and {@link #MAXIMUM_BUFFER_SIZE} (both inclusive).
+     */
+    protected int getBufferCapacityForTransfer( long numberOfBytes )
+    {
+        if ( numberOfBytes <= 0L )
+        {
+            return DEFAULT_BUFFER_SIZE;
+        }
+
+        // Stay in long arithmetic: casting numberOfBytes to int first would overflow for resources >= 2 GiB.
+        final long numberOfBufferSegments =
+            numberOfBytes / ( BUFFER_SEGMENT_SIZE * MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS );
+        final long potentialBufferSize = numberOfBufferSegments * BUFFER_SEGMENT_SIZE;
+        return (int) min( MAXIMUM_BUFFER_SIZE, max( DEFAULT_BUFFER_SIZE, potentialBufferSize ) );
+    }
+
     // ----------------------------------------------------------------------
     //
     // ----------------------------------------------------------------------
diff --git a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
index 9f294f7..b0740f5 100755
--- a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
+++ b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
@@ -84,6 +84,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,9 +109,6 @@ public abstract class AbstractHttpClientWagon
     private final class RequestEntityImplementation
         extends AbstractHttpEntity
     {
-
-        private static final int BUFFER_SIZE = 2048;
-
         private final Resource resource;
 
         private final Wagon wagon;
@@ -160,52 +161,60 @@ public abstract class AbstractHttpClientWagon
             return repeatable;
         }
 
-        public void writeTo( final OutputStream outputStream )
+        public void writeTo( final OutputStream output )
             throws IOException
         {
-            if ( outputStream == null )
+            if ( output == null )
             {
-                throw new NullPointerException( "outputStream cannot be null" );
+                throw new NullPointerException( "output cannot be null" );
             }
             TransferEvent transferEvent =
                 new TransferEvent( wagon, resource, TransferEvent.TRANSFER_PROGRESS, TransferEvent.REQUEST_PUT );
             transferEvent.setTimestamp( System.currentTimeMillis() );
-            InputStream instream = ( this.source != null )
-                ? new FileInputStream( this.source )
-                : stream;
-            try
+
+            try ( ReadableByteChannel input = ( this.source != null )
+                    ? new RandomAccessFile( this.source, "r" ).getChannel()
+                    : Channels.newChannel( stream ) )
             {
-                byte[] buffer = new byte[BUFFER_SIZE];
-                int l;
-                if ( this.length < 0 )
-                {
-                    // until EOF
-                    while ( ( l = instream.read( buffer ) ) != -1 )
-                    {
-                        fireTransferProgress( transferEvent, buffer, -1 );
-                        outputStream.write( buffer, 0, l );
-                    }
-                }
-                else
+                ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransfer( this.length ) );
+                int halfBufferCapacity = buffer.capacity() / 2;
+
+                // A negative length means: copy until EOF. Otherwise never consume more than length bytes.
+                long remaining = this.length < 0L ? Long.MAX_VALUE : this.length;
+                while ( remaining > 0L )
                 {
-                    // no need to consume more than length
-                    long remaining = this.length;
-                    while ( remaining > 0 )
+                    // Cap the readable window so that buffered plus newly read bytes never exceed "remaining".
+                    buffer.limit( (int) Math.min( buffer.capacity(), remaining ) );
+                    int read = input.read( buffer );
+                    if ( read == -1 )
                     {
-                        l = instream.read( buffer, 0, (int) Math.min( BUFFER_SIZE, remaining ) );
-                        if ( l == -1 )
+                        // EOF, but some data has not been written yet.
+                        if ( buffer.position() != 0 )
                         {
-                            break;
+                            buffer.flip();
+                            fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                            output.write( buffer.array(), 0, buffer.limit() );
+                            buffer.clear();
                         }
-                        fireTransferProgress( transferEvent, buffer, (int) Math.min( BUFFER_SIZE, remaining ) );
-                        outputStream.write( buffer, 0, l );
-                        remaining -= l;
+
+                        break;
                     }
+
+                    // Prevent minichunking / fragmentation: when less than half the buffer is utilized,
+                    // read some more bytes before writing and firing progress, unless length has been reached.
+                    if ( buffer.position() < halfBufferCapacity && buffer.position() < remaining )
+                    {
+                        continue;
+                    }
+
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                    output.write( buffer.array(), 0, buffer.limit() );
+                    remaining -= buffer.limit();
+                    buffer.clear();
+
                 }
-            }
-            finally
-            {
-                instream.close();
+                output.flush();
             }
         }
 

Reply via email to