liaoxin01 commented on code in PR #35458: URL: https://github.com/apache/doris/pull/35458#discussion_r1624004047
########## be/src/io/fs/multi_table_pipe.cpp: ########## @@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size #ifndef BE_TEST Status MultiTablePipe::request_and_exec_plans() { - if (_unplanned_pipes.empty()) { + if (_unplanned_tables.empty()) { return Status::OK(); } - // get list of table names in unplanned pipes - std::vector<std::string> tables; fmt::memory_buffer log_buffer; log_buffer.clear(); - fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size()); - for (auto& pair : _unplanned_pipes) { - tables.push_back(pair.first); + fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size()); + for (auto& pair : _unplanned_tables) { fmt::format_to(log_buffer, "{} ", pair.first); } fmt::format_to(log_buffer, "]"); LOG(INFO) << fmt::to_string(log_buffer); - TStreamLoadPutRequest request; - set_request_auth(&request, _ctx->auth); - request.db = _ctx->db; - request.table_names = tables; - request.__isset.table_names = true; - request.txnId = _ctx->txn_id; - request.formatType = _ctx->format; - request.__set_compress_type(_ctx->compress_type); - request.__set_header_type(_ctx->header_type); - request.__set_loadId(_ctx->id.to_thrift()); - request.fileType = TFileType::FILE_STREAM; - request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); - request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); - request.__set_user(_ctx->qualified_user); - request.__set_cloud_cluster(_ctx->cloud_cluster); - // no need to register new_load_stream_mgr coz it is already done in routineload submit task - - // plan this load - ExecEnv* exec_env = doris::ExecEnv::GetInstance(); - TNetworkAddress master_addr = exec_env->master_info()->network_address; - int64_t stream_load_put_start_time = MonotonicNanos(); - RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( - master_addr.hostname, master_addr.port, - [&request, this](FrontendServiceConnection& client) { - 
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request); - })); - _ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; - - Status plan_status(Status::create(_ctx->multi_table_put_result.status)); - if (!plan_status.ok()) { - LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief(); - return plan_status; - } - Status st; - if (_ctx->multi_table_put_result.__isset.params && - !_ctx->multi_table_put_result.__isset.pipeline_params) { - st = exec_plans(exec_env, _ctx->multi_table_put_result.params); - } else if (!_ctx->multi_table_put_result.__isset.params && - _ctx->multi_table_put_result.__isset.pipeline_params) { - st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params); - } else { - return Status::Aborted("too many or too few params are set in multi_table_put_result."); - } + for (auto& pair : _unplanned_tables) { + TStreamLoadPutRequest request; + set_request_auth(&request, _ctx->auth); + std::vector<std::string> tables; + tables.push_back(pair.first); + request.db = _ctx->db; + request.table_names = tables; + request.__isset.table_names = true; + request.txnId = _ctx->txn_id; + request.formatType = _ctx->format; + request.__set_compress_type(_ctx->compress_type); + request.__set_header_type(_ctx->header_type); + request.__set_loadId((pair.second->id).to_thrift()); + request.fileType = TFileType::FILE_STREAM; + request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); + request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node); + request.__set_user(_ctx->qualified_user); + request.__set_cloud_cluster(_ctx->cloud_cluster); + // no need to register new_load_stream_mgr coz it is already done in routineload submit task + + // plan this load + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + TNetworkAddress master_addr = exec_env->master_info()->network_address; + int64_t stream_load_put_start_time = MonotonicNanos(); + 
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( Review Comment: Is it no longer possible to retrieve the execution plans for multiple tables in a single RPC? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org