This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 513784188ad [PipelineX](improvement) Prepare tasks in parallel (#40844) (#40874)
513784188ad is described below

commit 513784188adf88879dc4c23602527fbbd5fb8e96
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Sep 18 10:02:58 2024 +0800

    [PipelineX](improvement) Prepare tasks in parallel (#40844) (#40874)
    
    pick #40844
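    
    This backport lets PipelineFragmentContext::prepare() fan per-instance
    preparation out to FragmentMgr's thread pool once the instance count
    exceeds the new parallel_prepare_threshold session variable. Counters
    that several prepare threads now touch concurrently (_total_tasks,
    _num_tasks_created) become atomic, per-task runtime states move to a
    per-pipeline, per-instance layout, and registering timers on a shared
    runtime filter takes a lock.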
---
 be/src/common/config.cpp                           |  2 +-
 be/src/exprs/runtime_filter.cpp                    |  1 +
 be/src/pipeline/pipeline.h                         |  3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 98 ++++++++++++++++------
 be/src/pipeline/pipeline_fragment_context.h        |  9 +-
 be/src/runtime/fragment_mgr.cpp                    |  3 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  8 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |  9 +-
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 9 files changed, 101 insertions(+), 33 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 75f4bb7c4ea..d6154c491d5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -511,7 +511,7 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 DEFINE_mBool(enable_bthread_transmit_block, "true");
 
 // The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
 // Therefore, it is necessary to limit the maximum number of
 // such data when using stream load to prevent excessive memory consumption.
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8d7281e9426..456637fb2aa 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1235,6 +1235,7 @@ void IRuntimeFilter::signal() {
 }
 
 void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+    std::unique_lock lock(_inner_mutex);
     _filter_timer.push_back(timer);
 }
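
The hunk above only adds a lock. For context, a minimal standalone sketch
(not Doris code; RuntimeFilterTimer is a stand-in type) of why it is needed:
with instances prepared on a thread pool, several threads can register timers
on the same shared filter at once, so access to the vector must be serialized:

    #include <memory>
    #include <mutex>
    #include <vector>

    struct RuntimeFilterTimer {};  // stand-in for pipeline::RuntimeFilterTimer

    class FilterTimerRegistry {
    public:
        void set_filter_timer(std::shared_ptr<RuntimeFilterTimer> timer) {
            std::unique_lock lock(_inner_mutex);  // serialize concurrent registrations
            _filter_timer.push_back(std::move(timer));
        }

    private:
        std::mutex _inner_mutex;
        std::vector<std::shared_ptr<RuntimeFilterTimer>> _filter_timer;
    };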
 
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index e7584dccd4a..ae20c760110 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -105,7 +105,6 @@ public:
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
     void incr_created_tasks() { _num_tasks_created++; }
-    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
         for (auto& op : operatorXs) {
@@ -160,7 +159,7 @@ private:
     // How many tasks should be created ?
     int _num_tasks = 1;
     // How many tasks are already created?
-    int _num_tasks_created = 0;
+    std::atomic<int> _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
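
For reference, a minimal sketch (not Doris code) of why the counter becomes
std::atomic<int>: with parallel prepare, incr_created_tasks() can be called
from several thread-pool workers at once, and a plain int increment would be
a data race:

    #include <atomic>
    #include <cassert>
    #include <thread>
    #include <vector>

    int main() {
        std::atomic<int> num_tasks_created{0};
        std::vector<std::thread> workers;
        for (int i = 0; i < 8; i++) {
            // Each worker plays the role of one parallel prepare call.
            workers.emplace_back([&] { num_tasks_created++; });
        }
        for (auto& w : workers) {
            w.join();
        }
        assert(num_tasks_created.load() == 8);  // no increment is lost
        return 0;
    }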
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 4ca3e20ed24..19eb023e27e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -137,8 +137,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
         }
     }
     _tasks.clear();
-    for (auto& runtime_state : _task_runtime_states) {
-        runtime_state.reset();
+    for (auto& runtime_states : _task_runtime_states) {
+        for (auto& runtime_state : runtime_states) {
+            runtime_state.reset();
+        }
     }
     _pipelines.clear();
     _sink.reset();
@@ -229,7 +231,8 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+                                        ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
@@ -346,7 +349,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     {
         SCOPED_TIMER(_build_tasks_timer);
         // 5. Build pipeline tasks and initialize local state.
-        RETURN_IF_ERROR(_build_pipeline_tasks(request));
+        RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
     }
 
     _init_next_report_time();
@@ -355,17 +358,23 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     return Status::OK();
 }
 
-Status PipelineFragmentContext::_build_pipeline_tasks(
-        const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+                                                      ThreadPool* thread_pool) {
     _total_tasks = 0;
-    int target_size = request.local_params.size();
+    const auto target_size = request.local_params.size();
     _tasks.resize(target_size);
+    _fragment_instance_ids.resize(target_size);
+    _runtime_filter_states.resize(target_size);
+    _task_runtime_states.resize(_pipelines.size());
+    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+        _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+    }
     auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
 
-    for (size_t i = 0; i < target_size; i++) {
+    auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
         const auto& local_params = request.local_params[i];
         auto fragment_instance_id = local_params.fragment_instance_id;
-        _fragment_instance_ids.push_back(fragment_instance_id);
+        _fragment_instance_ids[i] = fragment_instance_id;
         std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
         auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
             runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -424,7 +433,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
-        _runtime_filter_states.push_back(std::move(filterparams));
+        _runtime_filter_states[i] = std::move(filterparams);
         std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
         auto get_local_exchange_state = [&](PipelinePtr pipeline)
                -> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -447,13 +456,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto& pipeline = _pipelines[pip_idx];
-            if (pipeline->need_to_create_task()) {
-                // build task runtime state
-                _task_runtime_states.push_back(RuntimeState::create_unique(
+            if (pipeline->num_tasks() > 1 || i == 0) {
+                DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+                        << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+                        << pipeline->debug_string();
+                _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
                        this, local_params.fragment_instance_id, request.query_id,
                        request.fragment_id, request.query_options, _query_ctx->query_globals,
-                        _exec_env, _query_ctx.get()));
-                auto& task_runtime_state = _task_runtime_states.back();
+                        _exec_env, _query_ctx.get());
+                auto& task_runtime_state = _task_runtime_states[pip_idx][i];
                 init_runtime_state(task_runtime_state);
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
@@ -527,6 +538,39 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
             std::lock_guard<std::mutex> l(_state_map_lock);
             _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
         }
+        return Status::OK();
+    };
+    if (target_size > 1 &&
+        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+        std::vector<Status> prepare_status(target_size);
+        std::mutex m;
+        std::condition_variable cv;
+        int prepare_done = 0;
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+                SCOPED_ATTACH_TASK(_query_ctx.get());
+                prepare_status[i] = pre_and_submit(i, this);
+                std::unique_lock<std::mutex> lock(m);
+                prepare_done++;
+                if (prepare_done == target_size) {
+                    cv.notify_one();
+                }
+            }));
+        }
+        std::unique_lock<std::mutex> lock(m);
+        if (prepare_done != target_size) {
+            cv.wait(lock);
+            for (size_t i = 0; i < target_size; i++) {
+                if (!prepare_status[i].ok()) {
+                    return prepare_status[i];
+                }
+            }
+        }
+    } else {
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(pre_and_submit(i, this));
+        }
     }
     _pipeline_parent_map.clear();
     _dag.clear();
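
The hunk above is the heart of the change: each instance's preparation is
submitted to the thread pool via submit_func, a shared counter tracks
completions, and the coordinator waits on a condition variable before
collecting per-instance statuses. A minimal standalone sketch of this
fan-out/fan-in pattern (not Doris code; plain std::thread stands in for the
ThreadPool, and Status is a stand-in type):

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <string>
    #include <thread>
    #include <vector>

    struct Status {  // stand-in for doris::Status
        bool ok = true;
        std::string msg;
    };

    Status prepare_all(int target_size, const std::function<Status(int)>& prepare_one) {
        std::vector<Status> prepare_status(target_size);  // one slot per instance
        std::mutex m;
        std::condition_variable cv;
        int prepare_done = 0;

        std::vector<std::thread> workers;
        workers.reserve(target_size);
        for (int i = 0; i < target_size; i++) {
            workers.emplace_back([&, i] {
                // Each worker writes only its own slot, so no lock is needed here.
                prepare_status[i] = prepare_one(i);
                std::lock_guard<std::mutex> lock(m);
                if (++prepare_done == target_size) {
                    cv.notify_one();  // last finisher wakes the coordinator
                }
            });
        }

        {
            std::unique_lock<std::mutex> lock(m);
            // The predicate form also covers the case where every worker
            // finishes before the coordinator starts waiting, as well as
            // spurious wakeups.
            cv.wait(lock, [&] { return prepare_done == target_size; });
        }
        for (auto& w : workers) {
            w.join();
        }
        for (const auto& st : prepare_status) {
            if (!st.ok) {
                return st;  // surface the first failure
            }
        }
        return Status {};
    }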
@@ -1683,8 +1727,12 @@ Status PipelineFragmentContext::send_report(bool done) {
 
     std::vector<RuntimeState*> runtime_states;
 
-    for (auto& task_state : _task_runtime_states) {
-        runtime_states.push_back(task_state.get());
+    for (auto& task_states : _task_runtime_states) {
+        for (auto& task_state : task_states) {
+            if (task_state) {
+                runtime_states.push_back(task_state.get());
+            }
+        }
     }
 
     ReportStatusRequest req {exec_status,
@@ -1755,15 +1803,17 @@ PipelineFragmentContext::collect_realtime_load_channel_profile_x() const {
         return nullptr;
     }
 
-    for (auto& runtime_state : _task_runtime_states) {
-        if (runtime_state->runtime_profile() == nullptr) {
-            continue;
-        }
+    for (auto& runtime_states : _task_runtime_states) {
+        for (auto& runtime_state : runtime_states) {
+            if (runtime_state->runtime_profile() == nullptr) {
+                continue;
+            }
 
-        auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
+            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

-        runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
-        this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+            runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
+            this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+        }
     }
 
     auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 06c88267441..e0e4c12ef0d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -88,7 +88,7 @@ public:
     // should be protected by lock?
     [[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }
 
-    Status prepare(const doris::TPipelineFragmentParams& request);
+    Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);
 
     Status submit();
 
@@ -187,7 +187,8 @@ private:
 
     bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }
 
-    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
+    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+                                 ThreadPool* thread_pool);
     void _close_fragment_instance();
     void _init_next_report_time();
 
@@ -206,7 +207,7 @@ private:
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-    int _total_tasks = 0;
+    std::atomic<int> _total_tasks = 0;
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
     bool _is_report_success = false;
@@ -303,7 +304,7 @@ private:
 
     std::vector<TUniqueId> _fragment_instance_ids;
     // Local runtime states for each task
-    std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+    std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3d4f072ed85..5bb01dc9ba5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -820,7 +820,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     {
         SCOPED_RAW_TIMER(&duration_ns);
         Status prepare_st = Status::OK();
-        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
+                                         prepare_st);
         if (!prepare_st.ok()) {
             query_ctx->cancel(prepare_st, params.fragment_id);
             query_ctx->set_execution_dependency_ready();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index dd221c6aaa3..f18467fbad9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -92,6 +92,14 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
 
 template <typename Parent>
 Status Channel<Parent>::open(RuntimeState* state) {
+    if (_is_local) {
+        auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                _fragment_instance_id, _dest_node_id, &_local_recvr);
+        if (!st.ok()) {
+            // Recvr not found. Maybe downstream task is finished already.
+            LOG(INFO) << "Recvr is not found : " << st.to_string();
+        }
+    }
     _be_number = state->be_number();
     _brpc_request = std::make_shared<PTransmitDataParams>();
     // initialize brpc request
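
The hunk above moves the local receiver lookup into Channel::open() and, per
the comment in the code, tolerates a missing receiver: the downstream task may
already have finished, so a failed lookup is logged rather than returned as an
error.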
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7e70576145c..3eba30ebf41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -290,6 +290,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold";
 
+    public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold";
+
     public static final String ENABLE_PROJECTION = "enable_projection";
 
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_query";
@@ -1010,7 +1012,7 @@ public class SessionVariable implements Serializable, Writable {
 
     @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true,
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
-    private long parallelScanMinRowsPerScanner = 16384; // 16K
+    private long parallelScanMinRowsPerScanner = 2097152; // 2M
 
     @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
             varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1053,6 +1055,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
     public double autoBroadcastJoinThreshold = 0.8;
 
+    @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true)
+    public int parallelPrepareThreshold = 32;
+
     @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
     private boolean enableJoinReorderBasedCost = false;
 
@@ -2109,6 +2114,7 @@ public class SessionVariable implements Serializable, Writable {
         Random random = new SecureRandom();
         this.parallelExecInstanceNum = random.nextInt(8) + 1;
         this.parallelPipelineTaskNum = random.nextInt(8);
+        this.parallelPrepareThreshold = random.nextInt(32) + 1;
         this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
         // This will cause be dead loop, disable it first
@@ -3529,6 +3535,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setNumScannerThreads(numScannerThreads);
         tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
         tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+        tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index b3000b66ea9..d3558b1f0f3 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -332,6 +332,7 @@ struct TQueryOptions {
 
   125: optional bool enable_segment_cache = true;
 
+  132: optional i32 parallel_prepare_threshold = 0;
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.

