This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a05406ecc9d [branch-2.1] Picks "[Fix](delete) Fix delete job timeout when executing delete from ... #37363" (#37374)
a05406ecc9d is described below

commit a05406ecc9dd1362ca9613a4bdc945e63ad05615
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Sun Jul 7 18:33:17 2024 +0800

    [branch-2.1] Picks "[Fix](delete) Fix delete job timeout when executing delete from ... #37363" (#37374)
    
    ## Proposed changes
    
    picks https://github.com/apache/doris/pull/37363
---
 be/src/olap/delete_handler.cpp                     |  5 ++
 .../main/java/org/apache/doris/load/DeleteJob.java |  6 +++
 .../java/org/apache/doris/master/MasterImpl.java   | 28 +++++++---
 .../main/java/org/apache/doris/task/PushTask.java  | 12 +++++
 .../test_delete_from_timeout.groovy                | 59 ++++++++++++++++++++++
 5 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 6e390874126..8d85eb84bab 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -35,6 +35,7 @@
 #include "olap/predicate_creator.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "util/debug_points.h"
 
 using apache::thrift::ThriftDebugString;
 using std::vector;
@@ -90,6 +91,10 @@ std::string trans_op(const std::string& opt) {
 Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
                                                 const std::vector<TCondition>& 
conditions,
                                                 DeletePredicatePB* del_pred) {
+    DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", 
{
+        return Status::Error<false>(dp->param<int>("error_code"),
+                                    dp->param<std::string>("error_msg"));
+    })
     if (conditions.empty()) {
         return Status::Error<DELETE_INVALID_PARAMETERS>(
                 "invalid parameters for store_cond. condition_size={}", 
conditions.size());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
index f9c94284db8..761db0f4725 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -373,6 +373,12 @@ public class DeleteJob extends 
AbstractTxnStateChangeCallback implements DeleteJ
         long timeoutMs = getTimeoutMs();
         boolean ok = countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
         if (ok) {
+            if (!countDownLatch.getStatus().ok()) {
+                // encountered an error that does not need to be retried; abort directly
+                LOG.warn("delete job failed, errmsg={}", 
countDownLatch.getStatus().getErrorMsg());
+                throw new UserException(String.format("delete job failed, 
errmsg:%s",
+                        countDownLatch.getStatus().getErrorMsg()));
+            }
             return;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 485463d8daf..12d908ff317 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -137,7 +137,8 @@ public class MasterImpl {
                         && taskType != TTaskType.DOWNLOAD && taskType != 
TTaskType.MOVE
                         && taskType != TTaskType.CLONE && taskType != 
TTaskType.PUBLISH_VERSION
                         && taskType != TTaskType.CREATE && taskType != 
TTaskType.UPDATE_TABLET_META_INFO
-                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
+                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
+                        && taskType != TTaskType.REALTIME_PUSH) {
                     return result;
                 }
             }
@@ -150,7 +151,6 @@ public class MasterImpl {
                     finishCreateReplica(task, request);
                     break;
                 case REALTIME_PUSH:
-                    checkHasTabletInfo(request);
                     Preconditions.checkState(request.isSetReportVersion());
                     finishRealtimePush(task, request);
                     break;
@@ -295,16 +295,32 @@ public class MasterImpl {
         }
     }
 
-    private void finishRealtimePush(AgentTask task, TFinishTaskRequest 
request) {
-        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
-        Preconditions.checkState(finishTabletInfos != null && 
!finishTabletInfos.isEmpty());
-
+    private void finishRealtimePush(AgentTask task, TFinishTaskRequest 
request) throws Exception {
         PushTask pushTask = (PushTask) task;
 
         long dbId = pushTask.getDbId();
         long backendId = pushTask.getBackendId();
         long signature = task.getSignature();
         long transactionId = ((PushTask) task).getTransactionId();
+
+        if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
+            if (pushTask.getPushType() == TPushType.DELETE) {
+                // DeleteHandler may return status code DELETE_INVALID_CONDITION or DELETE_INVALID_PARAMETERS;
+                // we don't need to retry when we meet either of them.
+                // Note that they are converted to TStatusCode.INTERNAL_ERROR when sent from BE to FE.
+                if (request.getTaskStatus().getStatusCode() == 
TStatusCode.INTERNAL_ERROR) {
+                    
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
+                            task.getBackendId() + ": " + 
request.getTaskStatus().getErrorMsgs().toString());
+                    AgentTaskQueue.removeTask(backendId, 
TTaskType.REALTIME_PUSH, signature);
+                    LOG.warn("finish push replica error: {}", 
request.getTaskStatus().getErrorMsgs().toString());
+                }
+            }
+            return;
+        }
+
+        checkHasTabletInfo(request);
+        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
+
         Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (db == null) {
             AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, 
signature);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index ca29cc78e6f..df361593b49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.Predicate;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
 import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TCondition;
@@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushReq;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.collect.Maps;
@@ -211,6 +213,16 @@ public class PushTask extends AgentTask {
         }
     }
 
+    // Calling this always means one of the tasks has failed; count down to zero to finish the entire task
+    public void countDownToZero(TStatusCode code, String errMsg) {
+        if (this.latch != null) {
+            latch.countDownToZero(new Status(code, errMsg));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("PushTask count down to zero. error msg: {}", 
errMsg);
+            }
+        }
+    }
+
     public long getReplicaId() {
         return replicaId;
     }
diff --git 
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy 
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
new file mode 100644
index 00000000000..2d5bf41b3db
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_delete_from_timeout","nonConcurrent") {
+
+    def tableName = "test_delete_from_timeout"
+ 
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+        `col1` BOOLEAN NOT NULL,
+        `col2` DECIMAL(17, 1) NOT NULL,
+        `col3` INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`col1`, `col2`, `col3`)
+        DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        sql "insert into ${tableName} values(1, 99.9, 234);"
+        
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+            [error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: 
"data type is float or double."])
+        test {
+            sql """delete from ${tableName} where col1 = "false" and col2 = 
"-9999782574499444.2" and col3 = "-25"; """
+            exception "data type is float or double."
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+            [error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: 
"invalid parameters for store_cond. condition_size=1"])
+        test {
+            sql """delete from ${tableName} where col1 = "false" and col2 = 
"-9999782574499444.2" and col3 = "-25"; """
+            exception "invalid parameters for store_cond. condition_size=1"
+        }
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        AssertTrue(false) 
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure")
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to