This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 616c9b2  [improvement] fix some successfully precommitted cannot be aborted lead to LabelAlreadyExists (#362)
616c9b2 is described below

commit 616c9b210cfa0e366557c3ff18049d52e3f44b70
Author: wudi <[email protected]>
AuthorDate: Mon Apr 15 15:49:54 2024 +0800

    [improvement] fix some successfully precommitted cannot be aborted lead to LabelAlreadyExists (#362)
---
 .../exception/LabelAlreadyExistsException.java     | 44 +++++++++++
 .../doris/flink/rest/models/RespContent.java       |  4 +
 .../org/apache/doris/flink/sink/ResponseUtil.java  |  8 ++
 .../doris/flink/sink/batch/RecordWithMeta.java     | 15 ++++
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 78 ++++++++++++++++++--
 .../doris/flink/sink/writer/DorisWriter.java       | 86 ++++++++++++++++++----
 .../doris/flink/sink/writer/LabelGenerator.java    |  5 ++
 .../apache/doris/flink/sink/TestResponseUtil.java  | 21 ++++++
 .../flink/sink/writer/TestDorisStreamLoad.java     |  3 +-
 9 files changed, 242 insertions(+), 22 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
new file mode 100644
index 0000000..ea86c60
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+public class LabelAlreadyExistsException extends RuntimeException {
+    public LabelAlreadyExistsException() {
+        super();
+    }
+
+    public LabelAlreadyExistsException(String message) {
+        super(message);
+    }
+
+    public LabelAlreadyExistsException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public LabelAlreadyExistsException(Throwable cause) {
+        super(cause);
+    }
+
+    protected LabelAlreadyExistsException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index 94a1dc4..225fb5a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -143,6 +143,10 @@ public class RespContent {
         return commitAndPublishTimeMs;
     }
 
+    public String getLabel() {
+        return label;
+    }
+
     @Override
     public String toString() {
         ObjectMapper mapper = new ObjectMapper();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
index 9eb1541..c348e73 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
@@ -27,10 +27,18 @@ public class ResponseUtil {
             Pattern.compile(
                     "transaction \\[(\\d+)\\] is already 
\\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
 
+    public static final Pattern ABORTTED_PATTERN =
+            Pattern.compile(
+                    "transaction \\[(\\d+)\\] is already|transaction 
\\[(\\d+)\\] not found");
+
     public static boolean isCommitted(String msg) {
         return COMMITTED_PATTERN.matcher(msg).find();
     }
 
+    public static boolean isAborted(String msg) {
+        return ABORTTED_PATTERN.matcher(msg).find();
+    }
+
     static final Pattern COPY_COMMITTED_PATTERN =
             Pattern.compile("errCode = 2, detailMessage = No files can be 
copied.*");
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
index 936487d..e11da6a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
@@ -57,4 +57,19 @@ public class RecordWithMeta {
     public String getTableIdentifier() {
         return this.database + "." + this.table;
     }
+
+    @Override
+    public String toString() {
+        return "RecordWithMeta{"
+                + "database='"
+                + database
+                + '\''
+                + ", table='"
+                + table
+                + '\''
+                + ", record='"
+                + record
+                + '\''
+                + '}';
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index d049a2b..cbfa8ba 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -162,13 +163,18 @@ public class DorisStreamLoad implements Serializable {
      */
     public void abortPreCommit(String labelPrefix, long chkID) throws 
Exception {
         long startChkID = chkID;
-        LOG.info("abort for labelPrefix {}. start chkId {}.", labelPrefix, 
chkID);
+        LOG.info(
+                "abort for labelPrefix {}, concat labelPrefix {},  start chkId 
{}.",
+                labelPrefix,
+                labelGenerator.getConcatLabelPrefix(),
+                chkID);
         while (true) {
             try {
                 // TODO: According to label abort txn.
                 //  Currently, it can only be aborted based on txnid, so we 
must
                 //  first request a streamload based on the label to get the 
txnid.
                 String label = labelGenerator.generateTableLabel(startChkID);
+                LOG.info("start a check label {} to load.", label);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
                         .baseAuth(user, passwd)
@@ -210,7 +216,7 @@ public class DorisStreamLoad implements Serializable {
                 }
                 startChkID++;
             } catch (Exception e) {
-                LOG.warn("failed to stream load data", e);
+                LOG.warn("failed to abort labelPrefix {}", labelPrefix, e);
                 throw e;
             }
         }
@@ -320,9 +326,9 @@ public class DorisStreamLoad implements Serializable {
                 mapper.readValue(loadResult, new TypeReference<HashMap<String, 
String>>() {});
         if (!SUCCESS.equals(res.get("status"))) {
             String msg = res.get("msg");
-            if (msg != null && ResponseUtil.isCommitted(msg)) {
-                throw new DorisException(
-                        "try abort committed transaction, " + "do you recover 
from old savepoint?");
+            if (msg != null && ResponseUtil.isAborted(msg)) {
+                LOG.warn("Failed to abort transaction, {}", msg);
+                return;
             }
 
             LOG.error("Fail to abort transaction. txnId: {}, error: {}", 
txnID, msg);
@@ -330,6 +336,68 @@ public class DorisStreamLoad implements Serializable {
         }
     }
 
+    public void abortTransactionByLabel(String label) throws Exception {
+        if (StringUtils.isNullOrWhitespaceOnly(label)) {
+            return;
+        }
+        HttpPutBuilder builder = new HttpPutBuilder();
+        builder.setUrl(abortUrlStr)
+                .baseAuth(user, passwd)
+                .addCommonHeader()
+                .setLabel(label)
+                .setEmptyEntity()
+                .abort();
+        CloseableHttpResponse response = httpClient.execute(builder.build());
+
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != 200 || response.getEntity() == null) {
+            LOG.warn("abort transaction by label response: " + 
response.getStatusLine().toString());
+            throw new DorisRuntimeException(
+                    "Fail to abort transaction by label " + label + " with url 
" + abortUrlStr);
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+        String loadResult = EntityUtils.toString(response.getEntity());
+        LOG.info("abort Result {}", loadResult);
+        Map<String, String> res =
+                mapper.readValue(loadResult, new TypeReference<HashMap<String, 
String>>() {});
+        if (!SUCCESS.equals(res.get("status"))) {
+            String msg = res.get("msg");
+            if (msg != null && ResponseUtil.isCommitted(msg)) {
+                throw new DorisException(
+                        "try abort committed transaction by label, "
+                                + "do you recover from old savepoint?");
+            }
+
+            LOG.error("Fail to abort transaction by label. label: {}, error: 
{}", label, msg);
+            throw new DorisException("Fail to abort transaction by label, " + 
loadResult);
+        }
+    }
+
+    public void abortLabelExistTransaction(RespContent respContent) {
+        if (respContent == null || respContent.getMessage() == null) {
+            return;
+        }
+        try {
+            Matcher matcher = 
LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
+            if (matcher.find()) {
+                long txnId = Long.parseLong(matcher.group(2));
+                abortTransaction(txnId);
+                LOG.info(
+                        "Finish to abort transaction {} for label already 
exist {}",
+                        txnId,
+                        respContent.getLabel());
+            }
+        } catch (Exception ex) {
+            LOG.error(
+                    "Failed abort transaction {} for label already exist", 
respContent.getLabel());
+        }
+    }
+
+    public String getCurrentLabel() {
+        return currentLabel;
+    }
+
     public void close() throws IOException {
         if (null != httpClient) {
             try {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 54facc7..db6094e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -28,11 +28,13 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.LabelAlreadyExistsException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.LoadStatus;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.slf4j.Logger;
@@ -72,7 +74,7 @@ public class DorisWriter<IN>
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
     private final DorisExecutionOptions executionOptions;
-    private final String labelPrefix;
+    private String labelPrefix;
     private final int subtaskId;
     private final int intervalTime;
     private final DorisRecordSerializer<IN> serializer;
@@ -82,6 +84,7 @@ public class DorisWriter<IN>
     private BackendUtil backendUtil;
     private SinkWriterMetricGroup sinkMetricGroup;
     private Map<String, DorisWriteMetrics> sinkMetricsMap = new 
ConcurrentHashMap<>();
+    private volatile boolean multiTableLoad = false;
 
     public DorisWriter(
             Sink.InitContext initContext,
@@ -95,13 +98,17 @@ public class DorisWriter<IN>
                         .getRestoredCheckpointId()
                         .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
         this.curCheckpointId = lastCheckpointId + 1;
-        LOG.info("restore checkpointId {}", lastCheckpointId);
+        LOG.info("restore from checkpointId {}", lastCheckpointId);
         LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix();
         this.subtaskId = initContext.getSubtaskId();
         this.scheduledExecutorService =
                 new ScheduledThreadPoolExecutor(1, new 
ExecutorThreadFactory("stream-load-check"));
         this.serializer = serializer;
+        if (StringUtils.isBlank(dorisOptions.getTableIdentifier())) {
+            this.multiTableLoad = true;
+            LOG.info("table.identifier is empty, multiple table writes.");
+        }
         this.dorisOptions = dorisOptions;
         this.dorisReadOptions = dorisReadOptions;
         this.executionOptions = executionOptions;
@@ -135,6 +142,7 @@ public class DorisWriter<IN>
         List<String> alreadyAborts = new ArrayList<>();
         // abort label in state
         for (DorisWriterState state : recoveredStates) {
+            LOG.info("try to abort txn from DorisWriterState {}", 
state.toString());
             // Todo: When the sink parallelism is reduced,
             //  the txn of the redundant task before aborting is also needed.
             if (!state.getLabelPrefix().equals(labelPrefix)) {
@@ -177,7 +185,6 @@ public class DorisWriter<IN>
     }
 
     public void writeOneDorisRecord(DorisRecord record) throws IOException, 
InterruptedException {
-
         if (record == null || record.getRow() == null) {
             // ddl or value is null
             return;
@@ -221,9 +228,9 @@ public class DorisWriter<IN>
         if (!globalLoading && 
loadingMap.values().stream().noneMatch(Boolean::booleanValue)) {
             return Collections.emptyList();
         }
+
         // disable exception checker before stop load.
         globalLoading = false;
-
         // submit stream load http request
         List<DorisCommittable> committableList = new ArrayList<>();
         for (Map.Entry<String, DorisStreamLoad> streamLoader : 
dorisStreamLoadMap.entrySet()) {
@@ -240,13 +247,21 @@ public class DorisWriter<IN>
                 dorisWriteMetrics.flush(respContent);
             }
             if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                String errMsg =
-                        String.format(
-                                "table %s stream load error: %s, see more in 
%s",
-                                tableIdentifier,
-                                respContent.getMessage(),
-                                respContent.getErrorURL());
-                throw new DorisRuntimeException(errMsg);
+                if (executionOptions.enabled2PC()
+                        && 
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
+                    LOG.info("try to abort {} cause Label Already Exists", 
respContent.getLabel());
+                    dorisStreamLoad.abortLabelExistTransaction(respContent);
+                    throw new LabelAlreadyExistsException("Exist label abort 
finished, retry");
+                } else {
+                    String errMsg =
+                            String.format(
+                                    "table %s stream load error: %s, see more 
in %s",
+                                    tableIdentifier,
+                                    respContent.getMessage(),
+                                    respContent.getErrorURL());
+                    LOG.error("Failed to load, {}", errMsg);
+                    throw new DorisRuntimeException(errMsg);
+                }
             }
             if (executionOptions.enabled2PC()) {
                 long txnId = respContent.getTxnId();
@@ -255,11 +270,32 @@ public class DorisWriter<IN>
                                 dorisStreamLoad.getHostPort(), 
dorisStreamLoad.getDb(), txnId));
             }
         }
+
         // clean loadingMap
         loadingMap.clear();
         return committableList;
     }
 
+    private void abortPossibleSuccessfulTransaction() {
+        // In the case of multi-table writing, if a new table is added during 
the period
+        // (there is no streamloader for this table in the previous Checkpoint 
state),
+        // in the precommit phase, if some tables succeed and others fail,
+        // the txn of successful precommit cannot be aborted.
+        if (executionOptions.enabled2PC() && multiTableLoad) {
+            LOG.info("Try to abort may have successfully preCommitted label.");
+            for (Map.Entry<String, DorisStreamLoad> entry : 
dorisStreamLoadMap.entrySet()) {
+                DorisStreamLoad abortLoader = entry.getValue();
+                try {
+                    
abortLoader.abortTransactionByLabel(abortLoader.getCurrentLabel());
+                } catch (Exception ex) {
+                    LOG.warn(
+                            "Skip abort transaction failed by label, reason is 
{}.",
+                            ex.getMessage());
+                }
+            }
+        }
+    }
+
     @Override
     public List<DorisWriterState> snapshotState(long checkpointId) throws 
IOException {
         List<DorisWriterState> writerStates = new ArrayList<>();
@@ -302,8 +338,12 @@ public class DorisWriter<IN>
 
     /** Check the streamload http request regularly. */
     private void checkDone() {
-        for (Map.Entry<String, DorisStreamLoad> streamLoadMap : 
dorisStreamLoadMap.entrySet()) {
-            checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+        // todo: When writing to multiple tables,
+        //  the checkdone thread may cause problems. Disable it first.
+        if (!multiTableLoad) {
+            for (Map.Entry<String, DorisStreamLoad> streamLoadMap : 
dorisStreamLoadMap.entrySet()) {
+                checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+            }
         }
     }
 
@@ -347,16 +387,29 @@ public class DorisWriter<IN>
                         RespContent content =
                                 dorisStreamLoad.handlePreCommitResponse(
                                         
dorisStreamLoad.getPendingLoadFuture().get());
-                        errorMsg = content.getMessage();
+                        if (executionOptions.enabled2PC()
+                                && 
LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
+                            LOG.info(
+                                    "try to abort {} cause Label Already 
Exists",
+                                    content.getLabel());
+                            
dorisStreamLoad.abortLabelExistTransaction(content);
+                            errorMsg = "Exist label abort finished, retry";
+                            LOG.info(errorMsg);
+                            return;
+                        } else {
+                            errorMsg = content.getMessage();
+                            loadException = new StreamLoadException(errorMsg);
+                        }
                     } catch (Exception e) {
                         errorMsg = e.getMessage();
+                        loadException = new DorisRuntimeException(e);
                     }
 
-                    loadException = new StreamLoadException(errorMsg);
                     LOG.error(
                             "table {} stream load finished unexpectedly, 
interrupt worker thread! {}",
                             tableIdentifier,
                             errorMsg);
+
                     // set the executor thread interrupted in case blocking in 
write data.
                     executorThread.interrupt();
                 }
@@ -391,6 +444,9 @@ public class DorisWriter<IN>
         if (scheduledExecutorService != null) {
             scheduledExecutorService.shutdownNow();
         }
+        LOG.info("Try to abort txn before closing.");
+        abortPossibleSuccessfulTransaction();
+
         if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {
             for (DorisStreamLoad dorisStreamLoad : 
dorisStreamLoadMap.values()) {
                 dorisStreamLoad.close();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 91eaf9e..5f3a8d6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -69,4 +69,9 @@ public class LabelGenerator {
     public String generateCopyBatchLabel(String table, long chkId, int 
fileNum) {
         return String.format("%s_%s_%s_%s_%s", labelPrefix, table, subtaskId, 
chkId, fileNum);
     }
+
+    public String getConcatLabelPrefix() {
+        String concatPrefix = String.format("%s_%s_%s", labelPrefix, 
tableIdentifier, subtaskId);
+        return concatPrefix;
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
index 1225424..f8d36a7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
@@ -44,4 +44,25 @@ public class TestResponseUtil {
         Assert.assertFalse(ResponseUtil.isCommitted(commitMsg));
         Assert.assertFalse(ResponseUtil.isCommitted(abortedMsg));
     }
+
+    @Test
+    public void testIsAborted() {
+        String notFoundMsg = "errCode = 2, detailMessage = transaction [2] not 
found";
+        String alreadyAbort =
+                "errCode = 2, detailMessage = transaction [2] is already 
aborted. abort reason: User Abort";
+        String systemAlreadAbort =
+                "errCode = 2, detailMessage = transaction [2] is already 
aborted. abort reason: timeout by txn manager";
+        String alreadCommit =
+                "errCode = 2, detailMessage = transaction [2] is already 
COMMITTED, could not abort.";
+        String alreadVISIBLE =
+                "errCode = 2, detailMessage = transaction [2] is already 
VISIBLE, could not abort.";
+        String errormsg =
+                "tCouldn't open transport for :0 (Could not resolve host for 
client socket.";
+        Assert.assertTrue(ResponseUtil.isAborted(notFoundMsg));
+        Assert.assertTrue(ResponseUtil.isAborted(alreadyAbort));
+        Assert.assertTrue(ResponseUtil.isAborted(systemAlreadAbort));
+        Assert.assertTrue(ResponseUtil.isAborted(alreadCommit));
+        Assert.assertTrue(ResponseUtil.isAborted(alreadVISIBLE));
+        Assert.assertFalse(ResponseUtil.isAborted(errormsg));
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 1352a26..94beae8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.sink.writer;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -89,7 +88,7 @@ public class TestDorisStreamLoad {
         dorisStreamLoad.abortTransaction(anyLong());
     }
 
-    @Test(expected = DorisException.class)
+    @Test(expected = Exception.class)
     public void testAbortTransactionFailed() throws Exception {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse abortFailedResponse =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to