This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit f8d1fa2be3532fd26af17a1c9b3fc9200cdcdd10 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Sun Apr 7 23:45:37 2024 +0800 [chore](multi-table-load) add context info in log when using single-stream-multi-table load (#33317) --- be/src/io/fs/multi_table_pipe.cpp | 26 +++++++++++++--------- .../routine_load/routine_load_task_executor.cpp | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 916f8151739..816561cfbc8 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -96,7 +96,7 @@ std::string MultiTablePipe::parse_dst_table(const char* data, size_t size) { Status MultiTablePipe::dispatch(const std::string& table, const char* data, size_t size, AppendFunc cb) { if (size == 0 || strlen(data) == 0) { - LOG(WARNING) << "empty data for table: " << table; + LOG(WARNING) << "empty data for table: " << table << ", ctx: " << _ctx->brief(); return Status::InternalError("empty data"); } KafkaConsumerPipePtr pipe = nullptr; @@ -109,7 +109,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size iter = _unplanned_pipes.find(table); if (iter == _unplanned_pipes.end()) { pipe = std::make_shared<io::KafkaConsumerPipe>(); - LOG(INFO) << "create new unplanned pipe: " << pipe.get(); + LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", ctx: " << _ctx->brief(); _unplanned_pipes.emplace(table, pipe); } else { pipe = iter->second; @@ -121,9 +121,11 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size if (_unplanned_row_cnt >= _row_threshold || _unplanned_pipes.size() >= _wait_tables_threshold) { LOG(INFO) << fmt::format( - "unplanned row cnt={} reach row_threshold={} or wait_plan_table_threshold={}, " - "plan them", - _unplanned_row_cnt, _row_threshold, _wait_tables_threshold); + "unplanned row cnt={} reach row_threshold={} or " + "wait_plan_table_threshold={}, " + "plan them", + _unplanned_row_cnt, _row_threshold, _wait_tables_threshold) + << ", ctx: " << _ctx->brief(); Status st = request_and_exec_plans(); _unplanned_row_cnt = 0; if (!st.ok()) { @@ -206,7 +208,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para _planned_pipes.emplace(pipe.first, pipe.second); } LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}", - _unplanned_pipes.size(), _planned_pipes.size(), params.size()); + _unplanned_pipes.size(), _planned_pipes.size(), params.size()) + << ", ctx: " << _ctx->brief(); _unplanned_pipes.clear(); _inflight_cnt += params.size(); @@ -220,14 +223,16 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para RETURN_IF_ERROR( put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name])); LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id) - << " table=" << plan.table_name; + << " table=" << plan.table_name << ", ctx: " << _ctx->brief(); } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) { auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id); RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name])); - LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name; + LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name + << ", ctx: " << _ctx->brief(); } else { LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or " - "`TPipelineFragmentParams`, will crash"; + "`TPipelineFragmentParams`, will crash" + << ", ctx: " << _ctx->brief(); CHECK(false); } @@ -301,7 +306,8 @@ void MultiTablePipe::_handle_consumer_finished() { LOG(INFO) << "all plan for multi-table load complete. number_total_rows=" << _ctx->number_total_rows << " number_loaded_rows=" << _ctx->number_loaded_rows << " number_filtered_rows=" << _ctx->number_filtered_rows - << " number_unselected_rows=" << _ctx->number_unselected_rows; + << " number_unselected_rows=" << _ctx->number_unselected_rows + << ", ctx: " << _ctx->brief(); _ctx->promise.set_value(_status); // when all done, finish the routine load task } diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index d22f2bb4a8c..845b01c3216 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -298,7 +298,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, switch (ctx->load_src_type) { case TLoadSourceType::KAFKA: { if (ctx->is_multi_table) { - LOG(INFO) << "recv single-stream-multi-table request, ctx=" << ctx->brief(); + LOG(INFO) << "recv single-stream-multi-table request, ctx: " << ctx->brief(); pipe = std::make_shared<io::MultiTablePipe>(ctx); } else { pipe = std::make_shared<io::KafkaConsumerPipe>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org