liaoxin01 commented on code in PR #35458:
URL: https://github.com/apache/doris/pull/35458#discussion_r1624004047


##########
be/src/io/fs/multi_table_pipe.cpp:
##########
@@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
 
 #ifndef BE_TEST
 Status MultiTablePipe::request_and_exec_plans() {
-    if (_unplanned_pipes.empty()) {
+    if (_unplanned_tables.empty()) {
         return Status::OK();
     }
 
-    // get list of table names in unplanned pipes
-    std::vector<std::string> tables;
     fmt::memory_buffer log_buffer;
     log_buffer.clear();
-    fmt::format_to(log_buffer, "request plans for {} tables: [ ", 
_unplanned_pipes.size());
-    for (auto& pair : _unplanned_pipes) {
-        tables.push_back(pair.first);
+    fmt::format_to(log_buffer, "request plans for {} tables: [ ", 
_unplanned_tables.size());
+    for (auto& pair : _unplanned_tables) {
         fmt::format_to(log_buffer, "{} ", pair.first);
     }
     fmt::format_to(log_buffer, "]");
     LOG(INFO) << fmt::to_string(log_buffer);
 
-    TStreamLoadPutRequest request;
-    set_request_auth(&request, _ctx->auth);
-    request.db = _ctx->db;
-    request.table_names = tables;
-    request.__isset.table_names = true;
-    request.txnId = _ctx->txn_id;
-    request.formatType = _ctx->format;
-    request.__set_compress_type(_ctx->compress_type);
-    request.__set_header_type(_ctx->header_type);
-    request.__set_loadId(_ctx->id.to_thrift());
-    request.fileType = TFileType::FILE_STREAM;
-    request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
-    request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
-    request.__set_user(_ctx->qualified_user);
-    request.__set_cloud_cluster(_ctx->cloud_cluster);
-    // no need to register new_load_stream_mgr coz it is already done in 
routineload submit task
-
-    // plan this load
-    ExecEnv* exec_env = doris::ExecEnv::GetInstance();
-    TNetworkAddress master_addr = exec_env->master_info()->network_address;
-    int64_t stream_load_put_start_time = MonotonicNanos();
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&request, this](FrontendServiceConnection& client) {
-                client->streamLoadMultiTablePut(_ctx->multi_table_put_result, 
request);
-            }));
-    _ctx->stream_load_put_cost_nanos = MonotonicNanos() - 
stream_load_put_start_time;
-
-    Status plan_status(Status::create(_ctx->multi_table_put_result.status));
-    if (!plan_status.ok()) {
-        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status 
<< _ctx->brief();
-        return plan_status;
-    }
-
     Status st;
-    if (_ctx->multi_table_put_result.__isset.params &&
-        !_ctx->multi_table_put_result.__isset.pipeline_params) {
-        st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
-    } else if (!_ctx->multi_table_put_result.__isset.params &&
-               _ctx->multi_table_put_result.__isset.pipeline_params) {
-        st = exec_plans(exec_env, 
_ctx->multi_table_put_result.pipeline_params);
-    } else {
-        return Status::Aborted("too many or too few params are set in 
multi_table_put_result.");
-    }
+    for (auto& pair : _unplanned_tables) {
+        TStreamLoadPutRequest request;
+        set_request_auth(&request, _ctx->auth);
+        std::vector<std::string> tables;
+        tables.push_back(pair.first);
+        request.db = _ctx->db;
+        request.table_names = tables;
+        request.__isset.table_names = true;
+        request.txnId = _ctx->txn_id;
+        request.formatType = _ctx->format;
+        request.__set_compress_type(_ctx->compress_type);
+        request.__set_header_type(_ctx->header_type);
+        request.__set_loadId((pair.second->id).to_thrift());
+        request.fileType = TFileType::FILE_STREAM;
+        request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
+        request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
+        request.__set_user(_ctx->qualified_user);
+        request.__set_cloud_cluster(_ctx->cloud_cluster);
+        // no need to register new_load_stream_mgr coz it is already done in 
routineload submit task
+
+        // plan this load
+        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        TNetworkAddress master_addr = exec_env->master_info()->network_address;
+        int64_t stream_load_put_start_time = MonotonicNanos();
+        RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(

Review Comment:
   Is it no longer possible to retrieve the execution plans for multiple tables in a single RPC? With this change, a separate RPC is issued for each unplanned table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to