This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 40867742116 [pick](branch-3.0) pick #42216 #42421 (#42542) 40867742116 is described below commit 408677421165ab66e7976790789eb257de57e490 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Oct 28 19:23:36 2024 +0800 [pick](branch-3.0) pick #42216 #42421 (#42542) pick #42216 #42421 --- .../pipeline/exec/memory_scratch_sink_operator.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 44 +++++++++++++--------- be/src/runtime/fragment_mgr.h | 1 + be/src/runtime/record_batch_queue.cpp | 10 ++++- be/src/runtime/result_queue_mgr.cpp | 6 ++- be/src/service/backend_service.cpp | 14 ++++++- 6 files changed, 52 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 69e30791c13..131f3caf42c 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(), &result, _timezone_obj)); local_state._queue->blocking_put(result); - if (local_state._queue->size() < 10) { + if (local_state._queue->size() > config::max_memory_sink_batch_count) { local_state._queue_dependency->block(); } return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7ba73442c90..e683b84e2b4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report( // including the final status when execution finishes. void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.status.ok() || req.done); // if !status.ok() => done + if (req.coord_addr.hostname == "external") { + // External query (flink/spark read tablets) not need to report to FE. + return; + } Status exec_status = req.status; Status coord_status; FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, @@ -836,31 +840,33 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_merge_controller_handler(handler); } - for (const auto& local_param : params.local_params) { - const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + { + // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. std::lock_guard<std::mutex> lock(_lock); - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", - params.fragment_id); + for (const auto& local_param : params.local_params) { + const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError( + "exec_plan_fragment query_id({}) input duplicated fragment_id({})", + print_id(params.query_id), params.fragment_id); + } + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); - } - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { - query_ctx->set_ready_to_execute_only(); - } - - int64 now = duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - { + int64 now = duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); g_fragment_executing_count << 1; g_fragment_last_active_time.set_value(now); - std::lock_guard<std::mutex> lock(_lock); // TODO: simplify this mapping _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); } + + if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { + query_ctx->set_ready_to_execute_only(); + } + query_ctx->set_pipeline_context(params.fragment_id, context); RETURN_IF_ERROR(context->submit()); @@ -1070,6 +1076,7 @@ void FragmentMgr::debug(std::stringstream& ss) {} */ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector<TScanColumnDesc>* selected_columns) { // set up desc tbl @@ -1110,8 +1117,9 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, // assign the param used for executing of PlanFragment-self TPipelineInstanceParams fragment_exec_params; - exec_fragment_params.query_id = t_query_plan_info.query_id; + exec_fragment_params.query_id = query_id; fragment_exec_params.fragment_instance_id = fragment_instance_id; + exec_fragment_params.coord.hostname = "external"; std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges; std::vector<TScanRangeParams> scan_ranges; std::vector<int64_t> tablet_ids = params.tablet_ids; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 41b63db0b23..20b2fd8cdc2 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -112,6 +112,7 @@ public: // execute external query, all query info are packed in TScanOpenParams Status exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector<TScanColumnDesc>* selected_columns); diff --git a/be/src/runtime/record_batch_queue.cpp b/be/src/runtime/record_batch_queue.cpp index 83982688880..25db550db3a 100644 --- a/be/src/runtime/record_batch_queue.cpp +++ b/be/src/runtime/record_batch_queue.cpp @@ -23,10 +23,16 @@ namespace doris { bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) { - auto res = _queue.blocking_get(result); - if (_dep && size() <= 10) { + if (_dep && size() <= config::max_memory_sink_batch_count) { _dep->set_ready(); } + // Before each get queue, will set sink task dependency ready. + // so if the sink task put queue faster than the fetch result get queue, + // the queue size will always be 10. + // be sure to set sink dependency ready before getting queue. + // otherwise, if queue is emptied after sink task put queue and before block dependency, + // get queue will stuck and will never set sink dependency ready. + auto res = _queue.blocking_get(result); return res; } diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp index 8090a3e6ee4..8a6e5b10935 100644 --- a/be/src/runtime/result_queue_mgr.cpp +++ b/be/src/runtime/result_queue_mgr.cpp @@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, if (iter != _fragment_queue_map.end()) { *queue = iter->second; } else { - // the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most - BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count)); + // max_elements will not take effect, because when queue size reaches max_memory_sink_batch_count, + // MemoryScratchSink will block queue dependency, in this way, one queue have 20 * 1024 rows at most. + // use MemoryScratchSink queue dependency instead of BlockingQueue to achieve blocking. + BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2)); _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp)); *queue = tmp; } diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index d56aa49b19b..e6fdfaa8765 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status, void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) { TStatus t_status; TUniqueId fragment_instance_id = generate_uuid(); + // A query_id is randomly generated to replace t_query_plan_info.query_id. + // external query does not need to report anything to FE, so the query_id can be changed. + // Otherwise, multiple independent concurrent open tablet scanners have the same query_id. + // when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id). + TUniqueId query_id = generate_uuid(); std::shared_ptr<ScanContext> p_context; static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context)); p_context->fragment_instance_id = fragment_instance_id; @@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenP << " deserialize error, should not be modified after returned Doris FE processed"; exec_st = Status::InvalidArgument(msg.str()); } - p_context->query_id = t_query_plan_info.query_id; + p_context->query_id = query_id; } std::vector<TScanColumnDesc> selected_columns; if (exec_st.ok()) { // start the scan procedure + LOG(INFO) << fmt::format( + "exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id " + "= {}", + print_id(t_query_plan_info.query_id), print_id(query_id), + print_id(fragment_instance_id)); exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment( - params, t_query_plan_info, fragment_instance_id, &selected_columns); + params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns); } exec_st.to_thrift(&t_status); //return status --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org