This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a02be87c09d [fix](broker) fix no error url when broker data quality error (#35643)
a02be87c09d is described below

commit a02be87c09d5673712d4163ddd564a446f39786e
Author: xueweizhang <zxw520bl...@163.com>
AuthorDate: Tue Jun 4 23:19:51 2024 +0800

    [fix](broker) fix no error url when broker data quality error (#35643)
---
 .../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java    | 3 ++-
 .../apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java    | 9 ++++++++-
 .../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java  | 6 ++++--
 .../suites/load_p0/broker_load/test_etl_failed.groovy            | 2 +-
 4 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 275cd371cdd..2ffe85bb36a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -51,6 +51,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -320,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
 
         // check data quality
-        if (!checkDataQuality()) {
+        if (!checkDataQuality() || attachment.getStatus().getErrorCode() == 
TStatusCode.DATA_QUALITY_ERROR) {
             cancelJobWithoutCheck(new 
FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
                             DataQualityException.QUALITY_FAIL_MSG), true, 
true);
             return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
index 7aefd332a29..e5f8973d33a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.common.Status;
 import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
 
@@ -29,15 +30,17 @@ public class BrokerLoadingTaskAttachment extends 
TaskAttachment {
     private String trackingUrl;
     private List<TabletCommitInfo> commitInfoList;
     List<ErrorTabletInfo> errorTabletInfos;
+    private Status status = new Status();
 
     public BrokerLoadingTaskAttachment(long taskId, Map<String, String> 
counters, String trackingUrl,
                                        List<TabletCommitInfo> commitInfoList,
-                                       List<ErrorTabletInfo> errorTabletInfos) 
{
+                                       List<ErrorTabletInfo> errorTabletInfos, 
Status status) {
         super(taskId);
         this.trackingUrl = trackingUrl;
         this.counters = counters;
         this.commitInfoList = commitInfoList;
         this.errorTabletInfos = errorTabletInfos;
+        this.status = status;
     }
 
     public String getCounter(String key) {
@@ -55,4 +58,8 @@ public class BrokerLoadingTaskAttachment extends 
TaskAttachment {
     public List<ErrorTabletInfo> getErrorTabletInfos() {
         return errorTabletInfos;
     }
+
+    public Status getStatus() {
+        return status;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 71823f84d32..64b8e9a037f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -37,6 +37,7 @@ import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.ErrorTabletInfo;
 import org.apache.doris.transaction.TabletCommitInfo;
@@ -198,13 +199,14 @@ public class LoadLoadingTask extends LoadTask {
         curCoordinator.exec();
         if (curCoordinator.join(waitSecond)) {
             Status status = curCoordinator.getExecStatus();
-            if (status.ok()) {
+            if (status.ok() || status.getErrorCode() == 
TStatusCode.DATA_QUALITY_ERROR) {
                 attachment = new BrokerLoadingTaskAttachment(signature,
                         curCoordinator.getLoadCounters(),
                         curCoordinator.getTrackingUrl(),
                         
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()),
                         
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
-                                
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
+                                
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())),
+                        status);
                 curCoordinator.getErrorTabletInfos().clear();
             } else {
                 throw new LoadException(status.getErrorMsg());
diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
index 3b4eac874d5..928b4e38542 100644
--- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
@@ -63,7 +63,7 @@ suite("test_etl_failed", "load_p0") {
             assertTrue(1 == 2, "etl should be failed")
             break;
         }
-        if (result[0][2].equals("CANCELLED")) {
+        if (result[0][2].equals("CANCELLED") && 
result[0][13].contains("_load_error_log")) {
             break;
         }
         Thread.sleep(1000)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to