This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d65b378b402 branch-3.1: [feature](regression) add retry to stream load 
when connection reset by s3 #54613 (#54703)
d65b378b402 is described below

commit d65b378b40226ce63721e1cac450acfeb0f7918a
Author: shuke <[email protected]>
AuthorDate: Thu Aug 14 11:55:10 2025 +0800

    branch-3.1: [feature](regression) add retry to stream load when connection 
reset by s3 #54613 (#54703)
    
    picked from #54613
---
 .../regression/action/StreamLoadAction.groovy      | 192 ++++++++++++++++++++-
 .../suites/tpcds_sf1_unique_p1/load.groovy         |   1 +
 2 files changed, 186 insertions(+), 7 deletions(-)

diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index aa19094854e..7cf57d212f2 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -29,6 +29,7 @@ import groovy.util.logging.Slf4j
 import org.apache.http.HttpEntity
 import org.apache.http.HttpStatus
 import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.impl.client.CloseableHttpClient
 import org.apache.http.client.methods.RequestBuilder
 import org.apache.http.entity.FileEntity
 import org.apache.http.entity.InputStreamEntity
@@ -37,6 +38,8 @@ import org.apache.http.impl.client.CloseableHttpClient
 import org.apache.http.impl.client.HttpClients
 import org.apache.http.util.EntityUtils
 import org.junit.Assert
+import java.io.InputStream
+import java.io.IOException
 
 @Slf4j
 class StreamLoadAction implements SuiteAction {
@@ -50,6 +53,7 @@ class StreamLoadAction implements SuiteAction {
     String inputText
     Iterator<List<Object>> inputIterator
     long time
+    boolean retryIfHttpError = false 
     Closure check
     Map<String, String> headers
     SuiteContext context
@@ -138,6 +142,10 @@ class StreamLoadAction implements SuiteAction {
         this.time = time.call()
     }
 
+    /**
+     * Turns the resumable download mode on or off for this action.
+     * When enabled, httpGetStream hands back a ResumableHttpInputStream
+     * instead of the raw content stream of a single HTTP response.
+     *
+     * @param r true to enable retry/resume of the HTTP source stream
+     */
+    void retryIfHttpError(boolean r) {
+        this.retryIfHttpError = r
+    }
+
     void twoPhaseCommit(boolean twoPhaseCommit) {
         this.twoPhaseCommit = twoPhaseCommit;
     }
@@ -213,14 +221,18 @@ class StreamLoadAction implements SuiteAction {
     }
 
     private InputStream httpGetStream(CloseableHttpClient client, String url) {
-        CloseableHttpResponse resp = 
client.execute(RequestBuilder.get(url).build())
-        int code = resp.getStatusLine().getStatusCode()
-        if (code != HttpStatus.SC_OK) {
-            String streamBody = EntityUtils.toString(resp.getEntity())
-            throw new IllegalStateException("Get http stream failed, status 
code is ${code}, body:\n${streamBody}")
-        }
+        if (retryIfHttpError) {
+            return new ResumableHttpInputStream(client, url)
+        } else {
+            CloseableHttpResponse resp = 
client.execute(RequestBuilder.get(url).build())
+            int code = resp.getStatusLine().getStatusCode()
+            if (code != HttpStatus.SC_OK) {
+                String streamBody = EntityUtils.toString(resp.getEntity())
+                throw new IllegalStateException("Get http stream failed, 
status code is ${code}, body:\n${streamBody}")
+            }
 
-        return resp.getEntity().getContent()
+            return resp.getEntity().getContent()
+        }
     }
 
     private RequestBuilder prepareRequestHeader(RequestBuilder requestBuilder) 
{
@@ -423,4 +435,170 @@ class StreamLoadAction implements SuiteAction {
             throw t;
         }
     }
+
+    /**
+     * A resumable HTTP input stream that automatically retries and resumes when the
+     * connection fails while the data is being transferred. Every byte handed to the
+     * caller is counted in {@code offset}; after an {@link IOException} the stream is
+     * reopened with a {@code Range: bytes=<offset>-} request so reading continues where
+     * it stopped. It is meant for large downloads over HTTP, especially when stream load
+     * runs too slowly due to high cpu.
+     *
+     * Pay attention:
+     *     Using this class can recover from S3 actively disconnecting because a stream
+     *     load is stuck, thereby masking the underlying performance bug where stream
+     *     loading stalls for extended periods.
+     *
+     * Instances are not synchronized.
+     */
+    class ResumableHttpInputStream extends InputStream {
+        private CloseableHttpClient httpClient
+        private String url
+        // Number of bytes already delivered to the caller; also the position to resume from.
+        private long offset = 0
+        private InputStream currentStream
+        private CloseableHttpResponse currentResponse
+        // Number of retries after the first attempt, for both opening and reading.
+        private int maxRetries = 3
+        // Base delay; the actual sleep is retryDelayMs * attemptNumber (linear back-off).
+        private int retryDelayMs = 1000
+        private boolean closed = false
+
+        /**
+         * Opens the stream immediately, starting at byte 0.
+         *
+         * @param httpClient client used for every (re)connect; it is not closed by this stream
+         * @param url        the resource to download
+         * @throws IOException if the resource cannot be opened within the retry budget
+         */
+        ResumableHttpInputStream(CloseableHttpClient httpClient, String url) {
+            this.httpClient = httpClient
+            this.url = url
+            openNewStream(0)
+        }
+
+        /**
+         * Releases the current response (if any) and opens a new one positioned at
+         * {@code startOffset}, retrying on IOException up to {@code maxRetries} times.
+         *
+         * @param startOffset absolute byte position the new stream must start at
+         * @throws IOException when all attempts fail, or when this stream has been closed
+         */
+        private void openNewStream(long startOffset) {
+            closeCurrentResources()
+            log.info("open new stream ${this.url} with offset ${startOffset}")
+
+            int attempts = 0
+            while (attempts <= maxRetries && !closed) {
+                attempts++
+                try {
+                    RequestBuilder builder = RequestBuilder.get(url)
+                    if (startOffset > 0) {
+                        builder.addHeader("Range", "bytes=${startOffset}-")
+                    }
+
+                    currentResponse = httpClient.execute(builder.build())
+                    int code = currentResponse.getStatusLine().getStatusCode()
+
+                    if (code == HttpStatus.SC_PARTIAL_CONTENT && startOffset > 0) {
+                        currentStream = currentResponse.getEntity().getContent()
+                        offset = startOffset
+                        return
+                    }
+
+                    if (code == HttpStatus.SC_OK) {
+                        // Assign before skipping so the catch block below can close it on failure.
+                        currentStream = currentResponse.getEntity().getContent()
+                        if (startOffset > 0) {
+                            // A 200 answer to a Range request means the server ignored the header
+                            // and is sending the entity from byte 0 again. Drop the bytes the caller
+                            // has already consumed, otherwise they would be delivered twice.
+                            log.info("${url} ignored Range header, skipping ${startOffset} bytes")
+                            discardFully(currentStream, startOffset)
+                        }
+                        offset = startOffset
+                        return
+                    }
+
+                    String body = EntityUtils.toString(currentResponse.getEntity())
+                    throw new IOException("HTTP error ${code} ${currentResponse.getStatusLine().getReasonPhrase()}\n${body}")
+
+                } catch (IOException e) {
+                    closeCurrentResources()
+                    if (attempts > maxRetries || closed) {
+                        throw e
+                    }
+                    sleep(retryDelayMs * attempts)
+                }
+            }
+
+            // Only reached when the stream was closed before an attempt could succeed;
+            // fail loudly rather than leaving currentStream null for the next read.
+            throw new IOException("Stream closed")
+        }
+
+        /**
+         * Reads and throws away exactly {@code count} bytes from {@code input}.
+         *
+         * @throws IOException if the stream ends before {@code count} bytes were read
+         */
+        private void discardFully(InputStream input, long count) throws IOException {
+            byte[] scratch = new byte[8192]
+            long remaining = count
+            while (remaining > 0) {
+                int chunk = (int) Math.min((long) scratch.length, remaining)
+                int n = input.read(scratch, 0, chunk)
+                if (n < 0) {
+                    throw new IOException("Unexpected end of stream while skipping to offset ${count}")
+                }
+                remaining -= n
+            }
+        }
+
+        /**
+         * Reads one byte, reconnecting and resuming at the current offset on IOException.
+         *
+         * @return the byte read (0-255), or -1 at end of stream
+         * @throws IOException if the stream is closed or the retry budget is exhausted
+         */
+        @Override
+        int read() throws IOException {
+            if (closed) throw new IOException("Stream closed")
+
+            int attempts = 0
+            while (attempts <= maxRetries) {
+                attempts++
+                try {
+                    int byteRead = currentStream.read()
+                    if (byteRead >= 0) offset++
+                    return byteRead
+                } catch (IOException e) {
+                    log.info("${url} read exception: ${e.getMessage()}")
+                    if (attempts > maxRetries || closed) throw e
+                    reopenStreamAfterError()
+                    sleep(retryDelayMs * attempts)
+                }
+            }
+            // Not reached: maxRetries is never negative, so the last iteration returns or throws.
+            return -1
+        }
+
+        /**
+         * Reads up to {@code len} bytes into {@code b}, reconnecting and resuming at the
+         * current offset on IOException.
+         *
+         * @return the number of bytes read, or -1 at end of stream
+         * @throws IOException if the stream is closed or the retry budget is exhausted
+         */
+        @Override
+        int read(byte[] b, int off, int len) throws IOException {
+            if (closed) throw new IOException("Stream closed")
+            if (b == null) throw new NullPointerException()
+            if (off < 0 || len < 0 || len > b.length - off) {
+                throw new IndexOutOfBoundsException()
+            }
+
+            int attempts = 0
+            while (attempts <= maxRetries) {
+                attempts++
+                try {
+                    int bytesRead = currentStream.read(b, off, len)
+                    if (bytesRead > 0) offset += bytesRead
+                    return bytesRead
+
+                } catch (IOException e) {
+                    log.info("${url} read exception: ${e.getMessage()}")
+                    if (attempts > maxRetries || closed) throw e
+                    reopenStreamAfterError()
+                    sleep(retryDelayMs * attempts)
+                }
+            }
+            // Not reached: maxRetries is never negative, so the last iteration returns or throws.
+            return -1
+        }
+
+        /** Drops the broken connection and reconnects at the last delivered offset. */
+        private void reopenStreamAfterError() {
+            closeCurrentResources()
+            openNewStream(offset)
+        }
+
+        /** Best-effort close of the current content stream and response; never throws. */
+        private void closeCurrentResources() {
+            try {
+                if (currentStream != null) {
+                    currentStream.close()
+                }
+            } catch (IOException ignored) {
+                // The connection is being abandoned anyway; a failed close changes nothing.
+            }
+
+            try {
+                if (currentResponse != null) {
+                    currentResponse.close()
+                }
+            } catch (IOException ignored) {
+                // Same as above: nothing useful can be done with a close failure here.
+            }
+
+            currentStream = null
+            currentResponse = null
+        }
+
+        /** Closes the stream and releases the HTTP response; further reads throw IOException. */
+        @Override
+        void close() throws IOException {
+            if (!closed) {
+                closed = true
+                closeCurrentResources()
+            }
+        }
+
+        /** @return the number of bytes delivered to the caller so far */
+        long getOffset() { offset }
+
+        /**
+         * Adjusts the retry budget and back-off.
+         *
+         * @param maxRetries  retries allowed after the first attempt; negative values are treated as 0
+         * @param baseDelayMs base delay in milliseconds, multiplied by the attempt number;
+         *                    negative values are treated as 0
+         */
+        void setRetryPolicy(int maxRetries, int baseDelayMs) {
+            // A negative retry count would make the read loops fall through and report a false EOF.
+            this.maxRetries = Math.max(0, maxRetries)
+            this.retryDelayMs = Math.max(0, baseDelayMs)
+        }
+
+        @Override
+        int available() throws IOException {
+            return currentStream != null ? currentStream.available() : 0
+        }
+
+        /**
+         * Skips bytes on the current connection (no retry) and advances the resume offset
+         * by the number of bytes actually skipped.
+         */
+        @Override
+        long skip(long n) throws IOException {
+            if (currentStream == null) return 0
+            long skipped = currentStream.skip(n)
+            // Guard the resume position against a stream that reports a negative skip.
+            if (skipped > 0) offset += skipped
+            return skipped
+        }
+
+        @Override
+        boolean markSupported() {
+            return false
+        }
+    }
 }
diff --git a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy 
b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
index 0caf0889f8c..e93d7f7d020 100644
--- a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
+++ b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
@@ -121,6 +121,7 @@ suite("load") {
             file """${getS3Url()}/regression/tpcds/sf1/${tableName}.dat.gz"""
 
             time 10000 // limit inflight 10s
+            retryIfHttpError true
 
             // stream load action will check result, include Success status, 
and NumberTotalRows == NumberLoadedRows
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to