This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a02be87c09d [fix](broker) fix no error url when broker data quality error (#35643) a02be87c09d is described below commit a02be87c09d5673712d4163ddd564a446f39786e Author: xueweizhang <zxw520bl...@163.com> AuthorDate: Tue Jun 4 23:19:51 2024 +0800 [fix](broker) fix no error url when broker data quality error (#35643) --- .../main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 3 ++- .../apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java | 9 ++++++++- .../main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 6 ++++-- .../suites/load_p0/broker_load/test_etl_failed.groovy | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 275cd371cdd..2ffe85bb36a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -51,6 +51,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; @@ -320,7 +321,7 @@ public class BrokerLoadJob extends BulkLoadJob { } // check data quality - if (!checkDataQuality()) { + if (!checkDataQuality() || attachment.getStatus().getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) { cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), true, true); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java index 7aefd332a29..e5f8973d33a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.common.Status; import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TabletCommitInfo; @@ -29,15 +30,17 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment { private String trackingUrl; private List<TabletCommitInfo> commitInfoList; List<ErrorTabletInfo> errorTabletInfos; + private Status status = new Status(); public BrokerLoadingTaskAttachment(long taskId, Map<String, String> counters, String trackingUrl, List<TabletCommitInfo> commitInfoList, - List<ErrorTabletInfo> errorTabletInfos) { + List<ErrorTabletInfo> errorTabletInfos, Status status) { super(taskId); this.trackingUrl = trackingUrl; this.counters = counters; this.commitInfoList = commitInfoList; this.errorTabletInfos = errorTabletInfos; + this.status = status; } public String getCounter(String key) { @@ -55,4 +58,8 @@ public class BrokerLoadingTaskAttachment extends TaskAttachment { public List<ErrorTabletInfo> getErrorTabletInfos() { return errorTabletInfos; } + + public Status getStatus() { + return status; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 71823f84d32..64b8e9a037f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -37,6 +37,7 @@ import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.ErrorTabletInfo; import org.apache.doris.transaction.TabletCommitInfo; @@ -198,13 +199,14 @@ public class LoadLoadingTask extends LoadTask { curCoordinator.exec(); if (curCoordinator.join(waitSecond)) { Status status = curCoordinator.getExecStatus(); - if (status.ok()) { + if (status.ok() || status.getErrorCode() == TStatusCode.DATA_QUALITY_ERROR) { attachment = new BrokerLoadingTaskAttachment(signature, curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl(), TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()), ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos() - .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList()))); + .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())), + status); curCoordinator.getErrorTabletInfos().clear(); } else { throw new LoadException(status.getErrorMsg()); diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy index 3b4eac874d5..928b4e38542 100644 --- a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -63,7 +63,7 @@ suite("test_etl_failed", "load_p0") { assertTrue(1 == 2, "etl should be failed") break; } - if (result[0][2].equals("CANCELLED")) { + if (result[0][2].equals("CANCELLED") && result[0][13].contains("_load_error_log")) { break; } Thread.sleep(1000) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org