This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d76e4291c281f93bfe2d4666d10fff85d05c04c6
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri Jun 3 15:47:40 2022 +0800

    [improvement] Optimize send fragment logic to reduce send fragment timeout errors (#9720)
    
    This CL mainly changes:
    1. Reduce RPC timeouts caused by the RPC waiting for a brpc worker thread:
        1. Merge the fragment instances destined for the same BE into one request,
           reducing the number of send-fragment RPCs.
        2. If the number of fragments is >= 3, use a 2-phase RPC: the first phase sends
           all fragments, the second starts them. As a result, there are at most
           2 RPCs per query on each BE.
    
    2. Set the timeout of the send-fragment RPC to the query timeout, so that it is
       consistent with the user's expectation of the query timeout period.
    
    3. Do not close the connection anymore when an RPC timeout occurs.
    4. Change some log levels from info to debug to simplify the fe.log content.
    
    NOTICE:
    1. The definition of the execPlanFragment RPC has changed; BE must be upgraded first.
    2. Remove the FE config `remote_fragment_exec_timeout_ms`.
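    
    For illustration only (not part of this patch): a minimal sketch of the new send flow,
    using hypothetical placeholder types (FragmentInstance, BackendRpc). The actual logic
    lives in Coordinator.sendFragment() and BackendExecStates in the diff below.
    
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
    
        class TwoPhaseSendSketch {
            // Placeholder for one fragment instance destined for a backend.
            static class FragmentInstance {
                final long backendId;
                FragmentInstance(long backendId) { this.backendId = backendId; }
            }
    
            // Placeholder for the two RPCs introduced on the BE side.
            interface BackendRpc {
                void execPlanFragmentsPrepare(long beId, List<FragmentInstance> instances, boolean twoPhase);
                void execPlanFragmentsStart(long beId, String queryId);
            }
    
            static void send(List<FragmentInstance> allInstances, int fragmentCount,
                             String queryId, BackendRpc rpc) {
                // 1. Group all fragment instances by target BE so each BE gets a single request.
                Map<Long, List<FragmentInstance>> byBe = new HashMap<>();
                for (FragmentInstance fi : allInstances) {
                    byBe.computeIfAbsent(fi.backendId, k -> new ArrayList<>()).add(fi);
                }
                // 2. With >= 3 fragments, the first RPC only prepares the instances;
                //    otherwise they are executed directly.
                boolean twoPhase = fragmentCount >= 3;
                for (Map.Entry<Long, List<FragmentInstance>> e : byBe.entrySet()) {
                    rpc.execPlanFragmentsPrepare(e.getKey(), e.getValue(), twoPhase);
                }
                // 3. The second RPC triggers execution of all prepared instances,
                //    so each BE sees at most two RPCs per query.
                if (twoPhase) {
                    for (Long beId : byBe.keySet()) {
                        rpc.execPlanFragmentsStart(beId, queryId);
                    }
                }
            }
        }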
---
 be/src/runtime/fragment_mgr.cpp                    | 131 +++++---
 be/src/runtime/fragment_mgr.h                      |   2 +
 be/src/runtime/query_fragments_ctx.h               |  22 ++
 be/src/service/internal_service.cpp                |  54 +++-
 be/src/service/internal_service.h                  |  13 +-
 .../org/apache/doris/catalog/TabletStatMgr.java    |   2 +-
 .../org/apache/doris/clone/BeLoadRebalancer.java   |  10 +-
 .../apache/doris/clone/ClusterLoadStatistic.java   |   2 +-
 .../java/org/apache/doris/clone/TabletChecker.java |   2 +-
 .../org/apache/doris/clone/TabletScheduler.java    |   4 +-
 .../main/java/org/apache/doris/common/Config.java  |   9 -
 .../load/routineload/RoutineLoadScheduler.java     |   4 +-
 .../org/apache/doris/master/ReportHandler.java     |   2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 349 +++++++++++++--------
 .../apache/doris/qe/InsertStreamTxnExecutor.java   |   7 +-
 .../java/org/apache/doris/qe/ResultReceiver.java   |   4 +-
 .../org/apache/doris/rpc/BackendServiceClient.java |  13 +-
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  56 ++--
 .../java/org/apache/doris/rpc/RpcException.java    |   5 +
 .../doris/load/sync/canal/CanalSyncDataTest.java   |  13 +-
 .../apache/doris/utframe/MockedBackendFactory.java |  24 +-
 gensrc/proto/internal_service.proto                |  14 +
 gensrc/thrift/PaloInternalService.thrift           |  24 +-
 23 files changed, 528 insertions(+), 238 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 02bf9d5bec..93bed66116 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -139,6 +139,8 @@ public:
     void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
     std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }
 
+    void set_need_wait_execution_trigger() { _need_wait_execution_trigger = 
true; }
+
 private:
     void coordinator_callback(const Status& status, RuntimeProfile* profile, 
bool done);
 
@@ -170,6 +172,9 @@ private:
     std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
     // The pipe for data transfering, such as insert.
     std::shared_ptr<StreamLoadPipe> _pipe;
+
+    // If set to true, this plan fragment will be executed only after FE sends the execution start RPC.
+    bool _need_wait_execution_trigger = false;
 };
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
@@ -224,6 +229,11 @@ Status FragmentExecState::prepare(const 
TExecPlanFragmentParams& params) {
 }
 
 Status FragmentExecState::execute() {
+    if (_need_wait_execution_trigger) {
+        // If _need_wait_execution_trigger is true, this instance is prepared but
+        // needs to wait for the signal before doing the rest of the execution.
+        _fragments_ctx->wait_for_start();
+    }
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
@@ -518,6 +528,22 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params) {
     }
 }
 
+Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* 
request) {
+    std::lock_guard<std::mutex> lock(_lock);
+    TUniqueId query_id;
+    query_id.__set_hi(request->query_id().hi());
+    query_id.__set_lo(request->query_id().lo());
+    auto search = _fragments_ctx_map.find(query_id);
+    if (search == _fragments_ctx_map.end()) {
+        return Status::InternalError(
+                strings::Substitute("Failed to get query fragments context. 
Query may be "
+                                    "timeout or be cancelled. host: ",
+                                    BackendOptions::get_localhost()));
+    }
+    search->second->set_ready_to_execute();
+    return Status::OK();
+}
+
 void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
                            std::shared_ptr<StreamLoadPipe> pipe) {
     {
@@ -553,65 +579,62 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
     }
 
     std::shared_ptr<FragmentExecState> exec_state;
-    if (!params.__isset.is_simplified_param) {
-        // This is an old version params, all @Common components is set in 
TExecPlanFragmentParams.
-        exec_state.reset(new FragmentExecState(params.params.query_id,
-                                               
params.params.fragment_instance_id,
-                                               params.backend_num, _exec_env, 
params.coord));
+    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
+    if (params.is_simplified_param) {
+        // Get common components from _fragments_ctx_map
+        std::lock_guard<std::mutex> lock(_lock);
+        auto search = _fragments_ctx_map.find(params.params.query_id);
+        if (search == _fragments_ctx_map.end()) {
+            return Status::InternalError(
+                    strings::Substitute("Failed to get query fragments 
context. Query may be "
+                                        "timeout or be cancelled. host: ",
+                                        BackendOptions::get_localhost()));
+        }
+        fragments_ctx = search->second;
     } else {
-        std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
-        if (params.is_simplified_param) {
-            // Get common components from _fragments_ctx_map
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _fragments_ctx_map.find(params.params.query_id);
-            if (search == _fragments_ctx_map.end()) {
-                return Status::InternalError(
-                        strings::Substitute("Failed to get query fragments 
context. Query may be "
-                                            "timeout or be cancelled. host: ",
-                                            BackendOptions::get_localhost()));
-            }
-            fragments_ctx = search->second;
-        } else {
-            // This may be a first fragment request of the query.
-            // Create the query fragments context.
-            fragments_ctx.reset(new 
QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
-            fragments_ctx->query_id = params.params.query_id;
-            RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), 
params.desc_tbl,
-                                                  &(fragments_ctx->desc_tbl)));
-            fragments_ctx->coord_addr = params.coord;
-            fragments_ctx->query_globals = params.query_globals;
-
-            if (params.__isset.resource_info) {
-                fragments_ctx->user = params.resource_info.user;
-                fragments_ctx->group = params.resource_info.group;
-                fragments_ctx->set_rsc_info = true;
-            }
+        // This may be a first fragment request of the query.
+        // Create the query fragments context.
+        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, 
_exec_env));
+        fragments_ctx->query_id = params.params.query_id;
+        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), 
params.desc_tbl,
+                                              &(fragments_ctx->desc_tbl)));
+        fragments_ctx->coord_addr = params.coord;
+        fragments_ctx->query_globals = params.query_globals;
 
-            if (params.__isset.query_options) {
-                fragments_ctx->timeout_second = 
params.query_options.query_timeout;
-                if (params.query_options.__isset.resource_limit) {
-                    
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
-                }
+        if (params.__isset.resource_info) {
+            fragments_ctx->user = params.resource_info.user;
+            fragments_ctx->group = params.resource_info.group;
+            fragments_ctx->set_rsc_info = true;
+        }
+
+        if (params.__isset.query_options) {
+            fragments_ctx->timeout_second = params.query_options.query_timeout;
+            if (params.query_options.__isset.resource_limit) {
+                
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
             }
+        }
 
-            {
-                // Find _fragments_ctx_map again, in case some other request 
has already
-                // create the query fragments context.
-                std::lock_guard<std::mutex> lock(_lock);
-                auto search = _fragments_ctx_map.find(params.params.query_id);
-                if (search == _fragments_ctx_map.end()) {
-                    _fragments_ctx_map.insert(
-                            std::make_pair(fragments_ctx->query_id, 
fragments_ctx));
-                } else {
-                    // Already has a query fragmentscontext, use it
-                    fragments_ctx = search->second;
-                }
+        {
+            // Find _fragments_ctx_map again, in case some other request has
+            // already created the query fragments context.
+            std::lock_guard<std::mutex> lock(_lock);
+            auto search = _fragments_ctx_map.find(params.params.query_id);
+            if (search == _fragments_ctx_map.end()) {
+                
_fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, 
fragments_ctx));
+            } else {
+                // Already has a query fragments context, use it
+                fragments_ctx = search->second;
             }
         }
+    }
 
-        exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
-                                               
params.params.fragment_instance_id,
-                                               params.backend_num, _exec_env, 
fragments_ctx));
+    exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
+                                           params.params.fragment_instance_id, 
params.backend_num,
+                                           _exec_env, fragments_ctx));
+    if (params.__isset.need_wait_execution_trigger && 
params.need_wait_execution_trigger) {
+        // Setting need_wait_execution_trigger means this instance will not actually be executed
+        // until the execPlanFragmentStart RPC triggers it.
+        exec_state->set_need_wait_execution_trigger();
     }
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
@@ -663,6 +686,7 @@ void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
         std::vector<TUniqueId> to_cancel;
+        std::vector<TUniqueId> to_cancel_queries;
         DateTimeValue now = DateTimeValue::local_time();
         {
             std::lock_guard<std::mutex> lock(_lock);
@@ -673,6 +697,9 @@ void FragmentMgr::cancel_worker() {
             }
             for (auto it = _fragments_ctx_map.begin(); it != 
_fragments_ctx_map.end();) {
                 if (it->second->is_timeout(now)) {
+                    // Notify the execution logic of the instance;
+                    // it will eventually cancel the execution plan.
+                    it->second->set_ready_to_execute();
                     it = _fragments_ctx_map.erase(it);
                 } else {
                     ++it;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c208a03fdd..024cbfd23c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -66,6 +66,8 @@ public:
     // TODO(zc): report this is over
     Status exec_plan_fragment(const TExecPlanFragmentParams& params, 
FinishCallback cb);
 
+    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
+
     Status cancel(const TUniqueId& fragment_id) {
         return cancel(fragment_id, PPlanFragmentCancelReason::INTERNAL_ERROR);
     }
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 720cda4146..73f6b415b9 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <condition_variable>
 #include <string>
 
 #include "common/object_pool.h"
@@ -65,6 +66,21 @@ public:
         return _thread_token.get();
     }
 
+    void set_ready_to_execute() {
+        {
+            std::lock_guard<std::mutex> l(_start_lock);
+            _ready_to_execute = true;
+        }
+        _start_cond.notify_all();
+    }
+
+    void wait_for_start() {
+        std::unique_lock<std::mutex> l(_start_lock);
+        while (!_ready_to_execute.load()) {
+            _start_cond.wait(l);
+        }
+    }
+
 public:
     TUniqueId query_id;
     DescriptorTbl* desc_tbl;
@@ -94,6 +110,12 @@ private:
     // So that we can control the max thread that a query can be used to 
execute.
     // If this token is not set, the scanner will be executed in 
"_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
+
+    std::mutex _start_lock;
+    std::condition_variable _start_cond;
+    // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
+    // And all fragments of this query will start execution when this is set 
to true.
+    std::atomic<bool> _ready_to_execute {false};
 };
 
 } // end of namespace
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 4cb6b8f7ee..c1185591e6 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -104,13 +104,33 @@ void 
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
-    st = _exec_plan_fragment(request->request(), compact);
+    PFragmentRequestVersion version =
+            request->has_version() ? request->version() : 
PFragmentRequestVersion::VERSION_1;
+    st = _exec_plan_fragment(request->request(), version, compact);
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << 
st.get_error_msg();
     }
     st.to_protobuf(response->mutable_status());
 }
 
+template <typename T>
+void 
PInternalServiceImpl<T>::exec_plan_fragment_prepare(google::protobuf::RpcController*
 cntl_base,
+                                                      const 
PExecPlanFragmentRequest* request,
+                                                      PExecPlanFragmentResult* 
response,
+                                                      
google::protobuf::Closure* done) {
+    exec_plan_fragment(cntl_base, request, response, done);
+}
+
+template <typename T>
+void 
PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcController*
 controller,
+                                                    const 
PExecPlanFragmentStartRequest* request,
+                                                    PExecPlanFragmentResult* 
result,
+                                                    google::protobuf::Closure* 
done) {
+    brpc::ClosureGuard closure_guard(done);
+    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+    st.to_protobuf(result->mutable_status());
+}
+
 template <typename T>
 void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController*
 cntl_base,
                                                       const 
PTabletWriterAddBatchRequest* request,
@@ -162,14 +182,32 @@ void 
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
 }
 
 template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& 
ser_request, bool compact) {
-    TExecPlanFragmentParams t_request;
-    {
-        const uint8_t* buf = (const uint8_t*)ser_request.data();
-        uint32_t len = ser_request.size();
-        RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, 
&t_request));
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& 
ser_request,
+                                                 PFragmentRequestVersion 
version, bool compact) {
+    if (version == PFragmentRequestVersion::VERSION_1) {
+        // VERSION_1 should be removed in v1.2
+        TExecPlanFragmentParams t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, 
&t_request));
+        }
+        return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
+    } else if (version == PFragmentRequestVersion::VERSION_2) {
+        TExecPlanFragmentParamsList t_request;
+        {
+            const uint8_t* buf = (const uint8_t*)ser_request.data();
+            uint32_t len = ser_request.size();
+            RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, 
&t_request));
+        }
+
+        for (const TExecPlanFragmentParams& params : t_request.paramsList) {
+            
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+        }
+        return Status::OK();
+    } else {
+        return Status::InternalError("invalid version");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
 template <typename T>
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index c4073bf86e..550773b950 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -46,6 +46,16 @@ public:
                             PExecPlanFragmentResult* result,
                             google::protobuf::Closure* done) override;
 
+    void exec_plan_fragment_prepare(google::protobuf::RpcController* 
controller,
+                                    const PExecPlanFragmentRequest* request,
+                                    PExecPlanFragmentResult* result,
+                                    google::protobuf::Closure* done) override;
+
+    void exec_plan_fragment_start(google::protobuf::RpcController* controller,
+                                  const PExecPlanFragmentStartRequest* request,
+                                  PExecPlanFragmentResult* result,
+                                  google::protobuf::Closure* done) override;
+
     void cancel_plan_fragment(google::protobuf::RpcController* controller,
                               const PCancelPlanFragmentRequest* request,
                               PCancelPlanFragmentResult* result,
@@ -117,7 +127,8 @@ public:
                     PHandShakeResponse* response, google::protobuf::Closure* 
done) override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, bool compact);
+    Status _exec_plan_fragment(const std::string& s_request, 
PFragmentRequestVersion version,
+                               bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index 57a5c1625b..03eac7ca26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -72,7 +72,7 @@ public class TabletStatMgr extends MasterDaemon {
                 }
             }
         });
-        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
+        LOG.debug("finished to get tablet stat of all backends. cost: {} ms",
                 (System.currentTimeMillis() - start));
 
         // after update replica in all backends, update index row num
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index fb99b881da..9ea4bbc9fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -82,7 +82,7 @@ public class BeLoadRebalancer extends Rebalancer {
         clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, 
medium);
 
         if (lowBEs.isEmpty() && highBEs.isEmpty()) {
-            LOG.info("cluster is balance: {} with medium: {}. skip", 
clusterName, medium);
+            LOG.debug("cluster is balance: {} with medium: {}. skip", 
clusterName, medium);
             return alternativeTablets;
         }
 
@@ -186,9 +186,11 @@ public class BeLoadRebalancer extends Rebalancer {
             }
         } // end for high backends
 
-        LOG.info("select alternative tablets for cluster: {}, medium: {}, num: 
{}, detail: {}",
-                clusterName, medium, alternativeTablets.size(),
-                
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        if (!alternativeTablets.isEmpty()) {
+            LOG.info("select alternative tablets for cluster: {}, medium: {}, 
num: {}, detail: {}",
+                    clusterName, medium, alternativeTablets.size(),
+                    
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+        }
         return alternativeTablets;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
index 338336e41c..6afddb206f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
@@ -183,7 +183,7 @@ public class ClusterLoadStatistic {
             }
         }
 
-        LOG.info("classify backend by load. medium: {} avg load score: {}. 
low/mid/high: {}/{}/{}",
+        LOG.debug("classify backend by load. medium: {} avg load score: {}. 
low/mid/high: {}/{}/{}",
                 medium, avgLoadScore, lowCounter, midCounter, highCounter);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 0025a27dc5..9bcd477a52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -209,7 +209,7 @@ public class TabletChecker extends MasterDaemon {
         removePriosIfNecessary();
 
         stat.counterTabletCheckRound.incrementAndGet();
-        LOG.info(stat.incrementalBrief());
+        LOG.debug(stat.incrementalBrief());
     }
 
     private static class CheckerCounter {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 8231269816..97c47b3418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -322,7 +322,7 @@ public class TabletScheduler extends MasterDaemon {
                         infoService, invertedIndex);
                 clusterLoadStatistic.init();
                 newStatisticMap.put(clusterName, tag, clusterLoadStatistic);
-                LOG.info("update cluster {} load statistic:\n{}", clusterName, 
clusterLoadStatistic.getBrief());
+                LOG.debug("update cluster {} load statistic:\n{}", 
clusterName, clusterLoadStatistic.getBrief());
             }
         }
 
@@ -352,7 +352,7 @@ public class TabletScheduler extends MasterDaemon {
             pendingTablets.add(tabletCtx);
         }
 
-        LOG.info("adjust priority for all tablets. changed: {}, total: {}", 
changedNum, size);
+        LOG.debug("adjust priority for all tablets. changed: {}, total: {}", 
changedNum, size);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 80b144cc04..e289b91b99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1031,15 +1031,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_local_replica_selection_fallback = false;
 
-
-    /**
-     * The timeout of executing async remote fragment.
-     * In normal case, the async remote fragment will be executed in a short 
time. If system are under high load
-     * condition,try to set this timeout longer.
-     */
-    @ConfField(mutable = true)
-    public static long remote_fragment_exec_timeout_ms = 5000; // 5 sec
-
     /**
      * The number of query retries.
      * A query may retry if we encounter RPC exception and no result has been 
sent to user.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index df15bdc5d6..d2349b1765 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -71,7 +71,9 @@ public class RoutineLoadScheduler extends MasterDaemon {
             LOG.warn("failed to get need schedule routine jobs", e);
         }
 
-        LOG.info("there are {} job need schedule", routineLoadJobList.size());
+        if (!routineLoadJobList.isEmpty()) {
+            LOG.info("there are {} job need schedule", 
routineLoadJobList.size());
+        }
         for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
             RoutineLoadJob.JobState errorJobState = null;
             UserException userException = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d875fecbe6..01192f895a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -338,7 +338,7 @@ public class ReportHandler extends Daemon {
     }
 
     private static void taskReport(long backendId, Map<TTaskType, Set<Long>> 
runningTasks) {
-        LOG.info("begin to handle task report from backend {}", backendId);
+        LOG.debug("begin to handle task report from backend {}", backendId);
         long start = System.currentTimeMillis();
 
         if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7c050bdc2c..334f203609 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -57,7 +57,10 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.Types;
+import org.apache.doris.proto.Types.PUniqueId;
 import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
@@ -70,6 +73,7 @@ import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
@@ -103,6 +107,7 @@ import org.apache.commons.collections.map.HashedMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.jetbrains.annotations.NotNull;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -212,6 +217,10 @@ public class Coordinator {
     // parallel execute
     private final TUniqueId nextInstanceId;
 
+    // A timestamp representing the absolute timeout,
+    // e.g. System.currentTimeMillis() + query_timeout * 1000
+    private long timeoutDeadline;
+
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -223,6 +232,7 @@ public class Coordinator {
     // Runtime filter ID to the builder instance number
     public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
 
+
     // Used for query/insert
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner 
planner) {
         this.isBlockQuery = planner.isBlockQuery();
@@ -490,15 +500,14 @@ public class Coordinator {
         PlanFragmentId topId = fragments.get(0).getFragmentId();
         FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
         DataSink topDataSink = topParams.fragment.getSink();
+        this.timeoutDeadline = System.currentTimeMillis() + 
queryOptions.query_timeout * 1000;
         if (topDataSink instanceof ResultSink || topDataSink instanceof 
ResultFileSink) {
             TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
-            receiver = new ResultReceiver(
-                    topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr),
-                    toBrpcHost(execBeAddr),
-                    queryOptions.query_timeout * 1000);
+            receiver = new 
ResultReceiver(topParams.instanceExecParams.get(0).instanceId,
+                    addressToBackendID.get(execBeAddr), 
toBrpcHost(execBeAddr), this.timeoutDeadline);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("dispatch query job: {} to {}", 
DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+                LOG.debug("dispatch query job: {} to {}", 
DebugUtil.printId(queryId),
+                        topParams.instanceExecParams.get(0).host);
             }
 
             if (topDataSink instanceof ResultFileSink
@@ -531,6 +540,31 @@ public class Coordinator {
         sendFragment();
     }
 
+    /**
+     * The logic for sending query plan fragments is as follows:
+     * First, plan fragments are dependent. According to the order in the "fragments" list,
+     * it must be ensured that, on the BE side, the next fragment instance can be executed
+     * only after the previous fragment instance is ready.
+     * <p>
+     * In the previous logic, we sent fragment instances sequentially through RPC,
+     * waiting for the RPC of the previous fragment instance to return successfully
+     * before sending the next one. For some complex queries, this may lead to too many RPCs.
+     * <p>
+     * The optimized logic is as follows:
+     * 1. If the number of fragments is <= 2, the original logic is still used,
+     * completing the sending of fragments through at most 2 RPCs.
+     * 2. If the number of fragments is >= 3, first group all fragments by BE,
+     * and send all fragment instances to the corresponding BE node through the FIRST RPC;
+     * these fragment instances only perform the preparation phase and are not actually executed.
+     * After that, the execution of all fragment instances is started through the SECOND RPC.
+     * <p>
+     * After optimization, a query sends at most two RPCs to each BE node,
+     * thereby reducing "send fragment timeout" errors caused by too many RPCs that the BE
+     * cannot process in time.
+     *
+     * @throws TException
+     * @throws RpcException
+     * @throws UserException
+     */
     private void sendFragment() throws TException, RpcException, UserException 
{
         lock();
         try {
@@ -540,23 +574,23 @@ public class Coordinator {
                     hostCounter.add(fi.host);
                 }
             }
-            // Execute all instances from up to bottom
-            // NOTICE: We must ensure that these fragments are executed 
sequentially,
-            // otherwise the data dependency between the fragments will be 
destroyed.
+
             int backendIdx = 0;
             int profileFragmentId = 0;
             long memoryLimit = queryOptions.getMemLimit();
+            Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
+            // If #fragments >=3, use twoPhaseExecution with 
exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // else use exec_plan_fragments directly.
+            boolean twoPhaseExecution = fragments.size() >= 3;
             for (PlanFragment fragment : fragments) {
                 FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
 
-                // set up exec states
+                // 1. set up exec states
                 int instanceNum = params.instanceExecParams.size();
                 Preconditions.checkState(instanceNum > 0);
                 List<TExecPlanFragmentParams> tParams = 
params.toThrift(backendIdx);
-                List<Pair<BackendExecState, 
Future<InternalService.PExecPlanFragmentResult>>> futures =
-                        Lists.newArrayList();
 
-                // update memory limit for colocate join
+                // 2. update memory limit for colocate join
                 if 
(colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
                     int rate = 
Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
                     long newMemory = memoryLimit / rate;
@@ -574,15 +608,16 @@ public class Coordinator {
                     needCheckBackendState = true;
                 }
 
+                // 3. group BackendExecState by BE, so that we can use one RPC to send all fragment instances of a BE.
                 int instanceId = 0;
                 for (TExecPlanFragmentParams tParam : tParams) {
-                    BackendExecState execState = new 
BackendExecState(fragment.getFragmentId(), instanceId++,
-                            profileFragmentId, tParam, 
this.addressToBackendID);
-                    execState.unsetFields();
+                    BackendExecState execState =
+                            new BackendExecState(fragment.getFragmentId(), 
instanceId++, profileFragmentId, tParam, this.addressToBackendID);
                     // Each tParam will set the total number of Fragments that 
need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have 
been executed based on this information.
                     
tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
                     tParam.setBackendId(execState.backend.getId());
+                    tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
 
                     backendExecStates.add(execState);
                     if (needCheckBackendState) {
@@ -592,69 +627,33 @@ public class Coordinator {
                                     fragment.getFragmentId().asInt(), jobId);
                         }
                     }
-                    futures.add(Pair.create(execState, 
execState.execRemoteFragmentAsync()));
 
-                    backendIdx++;
-                }
-
-                for (Pair<BackendExecState, 
Future<InternalService.PExecPlanFragmentResult>> pair : futures) {
-                    TStatusCode code;
-                    String errMsg = null;
-                    Exception exception = null;
-                    try {
-                        InternalService.PExecPlanFragmentResult result = 
pair.second.get(Config.remote_fragment_exec_timeout_ms,
-                                TimeUnit.MILLISECONDS);
-                        code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-                        if (!result.getStatus().getErrorMsgsList().isEmpty()) {
-                            errMsg = 
result.getStatus().getErrorMsgsList().get(0);
-                        }
-                    } catch (ExecutionException e) {
-                        LOG.warn("catch a execute exception", e);
-                        exception = e;
-                        code = TStatusCode.THRIFT_RPC_ERROR;
-                        
BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
-                    } catch (InterruptedException e) {
-                        LOG.warn("catch a interrupt exception", e);
-                        exception = e;
-                        code = TStatusCode.INTERNAL_ERROR;
-                    } catch (TimeoutException e) {
-                        LOG.warn("catch a timeout exception", e);
-                        exception = e;
-                        code = TStatusCode.TIMEOUT;
-                        
BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress);
+                    BackendExecStates states = 
beToExecStates.get(execState.backend.getId());
+                    if (states == null) {
+                        states = new 
BackendExecStates(execState.backend.getId(), execState.brpcAddress,
+                                twoPhaseExecution);
+                        beToExecStates.putIfAbsent(execState.backend.getId(), 
states);
                     }
+                    states.addState(execState);
+                }
+                profileFragmentId += 1;
+            } // end for fragments
 
-                    if (code != TStatusCode.OK) {
-                        if (exception != null) {
-                            errMsg = exception.getMessage();
-                        }
-
-                        if (errMsg == null) {
-                            errMsg = "exec rpc error. backend id: " + 
pair.first.backend.getId();
-                        }
-                        queryStatus.setStatus(errMsg);
-                        LOG.warn("exec plan fragment failed, errmsg={}, code: 
{}, fragmentId={}, backend={}:{}",
-                                errMsg, code, fragment.getFragmentId(),
-                                pair.first.address.hostname, 
pair.first.address.port);
-                        
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
-                        switch (code) {
-                            case TIMEOUT:
-                                throw new 
RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
-                                        + pair.first.backend.getId() + " 
fragment: " +
-                                        
DebugUtil.printId(pair.first.rpcParams.params.fragment_instance_id));
-                            case THRIFT_RPC_ERROR:
-                                
SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
-                                throw new 
RpcException(pair.first.backend.getHost(), "rpc failed");
-                            default:
-                                throw new UserException(errMsg);
-                        }
-                    }
+            // 4. send and wait fragments rpc
+            List<Pair<BackendExecStates, 
Future<InternalService.PExecPlanFragmentResult>>> futures = 
Lists.newArrayList();
+            for (BackendExecStates states : beToExecStates.values()) {
+                states.unsetFields();
+                futures.add(Pair.create(states, 
states.execRemoteFragmentsAsync()));
+            }
+            waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send fragments");
 
-                    // succeed to send the plan fragment, update the 
"alreadySentBackendIds"
-                    alreadySentBackendIds.add(pair.first.backend.getId());
+            if (twoPhaseExecution) {
+                // 5. send and wait execution start rpc
+                futures.clear();
+                for (BackendExecStates states : beToExecStates.values()) {
+                    futures.add(Pair.create(states, 
states.execPlanFragmentStartAsync()));
                 }
-
-                profileFragmentId += 1;
+                waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send execution start");
             }
 
             attachInstanceProfileToFragmentProfile();
@@ -663,6 +662,63 @@ public class Coordinator {
         }
     }
 
+    private void waitRpc(List<Pair<BackendExecStates, 
Future<PExecPlanFragmentResult>>> futures, long timeoutMs,
+            String operation) throws RpcException, UserException {
+        if (timeoutMs <= 0) {
+            throw new UserException("timeout before waiting for " + operation 
+ " RPC. Elapse(sec): " + (
+                    (System.currentTimeMillis() - timeoutDeadline) / 1000 + 
queryOptions.query_timeout));
+        }
+        
+        for (Pair<BackendExecStates, Future<PExecPlanFragmentResult>> pair : 
futures) {
+            TStatusCode code;
+            String errMsg = null;
+            Exception exception = null;
+            try {
+                PExecPlanFragmentResult result = pair.second.get(timeoutMs, 
TimeUnit.MILLISECONDS);
+                code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                        errMsg = result.getStatus().getErrorMsgsList().get(0);
+                    } else {
+                        errMsg = operation + " failed. backend id: " + 
pair.first.beId;
+                    }
+                }
+            } catch (ExecutionException e) {
+                exception = e;
+                code = TStatusCode.THRIFT_RPC_ERROR;
+                
BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr);
+            } catch (InterruptedException e) {
+                exception = e;
+                code = TStatusCode.INTERNAL_ERROR;
+            } catch (TimeoutException e) {
+                exception = e;
+                errMsg = "timeout when waiting for " + operation + " RPC. 
Elapse(sec): "
+                        + ((System.currentTimeMillis() - timeoutDeadline) / 
1000 + queryOptions.query_timeout);
+                code = TStatusCode.TIMEOUT;
+            }
+
+            if (code != TStatusCode.OK) {
+                if (exception != null && errMsg == null) {
+                    errMsg = operation + " failed. " + exception.getMessage();
+                }
+                queryStatus.setStatus(errMsg);
+                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                switch (code) {
+                    case TIMEOUT:
+                        throw new RpcException(pair.first.brpcAddr.hostname, 
errMsg, exception);
+                    case THRIFT_RPC_ERROR:
+                        SimpleScheduler.addToBlacklist(pair.first.beId, 
errMsg);
+                        throw new RpcException(pair.first.brpcAddr.hostname, 
errMsg, exception);
+                    default:
+                        throw new UserException(errMsg, exception);
+                }
+            }
+
+            // Succeeded in sending the plan fragments; update "alreadySentBackendIds".
+            alreadySentBackendIds.add(pair.first.beId);
+        }
+    }
+
     public List<String> getExportFiles() {
         return exportFiles;
     }
@@ -1896,7 +1952,6 @@ public class Coordinator {
     public class BackendExecState {
         TExecPlanFragmentParams rpcParams;
         PlanFragmentId fragmentId;
-        int instanceId;
         boolean initiated;
         volatile boolean done;
         boolean hasCanceled;
@@ -1906,18 +1961,20 @@ public class Coordinator {
         TNetworkAddress address;
         Backend backend;
         long lastMissingHeartbeatTime = -1;
+        TUniqueId instanceId;
 
         public BackendExecState(PlanFragmentId fragmentId, int instanceId, int 
profileFragmentId,
                                 TExecPlanFragmentParams rpcParams, 
Map<TNetworkAddress, Long> addressToBackendID) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
-            this.instanceId = instanceId;
             this.rpcParams = rpcParams;
             this.initiated = false;
             this.done = false;
             FInstanceExecParam fi = 
fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
+            this.instanceId = fi.instanceId;
             this.address = fi.host;
             this.backend = idToBackend.get(addressToBackendID.get(address));
+            this.brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
 
             String name = "Instance " + DebugUtil.printId(fi.instanceId) + " 
(host=" + address + ")";
             this.profile = new RuntimeProfile(name);
@@ -2016,62 +2073,110 @@ public class Coordinator {
             return true;
         }
 
-        public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentAsync() throws TException, RpcException {
-            try {
-                brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-            } catch (Exception e) {
-                throw new TException(e.getMessage());
+        public FragmentInstanceInfo buildFragmentInstanceInfo() {
+            return new 
QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
+                    
.fragmentId(String.valueOf(fragmentId)).address(this.address).build();
+        }
+
+        private TUniqueId fragmentInstanceId() {
+            return this.rpcParams.params.getFragmentInstanceId();
+        }
+    }
+
+    /**
+     * A set of BackendExecState for the same Backend.
+     */
+    public class BackendExecStates {
+        long beId;
+        TNetworkAddress brpcAddr;
+        List<BackendExecState> states = Lists.newArrayList();
+        boolean twoPhaseExecution = false;
+
+        public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean 
twoPhaseExecution) {
+            this.beId = beId;
+            this.brpcAddr = brpcAddr;
+            this.twoPhaseExecution = twoPhaseExecution;
+        }
+
+        public void addState(BackendExecState state) {
+            this.states.add(state);
+        }
+
+        /**
+         * The BackendExecStates in "states" are all sent to the same BE.
+         * So only the first BackendExecState needs to carry the common fields, such as DescriptorTbl;
+         * the others do not need those fields. Unset them to reduce the request size.
+         */
+        public void unsetFields() {
+            boolean first = true;
+            for (BackendExecState state : states) {
+                if (first) {
+                    first = false;
+                    continue;
+                }
+                state.unsetFields();
             }
-            this.initiated = true;
+        }
+
+        public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentsAsync() throws TException {
             try {
-                return 
BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
+                TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
+                for (BackendExecState state : states) {
+                    paramsList.addToParamsList(state.rpcParams);
+                }
+                return BackendServiceProxy.getInstance()
+                        .execPlanFragmentsAsync(brpcAddr, paramsList, 
twoPhaseExecution);
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with 
error code,
                 // so that the following logic will cancel the fragment.
-                return new Future<InternalService.PExecPlanFragmentResult>() {
-                    @Override
-                    public boolean cancel(boolean mayInterruptIfRunning) {
-                        return false;
-                    }
+                return futureWithException(e);
+            }
+        }
 
-                    @Override
-                    public boolean isCancelled() {
-                        return false;
-                    }
+        public Future<InternalService.PExecPlanFragmentResult> 
execPlanFragmentStartAsync() throws TException {
+            try {
+                PExecPlanFragmentStartRequest.Builder builder = 
PExecPlanFragmentStartRequest.newBuilder();
+                PUniqueId qid = 
PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
+                builder.setQueryId(qid);
+                return 
BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, 
builder.build());
+            } catch (RpcException e) {
+                // DO NOT throw exception here, return a complete future with 
error code,
+                // so that the following logic will cancel the fragment.
+                return futureWithException(e);
+            }
+        }
 
-                    @Override
-                    public boolean isDone() {
-                        return true;
-                    }
+        @NotNull
+        private Future<PExecPlanFragmentResult> 
futureWithException(RpcException e) {
+            return new Future<PExecPlanFragmentResult>() {
+                @Override
+                public boolean cancel(boolean mayInterruptIfRunning) {
+                    return false;
+                }
 
-                    @Override
-                    public InternalService.PExecPlanFragmentResult get() {
-                        InternalService.PExecPlanFragmentResult result = 
InternalService.PExecPlanFragmentResult
-                                .newBuilder()
-                                .setStatus(Types.PStatus.newBuilder()
-                                        .addErrorMsgs(e.getMessage())
-                                        
.setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
-                                        .build())
-                                .build();
-                        return result;
-                    }
+                @Override
+                public boolean isCancelled() {
+                    return false;
+                }
 
-                    @Override
-                    public InternalService.PExecPlanFragmentResult get(long 
timeout, TimeUnit unit) {
-                        return get();
-                    }
-                };
-            }
-        }
+                @Override
+                public boolean isDone() {
+                    return true;
+                }
 
-        public FragmentInstanceInfo buildFragmentInstanceInfo() {
-            return new QueryStatisticsItem.FragmentInstanceInfo.Builder()
-                    
.instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address)
-                    .build();
-        }
+                @Override
+                public PExecPlanFragmentResult get() {
+                    PExecPlanFragmentResult result = 
PExecPlanFragmentResult.newBuilder().setStatus(
+                            
Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
+                                    
.setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
+                    return result;
+                }
 
-        private TUniqueId fragmentInstanceId() {
-            return this.rpcParams.params.getFragmentInstanceId();
+                @Override
+                public PExecPlanFragmentResult get(long timeout, TimeUnit 
unit) {
+                    return get();
+                }
+            };
         }
     }
 
@@ -2288,5 +2393,3 @@ public class Coordinator {
     }
 }
 
-
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index ea0cb6cfda..41689a9b6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeParams;
@@ -85,8 +86,10 @@ public class InsertStreamTxnExecutor {
         txnEntry.setBackend(backend);
         TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
         try {
-            Future<InternalService.PExecPlanFragmentResult> future = 
BackendServiceProxy.getInstance().execPlanFragmentAsync(
-                    address, tRequest);
+            TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
+            paramsList.addToParamsList(tRequest);
+            Future<InternalService.PExecPlanFragmentResult> future =
+                    
BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, 
false);
             InternalService.PExecPlanFragmentResult result = future.get(5, 
TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 7978ebc2be..d713ed8a93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -48,11 +48,11 @@ public class ResultReceiver {
     private Long backendId;
     private Thread currentThread;
 
-    public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress 
address, int timeoutMs) {
+    public ResultReceiver(TUniqueId tid, Long backendId, TNetworkAddress 
address, long timeoutTs) {
         this.finstId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
-        this.timeoutTs = System.currentTimeMillis() + timeoutMs;
+        this.timeoutTs = timeoutTs;
     }
 
     public RowBatch getNext(Status status) throws TException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 73309047d7..82bd1508bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -44,8 +44,7 @@ public class BackendServiceClient {
         this.address = address;
         channel = NettyChannelBuilder.forAddress(address.getHostname(), 
address.getPort())
                 .flowControlWindow(Config.grpc_max_message_size_bytes)
-                .maxInboundMessageSize(Config.grpc_max_message_size_bytes)
-                .enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
+                
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
                 .usePlaintext().build();
         stub = PBackendServiceGrpc.newFutureStub(channel);
         blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
@@ -56,6 +55,16 @@ public class BackendServiceClient {
         return stub.execPlanFragment(request);
     }
 
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentPrepareAsync(
+            InternalService.PExecPlanFragmentRequest request) {
+        return stub.execPlanFragmentPrepare(request);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(
+            InternalService.PExecPlanFragmentStartRequest request) {
+        return stub.execPlanFragmentStart(request);
+    }
+
     public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
             InternalService.PCancelPlanFragmentRequest request) {
         return stub.cancelPlanFragment(request);
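
Both new stub methods resolve to the same PExecPlanFragmentResult as the original RPC, so existing status handling can be shared across the exec, prepare, and start phases. A small sketch, assuming a `future` obtained from any of the three and an RPC timeout in milliseconds:

    // Sketch only: identical result handling for exec/prepare/start futures.
    InternalService.PExecPlanFragmentResult res = future.get(timeoutMs, TimeUnit.MILLISECONDS);
    TStatusCode code = TStatusCode.findByValue(res.getStatus().getStatusCode());
    if (code != TStatusCode.OK) {
        // surface the BE error and cancel the query's other fragments
    }
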
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index cbf43cac1e..95148aa8e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -19,8 +19,9 @@ package org.apache.doris.rpc;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.Types;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TFoldConstantParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
@@ -93,43 +94,58 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
-            TNetworkAddress address, TExecPlanFragmentParams tRequest)
-            throws TException, RpcException {
-        InternalService.PExecPlanFragmentRequest.Builder builder = InternalService.PExecPlanFragmentRequest.newBuilder();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            TExecPlanFragmentParamsList paramsList, boolean twoPhaseExecution) throws TException, RpcException {
+        InternalService.PExecPlanFragmentRequest.Builder builder =
+                InternalService.PExecPlanFragmentRequest.newBuilder();
         if (Config.use_compact_thrift_rpc) {
-            builder.setRequest(ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(tRequest)));
+            builder.setRequest(
+                    ByteString.copyFrom(new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList)));
             builder.setCompact(true);
         } else {
-            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(tRequest))).build();
+            builder.setRequest(ByteString.copyFrom(new TSerializer().serialize(paramsList))).build();
             builder.setCompact(false);
         }
+        // VERSION 2 means we send TExecPlanFragmentParamsList, not single TExecPlanFragmentParams
+        builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2);
 
         final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
         try {
             final BackendServiceClient client = getProxy(address);
-            return client.execPlanFragmentAsync(pRequest);
+            if (twoPhaseExecution) {
+                return client.execPlanFragmentPrepareAsync(pRequest);
+            } else {
+                return client.execPlanFragmentAsync(pRequest);
+            }
         } catch (Throwable e) {
-            LOG.warn("Execute plan fragment catch a exception, address={}:{}",
-                    address.getHostname(), address.getPort(), e);
+            LOG.warn("Execute plan fragment catch a exception, address={}:{}", 
address.getHostname(), address.getPort(),
+                    e);
             throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
-    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(
-            TNetworkAddress address, TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason)
-            throws RpcException {
-        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
-                .newBuilder()
-                .setFinstId(
-                        Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
-                .setCancelReason(cancelReason).build();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(TNetworkAddress address,
+            PExecPlanFragmentStartRequest request) throws TException, RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.execPlanFragmentStartAsync(request);
+        } catch (Exception e) {
+            throw new RpcException(address.hostname, e.getMessage(), e);
+        }
+    }
+
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address,
+            TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest =
+                InternalService.PCancelPlanFragmentRequest.newBuilder()
+                        .setFinstId(Types.PUniqueId.newBuilder().setHi(finstId.hi).setLo(finstId.lo).build())
+                        .setCancelReason(cancelReason).build();
+                        .setCancelReason(cancelReason).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.cancelPlanFragmentAsync(pRequest);
         } catch (Throwable e) {
-            LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
-                    address.getHostname(), address.getPort(), e);
+            LOG.warn("Cancel plan fragment catch a exception, address={}:{}", 
address.getHostname(), address.getPort(),
+                    e);
             throw new RpcException(address.hostname, e.getMessage());
         }
     }
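
Putting the proxy changes together, the intended two-phase flow for a query with many fragments on one BE is: one prepare RPC carrying the whole params list, then one start RPC keyed by query id. The following is a sketch of that call order, not the actual Coordinator code (which is outside this excerpt); `paramsList` and `queryId` are illustrative names:

    // Phase 1: ship all fragment params for this BE; the BE holds them until started.
    Future<InternalService.PExecPlanFragmentResult> prepareFuture =
            BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, true);
    // ... after every BE has acknowledged the prepare phase ...

    // Phase 2: trigger execution of all prepared fragments belonging to this query.
    InternalService.PExecPlanFragmentStartRequest startRequest =
            InternalService.PExecPlanFragmentStartRequest.newBuilder()
                    .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build())
                    .build();
    Future<InternalService.PExecPlanFragmentResult> startFuture =
            BackendServiceProxy.getInstance().execPlanFragmentStartAsync(address, startRequest);
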
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
index d65802633b..45c765f01b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
@@ -28,6 +28,11 @@ public class RpcException extends Exception {
         this.host = host;
     }
 
+    public RpcException(String host, String message, Exception e) {
+        super(message, e);
+        this.host = host;
+    }
+
     @Override
     public String getMessage() {
         if (Strings.isNullOrEmpty(host)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 3b1b4eddfc..4a7f4cf3c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.StreamLoadTask;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
 import org.apache.doris.thrift.TStorageMedium;
@@ -253,7 +254,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
@@ -261,7 +263,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = commitFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any,
+                        (List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
 
@@ -324,7 +327,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
@@ -390,7 +394,8 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                 result = 105L;
 
-                backendServiceProxy.execPlanFragmentAsync((TNetworkAddress) any, (TExecPlanFragmentParams) any);
+                backendServiceProxy.execPlanFragmentsAsync((TNetworkAddress) any, (TExecPlanFragmentParamsList) any,
+                        anyBoolean);
                 minTimes = 0;
                 result = execFuture;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index bc3e7928fd..902de437b8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -337,7 +337,8 @@ public class MockedBackendFactory {
         }
 
         @Override
-        public void execPlanFragment(InternalService.PExecPlanFragmentRequest request, StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+        public void execPlanFragment(InternalService.PExecPlanFragmentRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
             System.out.println("get exec_plan_fragment request");
             responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
                     .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
@@ -345,7 +346,26 @@ public class MockedBackendFactory {
         }
 
         @Override
-        public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request, StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
+        public void execPlanFragmentPrepare(InternalService.PExecPlanFragmentRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+            System.out.println("get exec_plan_fragment_prepare request");
+            responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+                    .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void execPlanFragmentStart(InternalService.PExecPlanFragmentStartRequest request,
+                StreamObserver<InternalService.PExecPlanFragmentResult> responseObserver) {
+            System.out.println("get exec_plan_fragment_start request");
+            responseObserver.onNext(InternalService.PExecPlanFragmentResult.newBuilder()
+                    .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
+            responseObserver.onCompleted();
+        }
+
+        @Override
+        public void cancelPlanFragment(InternalService.PCancelPlanFragmentRequest request,
+                StreamObserver<InternalService.PCancelPlanFragmentResult> responseObserver) {
             System.out.println("get cancel_plan_fragment request");
             responseObserver.onNext(InternalService.PCancelPlanFragmentResult.newBuilder()
                     .setStatus(Types.PStatus.newBuilder().setStatusCode(0)).build());
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 4d73bcb701..45662c10cd 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -128,11 +128,21 @@ message PTabletWriterCancelRequest {
 message PTabletWriterCancelResult {
 };
 
+enum PFragmentRequestVersion {
+    VERSION_1 = 1;  // only one TExecPlanFragmentParams in request
+    VERSION_2 = 2;  // multi TExecPlanFragmentParams in request
+};
+
 message PExecPlanFragmentRequest {
     optional bytes request = 1;
     optional bool compact = 2;
+    optional PFragmentRequestVersion version = 3 [default = VERSION_2];
 };
 
+message PExecPlanFragmentStartRequest {
+    optional PUniqueId query_id = 1;
+}
+
 message PExecPlanFragmentResult {
     required PStatus status = 1;
 };
@@ -423,7 +433,11 @@ message PResetRPCChannelResponse {
 
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
+    // If #fragments of a query is < 3, use exec_plan_fragment directly.
+    // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
+    rpc exec_plan_fragment_prepare(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
+    rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns (PExecPlanFragmentResult);
     rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult);
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
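
On the wire, the new version field is what lets an upgraded BE distinguish a VERSION_2 payload (a serialized TExecPlanFragmentParamsList) from the legacy single-params payload. A sketch of how the FE assembles such a request, mirroring the BackendServiceProxy change above (compact-serialization branch shown; `paramsList` is assumed to be already built):

    // Sketch only: VERSION_2 marks the request body as TExecPlanFragmentParamsList.
    InternalService.PExecPlanFragmentRequest pRequest =
            InternalService.PExecPlanFragmentRequest.newBuilder()
                    .setRequest(ByteString.copyFrom(
                            new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList)))
                    .setCompact(true)
                    .setVersion(InternalService.PFragmentRequestVersion.VERSION_2)
                    .build();
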
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 19ae35a64e..aa1576b95a 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -277,8 +277,18 @@ struct TTxnParams {
   10: optional double max_filter_ratio
 }
 
-// ExecPlanFragment
+// Definition of global dict, global dict is used to accelerate query performance of low cardinality data
+struct TColumnDict {
+  1: optional Types.TPrimitiveType type
+  2: list<string> str_dict  // map one string to a integer, using offset as id
+}
+
+struct TGlobalDict {
+  1: optional map<i32, TColumnDict> dicts,  // map dict_id to column dict
+  2: optional map<i32, i32> slot_dicts // map from slot id to column dict id, because 2 or more column may share the dict
+}
 
+// ExecPlanFragment
 struct TExecPlanFragmentParams {
   1: required PaloInternalServiceVersion protocol_version
 
@@ -330,9 +340,19 @@ struct TExecPlanFragmentParams {
 
   // If true, all @Common components is unset and should be got from BE's cache
   // If this field is unset or it set to false, all @Common components is set.
-  16: optional bool is_simplified_param
+  16: optional bool is_simplified_param = false;
   17: optional TTxnParams txn_conf
   18: optional i64 backend_id
+  19: optional TGlobalDict global_dict  // scan node could use the global dict to encode the string value to an integer
+
+  // If it is true, after this fragment is prepared on the BE side,
+  // it will wait for the FE to send the "start execution" command before it is actually executed.
+  // Otherwise, the fragment will start executing directly on the BE side.
+  20: optional bool need_wait_execution_trigger = false;
+}
+
+struct TExecPlanFragmentParamsList {
+    1: optional list<TExecPlanFragmentParams> paramsList;
 }
 
 struct TExecPlanFragmentResult {
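
When the coordinator opts into the two-phase path, each fragment instance in the list is expected to carry need_wait_execution_trigger = true so the BE holds it until the start RPC arrives. A hedged Java sketch of building such a list; the setter name assumes camel-case thrift code generation and `instanceParams` is a hypothetical per-BE collection, neither verified against the real Coordinator code:

    // Illustrative only: mark every instance to wait for the explicit start RPC.
    // setNeedWaitExecutionTrigger is the assumed thrift-generated setter name.
    TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
    for (TExecPlanFragmentParams params : instanceParams) {
        params.setNeedWaitExecutionTrigger(true);
        paramsList.addToParamsList(params);
    }
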


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
