This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d65b378b402 branch-3.1: [feature](regression) add retry to stream load
when connection reset by s3 #54613 (#54703)
d65b378b402 is described below
commit d65b378b40226ce63721e1cac450acfeb0f7918a
Author: shuke <[email protected]>
AuthorDate: Thu Aug 14 11:55:10 2025 +0800
branch-3.1: [feature](regression) add retry to stream load when connection
reset by s3 #54613 (#54703)
picked from #54613
---
.../regression/action/StreamLoadAction.groovy | 192 ++++++++++++++++++++-
.../suites/tpcds_sf1_unique_p1/load.groovy | 1 +
2 files changed, 186 insertions(+), 7 deletions(-)
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index aa19094854e..7cf57d212f2 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -29,6 +29,7 @@ import groovy.util.logging.Slf4j
import org.apache.http.HttpEntity
import org.apache.http.HttpStatus
import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.client.methods.RequestBuilder
import org.apache.http.entity.FileEntity
import org.apache.http.entity.InputStreamEntity
@@ -37,6 +38,8 @@ import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.junit.Assert
+import java.io.InputStream
+import java.io.IOException
@Slf4j
class StreamLoadAction implements SuiteAction {
@@ -50,6 +53,7 @@ class StreamLoadAction implements SuiteAction {
String inputText
Iterator<List<Object>> inputIterator
long time
+ boolean retryIfHttpError = false // opt-in: when true, httpGetStream returns a ResumableHttpInputStream (retry + resume) instead of a single GET stream
Closure check
Map<String, String> headers
SuiteContext context
@@ -138,6 +142,10 @@ class StreamLoadAction implements SuiteAction {
this.time = time.call()
}
+    // DSL switch, e.g. `retryIfHttpError true` in a suite's stream load block: when true, HTTP(S)
+    // sources are read through ResumableHttpInputStream, which retries and resumes on failure.
+    void retryIfHttpError(boolean r) { retryIfHttpError = r }
+
void twoPhaseCommit(boolean twoPhaseCommit) {
this.twoPhaseCommit = twoPhaseCommit;
}
@@ -213,14 +221,18 @@ class StreamLoadAction implements SuiteAction {
}
private InputStream httpGetStream(CloseableHttpClient client, String url) {
- CloseableHttpResponse resp =
client.execute(RequestBuilder.get(url).build())
- int code = resp.getStatusLine().getStatusCode()
- if (code != HttpStatus.SC_OK) {
- String streamBody = EntityUtils.toString(resp.getEntity())
- throw new IllegalStateException("Get http stream failed, status
code is ${code}, body:\n${streamBody}")
- }
+ if (retryIfHttpError) { // opt-in: the returned stream re-issues the GET (with a Range header) after I/O or HTTP errors
+ return new ResumableHttpInputStream(client, url)
+ } else { // default: a single GET; any non-200 status fails immediately
+ CloseableHttpResponse resp =
client.execute(RequestBuilder.get(url).build())
+ int code = resp.getStatusLine().getStatusCode()
+ if (code != HttpStatus.SC_OK) {
+ String streamBody = EntityUtils.toString(resp.getEntity()) // NOTE(review): resp is not closed before the throw below; consider closing it in a finally
+ throw new IllegalStateException("Get http stream failed,
status code is ${code}, body:\n${streamBody}")
+ }
- return resp.getEntity().getContent()
+ return resp.getEntity().getContent() // not closed here: the caller must close the returned stream
+ }
}
private RequestBuilder prepareRequestHeader(RequestBuilder requestBuilder)
{
@@ -423,4 +435,223 @@ class StreamLoadAction implements SuiteAction {
throw t;
}
}
+
+    /**
+     * An {@link InputStream} over an HTTP(S) resource that automatically retries and resumes
+     * when the connection fails mid-transfer. After an I/O error it re-issues the GET with a
+     * {@code Range: bytes=<offset>-} header, where {@code offset} counts the bytes already read
+     * or skipped, so large downloads survive the remote side dropping the connection. This
+     * mainly happens when stream load consumes the data too slowly, e.g. under high CPU load.
+     *
+     * <p>Retry policy: a failed open or read is retried up to {@code maxRetries} times (default 3),
+     * sleeping {@code retryDelayMs * attempt} ms (default base 1000 ms) before each new attempt.
+     * Instances are not synchronized; use each one from a single thread.
+     *
+     * <p><b>Caveat:</b> because this class recovers when S3 actively closes a stalled connection,
+     * it can mask the underlying performance bug that makes stream load stall for long periods.
+     */
+    class ResumableHttpInputStream extends InputStream {
+        private CloseableHttpClient httpClient
+        private String url
+        // Bytes already read or skipped; the resume position for the next Range request.
+        private long offset = 0
+        private InputStream currentStream
+        private CloseableHttpResponse currentResponse
+        private int maxRetries = 3
+        private int retryDelayMs = 1000
+        private boolean closed = false
+
+        /**
+         * Opens the resource at offset 0.
+         *
+         * @throws IOException if the initial GET still fails after all retries
+         */
+        ResumableHttpInputStream(CloseableHttpClient httpClient, String url) {
+            this.httpClient = httpClient
+            this.url = url
+            openNewStream(0)
+        }
+
+        /**
+         * (Re)opens the HTTP stream so that the next byte read is byte {@code startOffset} of the
+         * resource, retrying failed requests and non-success statuses up to {@code maxRetries} times.
+         *
+         * <p>A server may ignore the {@code Range} header and answer 200 with the full body; in that
+         * case the already-delivered prefix is skipped here so the caller never sees it twice.
+         *
+         * @throws IOException if the last attempt fails, or if the stream has been closed
+         */
+        private void openNewStream(long startOffset) {
+            closeCurrentResources()
+            log.info("open new stream ${this.url} with offset ${startOffset}")
+
+            int attempts = 0
+            while (!closed) {
+                attempts++
+                try {
+                    RequestBuilder builder = RequestBuilder.get(url)
+                    if (startOffset > 0) {
+                        builder.addHeader("Range", "bytes=${startOffset}-")
+                    }
+
+                    currentResponse = httpClient.execute(builder.build())
+                    int code = currentResponse.getStatusLine().getStatusCode()
+
+                    if (code == HttpStatus.SC_OK ||
+                            (code == HttpStatus.SC_PARTIAL_CONTENT && startOffset > 0)) {
+                        currentStream = currentResponse.getEntity().getContent()
+                        if (code == HttpStatus.SC_OK && startOffset > 0) {
+                            // Range was ignored and the body restarts at byte 0: drop the prefix
+                            // that was already delivered instead of handing it out twice.
+                            skipFully(currentStream, startOffset)
+                        }
+                        offset = startOffset
+                        return
+                    }
+
+                    String body = EntityUtils.toString(currentResponse.getEntity())
+                    throw new IOException("HTTP error ${code} ${currentResponse.getStatusLine().getReasonPhrase()}\n${body}")
+                } catch (IOException e) {
+                    closeCurrentResources()
+                    if (attempts > maxRetries || closed) {
+                        throw e
+                    }
+                    sleep(retryDelayMs * attempts)
+                }
+            }
+            // Only reached after close(); never return with currentStream left null.
+            throw new IOException("Stream closed")
+        }
+
+        /**
+         * Discards exactly {@code n} bytes from {@code input}.
+         *
+         * @throws EOFException if the stream ends before {@code n} bytes were discarded
+         */
+        private void skipFully(InputStream input, long n) {
+            long remaining = n
+            while (remaining > 0) {
+                long skipped = input.skip(remaining)
+                if (skipped > 0) {
+                    remaining -= skipped
+                } else if (input.read() >= 0) {
+                    // skip() may return 0 before EOF; a single read() guarantees progress
+                    remaining--
+                } else {
+                    throw new EOFException("Stream ended after ${n - remaining} of ${n} bytes while resuming ${url}")
+                }
+            }
+        }
+
+        @Override
+        int read() throws IOException {
+            if (closed) throw new IOException("Stream closed")
+
+            int byteRead = readWithRetry { currentStream.read() }
+            if (byteRead >= 0) offset++
+            return byteRead
+        }
+
+        @Override
+        int read(byte[] b, int off, int len) throws IOException {
+            if (closed) throw new IOException("Stream closed")
+            if (b == null) throw new NullPointerException()
+            if (off < 0 || len < 0 || len > b.length - off) {
+                throw new IndexOutOfBoundsException()
+            }
+
+            int bytesRead = readWithRetry { currentStream.read(b, off, len) }
+            if (bytesRead > 0) offset += bytesRead
+            return bytesRead
+        }
+
+        /**
+         * Runs {@code readOp} against the current stream. On {@link IOException} it backs off,
+         * reopens the resource at the current offset and tries again, up to {@code maxRetries} times.
+         *
+         * @return the value of the first successful {@code readOp} call
+         * @throws IOException the last read failure, or the failure to reopen the stream
+         */
+        private int readWithRetry(Closure<Integer> readOp) {
+            int attempts = 0
+            while (true) {
+                attempts++
+                try {
+                    return readOp.call()
+                } catch (IOException e) {
+                    log.info("${url} read exception: ${e.getMessage()}")
+                    if (attempts > maxRetries || closed) throw e
+                    // Back off before contacting the server again, then resume where we stopped.
+                    sleep(retryDelayMs * attempts)
+                    reopenStreamAfterError(e)
+                }
+            }
+        }
+
+        /** Reopens at the current offset; if that fails too, {@code cause} is kept as suppressed. */
+        private void reopenStreamAfterError(IOException cause) {
+            try {
+                openNewStream(offset)
+            } catch (IOException e) {
+                e.addSuppressed(cause)
+                throw e
+            }
+        }
+
+        /**
+         * Releases the current response and stream, ignoring errors since the connection is being
+         * abandoned. The response is closed first: per the HttpClient documentation, closing the
+         * response discards the connection, whereas closing the content stream first would try to
+         * drain the rest of the body (possibly most of a large file) to reuse the connection.
+         */
+        private void closeCurrentResources() {
+            try {
+                currentResponse?.close()
+            } catch (IOException ignored) {
+                // best effort: the connection is abandoned anyway
+            }
+            try {
+                currentStream?.close()
+            } catch (IOException ignored) {
+                // may fail once the response is closed; nothing is left to release
+            }
+            currentStream = null
+            currentResponse = null
+        }
+
+        @Override
+        void close() throws IOException {
+            if (!closed) {
+                closed = true
+                closeCurrentResources()
+            }
+        }
+
+        /** Current position in the resource: bytes read or skipped so far. */
+        long getOffset() { offset }
+
+        /** Replaces the default retry count (3) and base back-off delay in ms (1000). */
+        void setRetryPolicy(int maxRetries, int baseDelayMs) {
+            this.maxRetries = maxRetries
+            this.retryDelayMs = baseDelayMs
+        }
+
+        @Override
+        int available() throws IOException {
+            return currentStream != null ? currentStream.available() : 0
+        }
+
+        @Override
+        long skip(long n) throws IOException {
+            if (currentStream == null) return 0
+            long skipped = currentStream.skip(n)
+            offset += skipped
+            return skipped
+        }
+
+        @Override
+        boolean markSupported() {
+            return false
+        }
+    }
}
diff --git a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
index 0caf0889f8c..e93d7f7d020 100644
--- a/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
+++ b/regression-test/suites/tpcds_sf1_unique_p1/load.groovy
@@ -121,6 +121,7 @@ suite("load") {
file """${getS3Url()}/regression/tpcds/sf1/${tableName}.dat.gz"""
time 10000 // limit inflight 10s
+ retryIfHttpError true
// stream load action will check result, include Success status,
and NumberTotalRows == NumberLoadedRows
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]