This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 40867742116 [pick](branch-3.0) pick #42216  #42421 (#42542)
40867742116 is described below

commit 408677421165ab66e7976790789eb257de57e490
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon Oct 28 19:23:36 2024 +0800

    [pick](branch-3.0) pick #42216  #42421 (#42542)
    
    pick #42216  #42421
---
 .../pipeline/exec/memory_scratch_sink_operator.cpp |  2 +-
 be/src/runtime/fragment_mgr.cpp                    | 44 +++++++++++++---------
 be/src/runtime/fragment_mgr.h                      |  1 +
 be/src/runtime/record_batch_queue.cpp              | 10 ++++-
 be/src/runtime/result_queue_mgr.cpp                |  6 ++-
 be/src/service/backend_service.cpp                 | 14 ++++++-
 6 files changed, 52 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp 
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 69e30791c13..131f3caf42c 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, 
arrow::default_memory_pool(),
                                            &result, _timezone_obj));
     local_state._queue->blocking_put(result);
-    if (local_state._queue->size() < 10) {
+    if (local_state._queue->size() > config::max_memory_sink_batch_count) {
         local_state._queue_dependency->block();
     }
     return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7ba73442c90..e683b84e2b4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report(
 // including the final status when execution finishes.
 void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     DCHECK(req.status.ok() || req.done); // if !status.ok() => done
+    if (req.coord_addr.hostname == "external") {
+        // External queries (Flink/Spark reading tablets directly) do not need to report to FE.
+        return;
+    }
     Status exec_status = req.status;
     Status coord_status;
     FrontendServiceConnection coord(_exec_env->frontend_client_cache(), 
req.coord_addr,
@@ -836,31 +840,33 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         query_ctx->set_merge_controller_handler(handler);
     }
 
-    for (const auto& local_param : params.local_params) {
-        const TUniqueId& fragment_instance_id = 
local_param.fragment_instance_id;
+    {
+        // A (query_id, fragment_id) pair runs on only one BE; hold _lock across the duplicate check and the insert into _pipeline_map below so it is registered at most once.
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
-        if (iter != _pipeline_map.end()) {
-            return Status::InternalError("exec_plan_fragment input duplicated 
fragment_id({})",
-                                         params.fragment_id);
+        for (const auto& local_param : params.local_params) {
+            const TUniqueId& fragment_instance_id = 
local_param.fragment_instance_id;
+            auto iter = _pipeline_map.find({params.query_id, 
params.fragment_id});
+            if (iter != _pipeline_map.end()) {
+                return Status::InternalError(
+                        "exec_plan_fragment query_id({}) input duplicated 
fragment_id({})",
+                        print_id(params.query_id), params.fragment_id);
+            }
+            query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
         }
-        query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
-    }
 
-    if (!params.__isset.need_wait_execution_trigger || 
!params.need_wait_execution_trigger) {
-        query_ctx->set_ready_to_execute_only();
-    }
-
-    int64 now = duration_cast<std::chrono::milliseconds>(
-                        std::chrono::system_clock::now().time_since_epoch())
-                        .count();
-    {
+        int64 now = duration_cast<std::chrono::milliseconds>(
+                            
std::chrono::system_clock::now().time_since_epoch())
+                            .count();
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
-        std::lock_guard<std::mutex> lock(_lock);
         // TODO: simplify this mapping
         _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
+
+    if (!params.__isset.need_wait_execution_trigger || 
!params.need_wait_execution_trigger) {
+        query_ctx->set_ready_to_execute_only();
+    }
+
     query_ctx->set_pipeline_context(params.fragment_id, context);
 
     RETURN_IF_ERROR(context->submit());
@@ -1070,6 +1076,7 @@ void FragmentMgr::debug(std::stringstream& ss) {}
  */
 Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
                                                 const TQueryPlanInfo& 
t_query_plan_info,
+                                                const TUniqueId& query_id,
                                                 const TUniqueId& 
fragment_instance_id,
                                                 std::vector<TScanColumnDesc>* 
selected_columns) {
     // set up desc tbl
@@ -1110,8 +1117,9 @@ Status FragmentMgr::exec_external_plan_fragment(const 
TScanOpenParams& params,
 
     // assign the param used for executing of PlanFragment-self
     TPipelineInstanceParams fragment_exec_params;
-    exec_fragment_params.query_id = t_query_plan_info.query_id;
+    exec_fragment_params.query_id = query_id;
     fragment_exec_params.fragment_instance_id = fragment_instance_id;
+    exec_fragment_params.coord.hostname = "external";
     std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> 
per_node_scan_ranges;
     std::vector<TScanRangeParams> scan_ranges;
     std::vector<int64_t> tablet_ids = params.tablet_ids;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 41b63db0b23..20b2fd8cdc2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -112,6 +112,7 @@ public:
     // execute external query, all query info are packed in TScanOpenParams
     Status exec_external_plan_fragment(const TScanOpenParams& params,
                                        const TQueryPlanInfo& t_query_plan_info,
+                                       const TUniqueId& query_id,
                                        const TUniqueId& fragment_instance_id,
                                        std::vector<TScanColumnDesc>* 
selected_columns);
 
diff --git a/be/src/runtime/record_batch_queue.cpp 
b/be/src/runtime/record_batch_queue.cpp
index 83982688880..25db550db3a 100644
--- a/be/src/runtime/record_batch_queue.cpp
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -23,10 +23,16 @@
 namespace doris {
 
 bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* 
result) {
-    auto res = _queue.blocking_get(result);
-    if (_dep && size() <= 10) {
+    if (_dep && size() <= config::max_memory_sink_batch_count) {
         _dep->set_ready();
     }
+    // Before every get, the sink task's dependency is set ready (when the queue size is at or below
+    // config::max_memory_sink_batch_count), so if the sink puts faster than the result fetcher gets,
+    // the queue size stays around config::max_memory_sink_batch_count.
+    // The sink dependency must be set ready BEFORE getting from the queue. Otherwise, if the queue is
+    // drained after the sink task's put but before the sink blocks its dependency, the get would block
+    // and would never set the sink dependency ready again.
+    auto res = _queue.blocking_get(result);
     return res;
 }
 
diff --git a/be/src/runtime/result_queue_mgr.cpp 
b/be/src/runtime/result_queue_mgr.cpp
index 8090a3e6ee4..8a6e5b10935 100644
--- a/be/src/runtime/result_queue_mgr.cpp
+++ b/be/src/runtime/result_queue_mgr.cpp
@@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& 
fragment_instance_id,
     if (iter != _fragment_queue_map.end()) {
         *queue = iter->second;
     } else {
-        // the blocking queue size = 20 (default), in this way, one queue have 
20 * 1024 rows at most
-        BlockQueueSharedPtr tmp(new 
RecordBatchQueue(config::max_memory_sink_batch_count));
+        // max_elements is effectively never reached: once the queue size exceeds max_memory_sink_batch_count,
+        // MemoryScratchSink blocks its queue dependency, so one queue holds at most about 20 * 1024 rows (default config).
+        // Blocking is achieved through the MemoryScratchSink queue dependency rather than the BlockingQueue capacity.
+        BlockQueueSharedPtr tmp(new 
RecordBatchQueue(config::max_memory_sink_batch_count * 2));
         _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
         *queue = tmp;
     }
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index d56aa49b19b..e6fdfaa8765 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& 
t_status,
 void BaseBackendService::open_scanner(TScanOpenResult& result_, const 
TScanOpenParams& params) {
     TStatus t_status;
     TUniqueId fragment_instance_id = generate_uuid();
+    // Generate a random query_id to replace t_query_plan_info.query_id.
+    // An external query does not report anything to FE, so its query_id can safely be changed.
+    // Otherwise, multiple independent, concurrently opened tablet scanners would share the same query_id,
+    // and when one of them finished, the others would be canceled through FragmentMgr.cancel(query_id).
+    TUniqueId query_id = generate_uuid();
     std::shared_ptr<ScanContext> p_context;
     
static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context));
     p_context->fragment_instance_id = fragment_instance_id;
@@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& 
result_, const TScanOpenP
                 << " deserialize error, should not be modified after returned 
Doris FE processed";
             exec_st = Status::InvalidArgument(msg.str());
         }
-        p_context->query_id = t_query_plan_info.query_id;
+        p_context->query_id = query_id;
     }
     std::vector<TScanColumnDesc> selected_columns;
     if (exec_st.ok()) {
         // start the scan procedure
+        LOG(INFO) << fmt::format(
+                "exec external scanner, old_query_id = {}, new_query_id = {}, 
fragment_instance_id "
+                "= {}",
+                print_id(t_query_plan_info.query_id), print_id(query_id),
+                print_id(fragment_instance_id));
         exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment(
-                params, t_query_plan_info, fragment_instance_id, 
&selected_columns);
+                params, t_query_plan_info, query_id, fragment_instance_id, 
&selected_columns);
     }
     exec_st.to_thrift(&t_status);
     //return status


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to