This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 08232851fc [cherry-pick][improvement](scan) remove concurrency limit if scan has predicate (#13021) (#13037)
08232851fc is described below

commit 08232851fc10e9e04baaa0fc39022a374687d1ca
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Sep 28 18:51:41 2022 +0800

    [cherry-pick][improvement](scan) remove concurrency limit if scan has predicate (#13021) (#13037)
---
 be/src/runtime/fragment_mgr.cpp | 48 ++++++++++++++++++++++++++++-------------
 be/src/runtime/fragment_mgr.h   |  4 ++++
 2 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b45ef9b1ee..b81be8568b 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -593,6 +593,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
                                         BackendOptions::get_localhost()));
         }
         fragments_ctx = search->second;
+        _set_scan_concurrency(params, fragments_ctx.get());
     } else {
         // This may be a first fragment request of the query.
         // Create the query fragments context.
@@ -609,21 +610,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             fragments_ctx->set_rsc_info = true;
         }
 
-        if (params.__isset.query_options) {
-            fragments_ctx->timeout_second = params.query_options.query_timeout;
-            if (params.query_options.__isset.resource_limit) {
-                
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
-            }
-        }
-        if (params.__isset.fragment && params.fragment.__isset.plan &&
-            params.fragment.plan.nodes.size() > 0) {
-            for (auto& node : params.fragment.plan.nodes) {
-                if (node.limit > 0 && node.limit < 1024) {
-                    fragments_ctx->set_serial_thread_token();
-                    break;
-                }
-            }
-        }
+        fragments_ctx->timeout_second = params.query_options.query_timeout;
+        _set_scan_concurrency(params, fragments_ctx.get());
 
         {
             // Find _fragments_ctx_map again, in case some other request has 
already
@@ -678,6 +666,36 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     return Status::OK();
 }
 
+// Configures the thread token on fragments_ctx from the request's options and plan.
+// If query_options.resource_limit is set, its cpu_limit is passed to set_thread_token() and nothing else is considered.
+// Otherwise the first OLAP scan node with no conjuncts and a limit in (0, 1024) selects a serial thread token.
+void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, QueryFragmentsCtx* fragments_ctx) {
+    if (params.__isset.query_options && params.query_options.__isset.resource_limit) {
+        fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
+        return;
+    }
+    if (!params.__isset.fragment || !params.fragment.__isset.plan) {
+        return;
+    }
+    for (const auto& node : params.fragment.plan.nodes) {
+        // Only scan nodes are inspected; every other node type is ignored.
+        if (!_is_scan_node(node.node_type)) {
+            continue;
+        }
+        // A scan node that carries a WHERE predicate (non-empty conjuncts) is skipped,
+        // so it never causes a concurrency limit to be set.
+        const bool has_predicate = node.__isset.conjuncts && !node.conjuncts.empty();
+        if (!has_predicate && node.limit > 0 && node.limit < 1024) {
+            fragments_ctx->set_serial_thread_token();
+            return;
+        }
+    }
+}
+
+bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
+    return TPlanNodeType::OLAP_SCAN_NODE == type; // OLAP_SCAN_NODE is the only type treated as a scan node here
+}
+
 Status FragmentMgr::cancel(const TUniqueId& fragment_id, const 
PPlanFragmentCancelReason& reason,
                            const std::string& msg) {
     std::shared_ptr<FragmentExecState> exec_state;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 024cbfd23c..7ef471b1ed 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -97,6 +97,10 @@ public:
 private:
     void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, 
FinishCallback cb);
 
+    void _set_scan_concurrency(const TExecPlanFragmentParams& params, 
QueryFragmentsCtx* fragments_ctx);
+
+    bool _is_scan_node(const TPlanNodeType::type& type);
+
     // This is input params
     ExecEnv* _exec_env;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to