This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f8d1fa2be3532fd26af17a1c9b3fc9200cdcdd10
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Sun Apr 7 23:45:37 2024 +0800

    [chore](multi-table-load) add context info in log when using single-stream-multi-table load (#33317)
---
 be/src/io/fs/multi_table_pipe.cpp                  | 26 +++++++++++++---------
 .../routine_load/routine_load_task_executor.cpp    |  2 +-
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 916f8151739..816561cfbc8 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -96,7 +96,7 @@ std::string MultiTablePipe::parse_dst_table(const char* data, 
size_t size) {
 Status MultiTablePipe::dispatch(const std::string& table, const char* data, 
size_t size,
                                 AppendFunc cb) {
     if (size == 0 || strlen(data) == 0) {
-        LOG(WARNING) << "empty data for table: " << table;
+        LOG(WARNING) << "empty data for table: " << table << ", ctx: " << 
_ctx->brief();
         return Status::InternalError("empty data");
     }
     KafkaConsumerPipePtr pipe = nullptr;
@@ -109,7 +109,7 @@ Status MultiTablePipe::dispatch(const std::string& table, 
const char* data, size
         iter = _unplanned_pipes.find(table);
         if (iter == _unplanned_pipes.end()) {
             pipe = std::make_shared<io::KafkaConsumerPipe>();
-            LOG(INFO) << "create new unplanned pipe: " << pipe.get();
+            LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", 
ctx: " << _ctx->brief();
             _unplanned_pipes.emplace(table, pipe);
         } else {
             pipe = iter->second;
@@ -121,9 +121,11 @@ Status MultiTablePipe::dispatch(const std::string& table, 
const char* data, size
         if (_unplanned_row_cnt >= _row_threshold ||
             _unplanned_pipes.size() >= _wait_tables_threshold) {
             LOG(INFO) << fmt::format(
-                    "unplanned row cnt={} reach row_threshold={} or 
wait_plan_table_threshold={}, "
-                    "plan them",
-                    _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold);
+                                 "unplanned row cnt={} reach row_threshold={} 
or "
+                                 "wait_plan_table_threshold={}, "
+                                 "plan them",
+                                 _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold)
+                      << ", ctx: " << _ctx->brief();
             Status st = request_and_exec_plans();
             _unplanned_row_cnt = 0;
             if (!st.ok()) {
@@ -206,7 +208,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
         _planned_pipes.emplace(pipe.first, pipe.second);
     }
     LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, 
returned plan cnt={}",
-                             _unplanned_pipes.size(), _planned_pipes.size(), 
params.size());
+                             _unplanned_pipes.size(), _planned_pipes.size(), 
params.size())
+              << ", ctx: " << _ctx->brief();
     _unplanned_pipes.clear();
 
     _inflight_cnt += params.size();
@@ -220,14 +223,16 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
             RETURN_IF_ERROR(
                     put_pipe(plan.params.fragment_instance_id, 
_planned_pipes[plan.table_name]));
             LOG(INFO) << "fragment_instance_id=" << 
print_id(plan.params.fragment_instance_id)
-                      << " table=" << plan.table_name;
+                      << " table=" << plan.table_name << ", ctx: " << 
_ctx->brief();
         } else if constexpr (std::is_same_v<ExecParam, 
TPipelineFragmentParams>) {
             auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
             RETURN_IF_ERROR(put_pipe(pipe_id, 
_planned_pipes[plan.table_name]));
-            LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name;
+            LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name
+                      << ", ctx: " << _ctx->brief();
         } else {
             LOG(WARNING) << "illegal exec param type, need 
`TExecPlanFragmentParams` or "
-                            "`TPipelineFragmentParams`, will crash";
+                            "`TPipelineFragmentParams`, will crash"
+                         << ", ctx: " << _ctx->brief();
             CHECK(false);
         }
 
@@ -301,7 +306,8 @@ void MultiTablePipe::_handle_consumer_finished() {
     LOG(INFO) << "all plan for multi-table load complete. number_total_rows="
               << _ctx->number_total_rows << " number_loaded_rows=" << 
_ctx->number_loaded_rows
               << " number_filtered_rows=" << _ctx->number_filtered_rows
-              << " number_unselected_rows=" << _ctx->number_unselected_rows;
+              << " number_unselected_rows=" << _ctx->number_unselected_rows
+              << ", ctx: " << _ctx->brief();
     _ctx->promise.set_value(_status); // when all done, finish the routine 
load task
 }
 
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..845b01c3216 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -298,7 +298,7 @@ void 
RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     switch (ctx->load_src_type) {
     case TLoadSourceType::KAFKA: {
         if (ctx->is_multi_table) {
-            LOG(INFO) << "recv single-stream-multi-table request, ctx=" << 
ctx->brief();
+            LOG(INFO) << "recv single-stream-multi-table request, ctx: " << 
ctx->brief();
             pipe = std::make_shared<io::MultiTablePipe>(ctx);
         } else {
             pipe = std::make_shared<io::KafkaConsumerPipe>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to