This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bcfad1d [Improve](Stream) Optimize the situation where Flink may get stuck after the streamload thread exits abnormally (#578)
2bcfad1d is described below

commit 2bcfad1dd00552992f6ab397caa95832798b3219
Author: wudi <676366...@qq.com>
AuthorDate: Tue Mar 18 16:34:00 2025 +0800

    [Improve](Stream) Optimize the situation where Flink may get stuck after the streamload thread exits abnormally (#578)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../exception/LabelAlreadyExistsException.java     |   2 +-
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 118 +++++++++++++++-----
 .../doris/flink/sink/writer/DorisWriter.java       | 116 +++++---------------
 .../doris/flink/sink/writer/LoadConstants.java     |   9 ++
 .../doris/flink/sink/writer/RecordBuffer.java      |  34 +++---
 .../doris/flink/sink/writer/RecordStream.java      |   7 +-
 .../doris/flink/table/DorisConfigOptions.java      |   4 +-
 .../sink/DorisSinkMultiTblFailoverITCase.java      | 121 ++++++++++++++++++++-
 .../doris/flink/sink/writer/TestDorisWriter.java   |  54 ++++-----
 10 files changed, 299 insertions(+), 168 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 371069f5..831a317e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -34,7 +34,7 @@ public class DorisExecutionOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
     // 0 means disable checker thread
-    public static final int DEFAULT_CHECK_INTERVAL = 10000;
+    public static final int DEFAULT_CHECK_INTERVAL = 0;
     public static final int DEFAULT_MAX_RETRY_TIMES = 3;
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
     private static final int DEFAULT_BUFFER_COUNT = 3;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
index ea86c60a..9a523b16 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.flink.exception;
 
-public class LabelAlreadyExistsException extends RuntimeException {
+public class LabelAlreadyExistsException extends DorisRuntimeException {
     public LabelAlreadyExistsException() {
         super();
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index c6e39326..18a97959 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -28,10 +28,12 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.LabelAlreadyExistsException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
+import org.apache.doris.flink.sink.LoadStatus;
 import org.apache.doris.flink.sink.ResponseUtil;
 import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -47,6 +49,7 @@ import java.net.NoRouteToHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -61,6 +64,7 @@ import static 
org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.DORIS_SUCCESS_STATUS;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
 import static 
org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE;
@@ -88,7 +92,7 @@ public class DorisStreamLoad implements Serializable {
     private final boolean enableDelete;
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
-    private volatile Future<CloseableHttpResponse> pendingLoadFuture;
+    private volatile Future<RespContent> pendingLoadFuture;
     private volatile Exception httpException = null;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
@@ -175,7 +179,7 @@ public class DorisStreamLoad implements Serializable {
         this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
     }
 
-    public Future<CloseableHttpResponse> getPendingLoadFuture() {
+    public Future<RespContent> getPendingLoadFuture() {
         return pendingLoadFuture;
     }
 
@@ -254,19 +258,30 @@ public class DorisStreamLoad implements Serializable {
      * @param record
      * @throws IOException
      */
-    public void writeRecord(byte[] record) throws IOException {
+    public void writeRecord(byte[] record) throws InterruptedException {
         checkLoadException();
-        if (loadBatchFirstRecord) {
-            loadBatchFirstRecord = false;
-        } else if (lineDelimiter != null) {
-            recordStream.write(lineDelimiter);
+        try {
+            if (loadBatchFirstRecord) {
+                loadBatchFirstRecord = false;
+            } else if (lineDelimiter != null) {
+                recordStream.write(lineDelimiter);
+            }
+            recordStream.write(record);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (httpException != null) {
+                throw new DorisRuntimeException(httpException.getMessage(), 
httpException);
+            } else {
+                LOG.info("write record interrupted, cause " + e.getClass());
+                throw e;
+            }
         }
-        recordStream.write(record);
     }
 
     private void checkLoadException() {
         if (httpException != null) {
-            throw new RuntimeException("Stream load http request error, ", 
httpException);
+            throw new RuntimeException(
+                    "Stream load http request error, " + 
httpException.getMessage(), httpException);
         }
     }
 
@@ -292,26 +307,32 @@ public class DorisStreamLoad implements Serializable {
         throw new StreamLoadException("stream load error: " + 
response.getStatusLine().toString());
     }
 
-    public RespContent stopLoad() throws IOException {
-        recordStream.endInput();
-        if (enableGroupCommit) {
-            LOG.info("table {} stream load stopped with group commit on host 
{}", table, hostPort);
-        } else {
-            LOG.info(
-                    "table {} stream load stopped for {} on host {}",
-                    table,
-                    currentLabel,
-                    hostPort);
-        }
-
-        Preconditions.checkState(pendingLoadFuture != null);
+    public RespContent stopLoad() throws InterruptedException {
         try {
-            return handlePreCommitResponse(pendingLoadFuture.get());
-        } catch (NoRouteToHostException nex) {
-            LOG.error("Failed to connect, cause ", nex);
-            throw new DorisRuntimeException(
-                    "No Route to Host to " + hostPort + ", exception: " + nex);
-        } catch (Exception e) {
+            recordStream.endInput();
+            if (enableGroupCommit) {
+                LOG.info(
+                        "table {} stream load stopped with group commit on 
host {}",
+                        table,
+                        hostPort);
+            } else {
+                LOG.info(
+                        "table {} stream load stopped for {} on host {}",
+                        table,
+                        currentLabel,
+                        hostPort);
+            }
+
+            Preconditions.checkState(pendingLoadFuture != null);
+            return pendingLoadFuture.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (httpException != null) {
+                throw new DorisRuntimeException(httpException.getMessage(), 
httpException);
+            } else {
+                throw e;
+            }
+        } catch (ExecutionException e) {
             throw new DorisRuntimeException(e);
         }
     }
@@ -365,7 +386,46 @@ public class DorisStreamLoad implements Serializable {
                             () -> {
                                 LOG.info(executeMessage);
                                 try {
-                                    return 
httpClient.execute(putBuilder.build());
+                                    CloseableHttpResponse execute =
+                                            
httpClient.execute(putBuilder.build());
+                                    RespContent respContent = 
handlePreCommitResponse(execute);
+
+                                    if 
(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                                        if (enable2PC
+                                                && 
LoadStatus.LABEL_ALREADY_EXIST.equals(
+                                                        
respContent.getStatus())
+                                                && !JOB_EXIST_FINISHED.equals(
+                                                        
respContent.getExistingJobStatus())) {
+                                            LOG.info(
+                                                    "try to abort {} cause 
status {}, exist job status {} ",
+                                                    respContent.getLabel(),
+                                                    respContent.getStatus(),
+                                                    
respContent.getExistingJobStatus());
+                                            
abortLabelExistTransaction(respContent);
+                                            throw new 
LabelAlreadyExistsException(
+                                                    "Exist label abort 
finished, retry");
+                                        } else {
+                                            String errMsg =
+                                                    String.format(
+                                                            "table %s.%s 
stream load error: %s, see more in %s",
+                                                            getDb(),
+                                                            getTable(),
+                                                            
respContent.getMessage(),
+                                                            
respContent.getErrorURL());
+                                            LOG.error("Failed to load, {}", 
errMsg);
+                                            throw new 
DorisRuntimeException(errMsg);
+                                        }
+                                    }
+                                    return respContent;
+                                } catch (NoRouteToHostException nex) {
+                                    LOG.error("Failed to connect, cause ", 
nex);
+                                    httpException = nex;
+                                    mainThread.interrupt();
+                                    throw new DorisRuntimeException(
+                                            "No Route to Host to "
+                                                    + hostPort
+                                                    + ", exception: "
+                                                    + nex);
                                 } catch (Exception e) {
                                     LOG.error("Failed to execute load, cause 
", e);
                                     httpException = e;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 466b995f..64fbc95f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -27,7 +27,6 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.LabelAlreadyExistsException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -49,12 +47,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
-import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
-import static 
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINISHED;
 
 /**
  * Doris Writer will load data to doris.
@@ -64,8 +56,6 @@ import static 
org.apache.doris.flink.sink.writer.DorisStreamLoad.JOB_EXIST_FINIS
 public class DorisWriter<IN>
         implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> 
{
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.class);
-    private static final List<String> DORIS_SUCCESS_STATUS =
-            new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
     private final long lastCheckpointId;
     private long curCheckpointId;
     private Map<String, DorisStreamLoad> dorisStreamLoadMap = new 
ConcurrentHashMap<>();
@@ -86,7 +76,6 @@ public class DorisWriter<IN>
     private SinkWriterMetricGroup sinkMetricGroup;
     private Map<String, DorisWriteMetrics> sinkMetricsMap = new 
ConcurrentHashMap<>();
     private volatile boolean multiTableLoad = false;
-    private final ReentrantLock checkLock = new ReentrantLock();
 
     public DorisWriter(
             Sink.InitContext initContext,
@@ -139,13 +128,6 @@ public class DorisWriter<IN>
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        if (intervalTime >= 1000) {
-            // when uploading data in streaming mode, we need to regularly 
detect whether there are
-            // exceptions.
-            LOG.info("start stream load checkdone thread with interval {} ms", 
intervalTime);
-            scheduledExecutorService.scheduleWithFixedDelay(
-                    this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
-        }
     }
 
     @VisibleForTesting
@@ -242,59 +224,32 @@ public class DorisWriter<IN>
         }
         // disable exception checker before stop load.
         globalLoading = false;
-        checkLock.lockInterruptibly();
-        try {
-            // submit stream load http request
-            List<DorisCommittable> committableList = new ArrayList<>();
-            for (Map.Entry<String, DorisStreamLoad> streamLoader : 
dorisStreamLoadMap.entrySet()) {
-                String tableIdentifier = streamLoader.getKey();
-                if (!loadingMap.getOrDefault(tableIdentifier, false)) {
-                    LOG.debug("skip table {}, no data need to load.", 
tableIdentifier);
-                    continue;
-                }
-                DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
-                RespContent respContent = dorisStreamLoad.stopLoad();
-                // refresh metrics
-                if (sinkMetricsMap.containsKey(tableIdentifier)) {
-                    DorisWriteMetrics dorisWriteMetrics = 
sinkMetricsMap.get(tableIdentifier);
-                    dorisWriteMetrics.flush(respContent);
-                }
-                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                    if (executionOptions.enabled2PC()
-                            && 
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())
-                            && 
!JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) {
-                        LOG.info(
-                                "try to abort {} cause status {}, exist job 
status {} ",
-                                respContent.getLabel(),
-                                respContent.getStatus(),
-                                respContent.getExistingJobStatus());
-                        
dorisStreamLoad.abortLabelExistTransaction(respContent);
-                        throw new LabelAlreadyExistsException("Exist label 
abort finished, retry");
-                    } else {
-                        String errMsg =
-                                String.format(
-                                        "table %s stream load error: %s, see 
more in %s",
-                                        tableIdentifier,
-                                        respContent.getMessage(),
-                                        respContent.getErrorURL());
-                        LOG.error("Failed to load, {}", errMsg);
-                        throw new DorisRuntimeException(errMsg);
-                    }
-                }
-                if (executionOptions.enabled2PC()) {
-                    long txnId = respContent.getTxnId();
-                    committableList.add(
-                            new DorisCommittable(
-                                    dorisStreamLoad.getHostPort(), 
dorisStreamLoad.getDb(), txnId));
-                }
+        // submit stream load http request
+        List<DorisCommittable> committableList = new ArrayList<>();
+        for (Map.Entry<String, DorisStreamLoad> streamLoader : 
dorisStreamLoadMap.entrySet()) {
+            String tableIdentifier = streamLoader.getKey();
+            if (!loadingMap.getOrDefault(tableIdentifier, false)) {
+                LOG.debug("skip table {}, no data need to load.", 
tableIdentifier);
+                continue;
+            }
+            DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+            RespContent respContent = dorisStreamLoad.stopLoad();
+            // refresh metrics
+            if (sinkMetricsMap.containsKey(tableIdentifier)) {
+                DorisWriteMetrics dorisWriteMetrics = 
sinkMetricsMap.get(tableIdentifier);
+                dorisWriteMetrics.flush(respContent);
+            }
+            if (executionOptions.enabled2PC()) {
+                long txnId = respContent.getTxnId();
+                committableList.add(
+                        new DorisCommittable(
+                                dorisStreamLoad.getHostPort(), 
dorisStreamLoad.getDb(), txnId));
             }
-
-            // clean loadingMap
-            loadingMap.clear();
-            return committableList;
-        } finally {
-            checkLock.unlock();
         }
+
+        // clean loadingMap
+        loadingMap.clear();
+        return committableList;
     }
 
     private void abortPossibleSuccessfulTransaction() {
@@ -358,25 +313,15 @@ public class DorisWriter<IN>
                                 new 
HttpUtil(dorisReadOptions).getHttpClient()));
     }
 
-    /** Check the streamload http request regularly. */
+    /** Http throws an exception actively, there is no need to check 
regularly. */
+    @Deprecated
     private void checkDone() {
         if (!globalLoading) {
             return;
         }
         LOG.debug("start timer checker, interval {} ms", intervalTime);
-        if (checkLock.tryLock()) {
-            try {
-                // double check
-                if (!globalLoading) {
-                    return;
-                }
-                for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
-                        dorisStreamLoadMap.entrySet()) {
-                    checkAllDone(streamLoadMap.getKey(), 
streamLoadMap.getValue());
-                }
-            } finally {
-                checkLock.unlock();
-            }
+        for (Map.Entry<String, DorisStreamLoad> streamLoadMap : 
dorisStreamLoadMap.entrySet()) {
+            checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
         }
     }
 
@@ -401,6 +346,7 @@ public class DorisWriter<IN>
                 // use send cached data to new txn, then notify to restart the 
stream
                 if (executionOptions.isUseCache()) {
                     try {
+
                         
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
                         if (executionOptions.enabled2PC()) {
                             dorisStreamLoad.abortPreCommit(labelPrefix, 
curCheckpointId);
@@ -419,9 +365,7 @@ public class DorisWriter<IN>
                 } else {
                     String errorMsg;
                     try {
-                        RespContent content =
-                                dorisStreamLoad.handlePreCommitResponse(
-                                        
dorisStreamLoad.getPendingLoadFuture().get());
+                        RespContent content = 
dorisStreamLoad.getPendingLoadFuture().get();
                         if (executionOptions.enabled2PC()
                                 && 
LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
                             LOG.info(
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 1e026977..54deb4a0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -17,6 +17,13 @@
 
 package org.apache.doris.flink.sink.writer;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
+import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+
 /** Constants for load. */
 public class LoadConstants {
     public static final String COLUMNS_KEY = "columns";
@@ -35,4 +42,6 @@ public class LoadConstants {
     public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
     public static final String COMPRESS_TYPE = "compress_type";
     public static final String COMPRESS_TYPE_GZ = "gz";
+    public static final List<String> DORIS_SUCCESS_STATUS =
+            new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
index c84e1039..aeecaed9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
@@ -64,25 +64,21 @@ public class RecordBuffer {
         }
     }
 
-    public void stopBufferData() throws IOException {
-        try {
-            // add Empty buffer as finish flag.
-            boolean isEmpty = false;
-            if (currentWriteBuffer != null) {
-                currentWriteBuffer.flip();
-                // check if the current write buffer is empty.
-                isEmpty = currentWriteBuffer.limit() == 0;
-                readQueue.put(currentWriteBuffer);
-                currentWriteBuffer = null;
-            }
-            if (!isEmpty) {
-                ByteBuffer byteBuffer = writeQueue.take();
-                byteBuffer.flip();
-                Preconditions.checkState(byteBuffer.limit() == 0);
-                readQueue.put(byteBuffer);
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
+    public void stopBufferData() throws InterruptedException {
+        // add Empty buffer as finish flag.
+        boolean isEmpty = false;
+        if (currentWriteBuffer != null) {
+            currentWriteBuffer.flip();
+            // check if the current write buffer is empty.
+            isEmpty = currentWriteBuffer.limit() == 0;
+            readQueue.put(currentWriteBuffer);
+            currentWriteBuffer = null;
+        }
+        if (!isEmpty) {
+            ByteBuffer byteBuffer = writeQueue.take();
+            byteBuffer.flip();
+            Preconditions.checkState(byteBuffer.limit() == 0);
+            readQueue.put(byteBuffer);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
index 693967ac..2fe51734 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
@@ -45,7 +45,7 @@ public class RecordStream extends InputStream {
         recordBuffer.startBufferData();
     }
 
-    public void endInput() throws IOException {
+    public void endInput() throws InterruptedException {
         recordBuffer.stopBufferData();
     }
 
@@ -54,15 +54,16 @@ public class RecordStream extends InputStream {
         try {
             return recordBuffer.read(buff);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new RuntimeException(e);
         }
     }
 
-    public void write(byte[] buff) throws IOException {
+    public void write(byte[] buff) throws InterruptedException {
         try {
             recordBuffer.write(buff);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            throw e;
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index b99710e3..ffd3ec92 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -216,9 +216,9 @@ public class DorisConfigOptions {
     public static final ConfigOption<Duration> SINK_CHECK_INTERVAL =
             ConfigOptions.key("sink.check-interval")
                     .durationType()
-                    .defaultValue(Duration.ofMillis(10000))
+                    .defaultValue(Duration.ofMillis(0))
                     .withDescription(
-                            "check exception with the interval while loading, 
The default is 1s, 0 means disabling the checker thread");
+                            "check exception with the interval while loading, 
0 means disabling the checker thread");
     public static final ConfigOption<Integer> SINK_MAX_RETRIES =
             ConfigOptions.key("sink.max-retries")
                     .intType()
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
index d5dd6927..f4146990 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkMultiTblFailoverITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -48,6 +49,7 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 
 /** DorisSink abnormal case of multi-table writing */
@@ -57,6 +59,7 @@ public class DorisSinkMultiTblFailoverITCase extends 
AbstractITCaseService {
             LoggerFactory.getLogger(DorisSinkMultiTblFailoverITCase.class);
     static final String DATABASE = "test_multi_failover_sink";
     static final String TABLE_MULTI_CSV = "tbl_multi_csv";
+    static final String TABLE_MULTI_CSV_NO_EXIST_TBL = 
"tbl_multi_csv_no_exist";
     private final boolean batchMode;
 
     public DorisSinkMultiTblFailoverITCase(boolean batchMode) {
@@ -68,12 +71,121 @@ public class DorisSinkMultiTblFailoverITCase extends 
AbstractITCaseService {
         return new Object[][] {new Object[] {false}, new Object[] {true}};
     }
 
+    /**
+     * Corner case: the data written within a single checkpoint amounts to 
bufferCount * bufferSize, and the target table does not exist yet.
+     */
+    @Test
+    public void testTableNotExistCornerCase() throws Exception {
+        LOG.info("Start to testTableNotExistCornerCase");
+        dropDatabase();
+        dropTable(TABLE_MULTI_CSV_NO_EXIST_TBL);
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().setCheckpointTimeout(300 * 1000);
+        env.setParallelism(1);
+        int checkpointIntervalMs = 10000;
+        env.enableCheckpointing(checkpointIntervalMs);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("format", "csv");
+        DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+        DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .enable2PC()
+                .setBatchMode(batchMode)
+                .setFlushQueueSize(1)
+                .setBufferSize(1)
+                .setBufferCount(3)
+                .setCheckInterval(0)
+                .setStreamLoadProp(properties);
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder
+                .setFenodes(getFenodes())
+                .setTableIdentifier("")
+                .setUsername(getDorisUsername())
+                .setPassword(getDorisPassword());
+
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setSerializer(new RecordWithMetaSerializer())
+                .setDorisOptions(dorisBuilder.build());
+
+        DataStreamSource<RecordWithMeta> mockSource =
+                env.addSource(
+                        new SourceFunction<RecordWithMeta>() {
+                            @Override
+                            public void run(SourceContext<RecordWithMeta> ctx) 
throws Exception {
+                                RecordWithMeta record3 =
+                                        new RecordWithMeta(
+                                                DATABASE, 
TABLE_MULTI_CSV_NO_EXIST_TBL, "1,3");
+                                ctx.collect(record3);
+                            }
+
+                            @Override
+                            public void cancel() {}
+                        });
+
+        mockSource.sinkTo(builder.build());
+        JobClient jobClient = env.executeAsync();
+        CompletableFuture<JobStatus> jobStatus = jobClient.getJobStatus();
+        LOG.info("Job status: {}", jobStatus);
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(60)));
+        // wait checkpoint failure
+        List<JobStatus> errorStatus =
+                Arrays.asList(
+                        JobStatus.FAILING,
+                        JobStatus.CANCELLING,
+                        JobStatus.CANCELED,
+                        JobStatus.FAILED,
+                        JobStatus.RESTARTING);
+
+        waitForJobStatus(jobClient, errorStatus, 
Deadline.fromNow(Duration.ofSeconds(30)));
+
+        LOG.info("start to create add table");
+        initializeTable(TABLE_MULTI_CSV_NO_EXIST_TBL);
+
+        LOG.info("wait job restart success");
+        // wait for the job to restart successfully
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(60)));
+
+        LOG.info("wait job running finished");
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(FINISHED),
+                Deadline.fromNow(Duration.ofSeconds(60)));
+
+        String queryRes =
+                String.format(
+                        "select id,task_id from %s.%s ", DATABASE, 
TABLE_MULTI_CSV_NO_EXIST_TBL);
+        List<String> expected = Arrays.asList("1,3");
+
+        if (!batchMode) {
+            ContainerUtils.checkResult(
+                    getDorisQueryConnection(), LOG, expected, queryRes, 2, 
false);
+        } else {
+            List<String> actualResult =
+                    ContainerUtils.getResult(getDorisQueryConnection(), LOG, 
expected, queryRes, 2);
+            LOG.info("actual size: {}, expected size: {}", 
actualResult.size(), expected.size());
+            Assert.assertTrue(
+                    actualResult.size() >= expected.size() && 
actualResult.containsAll(expected));
+        }
+    }
+
     /**
      * Four exceptions are simulated in one job 1. Add a table that does not 
exist 2. flink
      * checkpoint failed 3. doris cluster restart 4. stream load fail
      */
     @Test
-    public void testDorisClusterFailoverSink() throws Exception {
+    public void testMultiTblFailoverSink() throws Exception {
+        LOG.info("Start to testMultiTblFailoverSink");
         int totalTblNum = 3;
         for (int i = 1; i <= totalTblNum; i++) {
             String tableName = TABLE_MULTI_CSV + i;
@@ -274,4 +386,11 @@ public class DorisSinkMultiTblFailoverITCase extends 
AbstractITCaseService {
                 LOG,
                 String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
     }
+
+    private void dropDatabase() {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 3e1ab2c2..c1f59c74 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -26,20 +26,18 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.apache.doris.flink.exception.LabelAlreadyExistsException;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.mockito.MockedStatic;
 
 import java.io.IOException;
@@ -51,7 +49,7 @@ import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.contains;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
@@ -61,9 +59,7 @@ public class TestDorisWriter {
     DorisOptions dorisOptions;
     DorisReadOptions readOptions;
     DorisExecutionOptions executionOptions;
-
     private MockedStatic<BackendUtil> backendUtilMockedStatic;
-    @Rule public ExpectedException thrown = ExpectedException.none();
 
     @Before
     public void setUp() {
@@ -79,7 +75,7 @@ public class TestDorisWriter {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse preCommitResponse =
                 HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, 
true);
-        when(httpClient.execute(any())).thenReturn(preCommitResponse);
+        
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
         DorisWriter<String> dorisWriter = initWriter(httpClient);
         dorisWriter.write("doris,1", null);
         Collection<DorisCommittable> committableList = 
dorisWriter.prepareCommit();
@@ -96,12 +92,14 @@ public class TestDorisWriter {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse preCommitResponse =
                 
HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, 
true);
-        when(httpClient.execute(any())).thenReturn(preCommitResponse);
+        
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
         DorisWriter<String> dorisWriter = initWriter(httpClient);
         dorisWriter.write("doris,1", null);
-        thrown.expect(LabelAlreadyExistsException.class);
-        thrown.expectMessage("Exist label abort finished, retry");
-        dorisWriter.prepareCommit();
+        try {
+            dorisWriter.prepareCommit();
+        } catch (DorisRuntimeException e) {
+            Assert.assertTrue(e.getMessage().contains("Exist label abort 
finished, retry"));
+        }
     }
 
     @Test
@@ -110,12 +108,14 @@ public class TestDorisWriter {
         CloseableHttpResponse preCommitResponse =
                 HttpTestUtil.getResponse(
                         
HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true);
-        when(httpClient.execute(any())).thenReturn(preCommitResponse);
+        
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
         DorisWriter<String> dorisWriter = initWriter(httpClient);
         dorisWriter.write("doris,1", null);
-        thrown.expect(DorisRuntimeException.class);
-        thrown.expectMessage(contains("stream load error"));
-        dorisWriter.prepareCommit();
+        try {
+            dorisWriter.prepareCommit();
+        } catch (DorisRuntimeException e) {
+            Assert.assertTrue(e.getMessage().contains("stream load error"));
+        }
     }
 
     @Test
@@ -124,7 +124,7 @@ public class TestDorisWriter {
         CloseableHttpResponse preCommitResponse =
                 HttpTestUtil.getResponse(
                         
HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_FINISH_RESPONSE, true);
-        when(httpClient.execute(any())).thenReturn(preCommitResponse);
+        
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(preCommitResponse);
         Map<String, DorisStreamLoad> dorisStreamLoadMap = new 
ConcurrentHashMap<>();
         Map<String, DorisWriteMetrics> dorisWriteMetricsMap = new 
ConcurrentHashMap<>();
         Sink.InitContext initContext = mock(Sink.InitContext.class);
@@ -154,13 +154,14 @@ public class TestDorisWriter {
                         new LabelGenerator("", true),
                         httpClient);
         dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), 
dorisStreamLoad);
-        dorisStreamLoad.startLoad("", false);
         dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
         dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
         dorisWriter.write("doris,1", null);
-        thrown.expect(DorisRuntimeException.class);
-        thrown.expectMessage(contains("stream load error"));
-        dorisWriter.prepareCommit();
+        try {
+            dorisWriter.prepareCommit();
+        } catch (DorisRuntimeException e) {
+            Assert.assertTrue(e.getMessage().contains("stream load error"));
+        }
     }
 
     @Test
@@ -171,9 +172,11 @@ public class TestDorisWriter {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         DorisWriter<String> dorisWriter = initWriter(httpClient);
         dorisWriter.write("doris,1", null);
-        thrown.expect(DorisRuntimeException.class);
-        thrown.expectMessage(contains("stream load error"));
-        dorisWriter.prepareCommit();
+        try {
+            dorisWriter.prepareCommit();
+        } catch (DorisRuntimeException e) {
+            Assert.assertTrue(e.getMessage().contains("stream load error"));
+        }
     }
 
     private DorisWriter<String> initWriter(CloseableHttpClient httpClient) 
throws IOException {
@@ -205,7 +208,6 @@ public class TestDorisWriter {
                         new LabelGenerator("", true),
                         httpClient);
         dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), 
dorisStreamLoad);
-        dorisStreamLoad.startLoad("", false);
         dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
         dorisWriter.setDorisMetricsMap(dorisWriteMetricsMap);
         return dorisWriter;
@@ -219,13 +221,13 @@ public class TestDorisWriter {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         DorisWriter<String> dorisWriter = initWriter(httpClient);
         BackendUtil mock = mock(BackendUtil.class);
-        when(mock.tryHttpConnection(any())).thenReturn(true);
+        when(mock.tryHttpConnection(anyString())).thenReturn(true);
         dorisWriter.setBackendUtil(mock);
+        dorisWriter.write("doris,1", null);
         List<DorisWriterState> writerStates = dorisWriter.snapshotState(1);
 
         Assert.assertEquals(1, writerStates.size());
         Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
-        Assert.assertTrue(!dorisWriter.isLoading());
     }
 
     public DorisWriteMetrics getMockWriteMetrics(SinkWriterMetricGroup 
sinkWriterMetricGroup) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to