This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 1fcc640e [Fix] When http reports an error, writing will get stuck (#539)
1fcc640e is described below

commit 1fcc640ea589094baa7536abcef26fbb6edb4752
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jan 10 12:11:07 2025 +0800

    [Fix] When http reports an error, writing will get stuck (#539)
---
 .../java/org/apache/doris/flink/sink/HttpUtil.java |  1 +
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 26 ++++++++++++++++++++--
 .../doris/flink/sink/writer/LabelGenerator.java    |  8 +++++++
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 53d3ce13..d1600de6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -60,6 +60,7 @@ public class HttpUtil {
                                 return true;
                             }
                         })
+                .setRetryHandler((exception, executionCount, context) -> false)
                 .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
                 .setDefaultRequestConfig(
                         RequestConfig.custom()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index f900f741..c6e39326 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -89,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
     private volatile Future<CloseableHttpResponse> pendingLoadFuture;
+    private volatile Exception httpException = null;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
@@ -115,6 +116,10 @@ public class DorisStreamLoad implements Serializable {
         this.streamLoadProp = executionOptions.getStreamLoadProp();
         this.enableDelete = executionOptions.getDeletable();
         this.httpClient = httpClient;
+        String threadName =
+                String.format(
+                        "stream-load-upload-%s-%s",
+                        labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
         this.executorService =
                 new ThreadPoolExecutor(
                         1,
@@ -122,7 +127,7 @@ public class DorisStreamLoad implements Serializable {
                         0L,
                         TimeUnit.MILLISECONDS,
                         new LinkedBlockingQueue<>(),
-                        new ExecutorThreadFactory("stream-load-upload"));
+                        new ExecutorThreadFactory(threadName));
         this.recordStream =
                 new RecordStream(
                         executionOptions.getBufferSize(),
@@ -250,6 +255,7 @@ public class DorisStreamLoad implements Serializable {
      * @throws IOException
      */
     public void writeRecord(byte[] record) throws IOException {
+        checkLoadException();
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
@@ -258,6 +264,12 @@ public class DorisStreamLoad implements Serializable {
         recordStream.write(record);
     }
 
+    private void checkLoadException() {
+        if (httpException != null) {
+            throw new RuntimeException("Stream load http request error, ", httpException);
+        }
+    }
+
     @VisibleForTesting
     public RecordStream getRecordStream() {
         return recordStream;
@@ -347,11 +359,21 @@ public class DorisStreamLoad implements Serializable {
             } else {
                 executeMessage = "table " + table + " start execute load for label " + label;
             }
+            Thread mainThread = Thread.currentThread();
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
                                 LOG.info(executeMessage);
-                                return httpClient.execute(putBuilder.build());
+                                try {
+                                    return httpClient.execute(putBuilder.build());
+                                } catch (Exception e) {
+                                    LOG.error("Failed to execute load, cause ", e);
+                                    httpException = e;
+                                    // When an HTTP error occurs, the main thread should be
+                                    // interrupted to prevent blocking
+                                    mainThread.interrupt();
+                                    throw e;
+                                }
                             });
         } catch (Exception e) {
             String err;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index d80315f5..84c14b79 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -91,4 +91,12 @@ public class LabelGenerator {
         String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId);
         return concatPrefix;
     }
+
+    public int getSubtaskId() {
+        return subtaskId;
+    }
+
+    public String getTableIdentifier() {
+        return tableIdentifier;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org