github-actions[bot] commented on code in PR #25560: URL: https://github.com/apache/doris/pull/25560#discussion_r1363091465
########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -515,139 +523,15 @@ return Status::OK(); } -Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) { - return _insert_into_thread_pool->submit_func([ctx, this] { - Status st = _group_commit_stream_load(ctx); - if (!st.ok()) { - ctx->promise.set_value(st); - } - }); -} - -Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) { - auto& fragment_params = ctx->put_result.params; - auto& tdesc_tbl = fragment_params.desc_tbl; - DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1); - DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1); - auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0); - auto& nodes = fragment_params.fragment.plan.nodes; - DCHECK(nodes.size() > 0); - auto& plan_node = nodes.at(0); - - std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks; - { - std::shared_ptr<LoadBlockQueue> load_block_queue; - // 1. FileScanNode consumes data from the pipe. 
- std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique(); - TUniqueId load_id; - load_id.hi = ctx->id.hi; - load_id.lo = ctx->id.lo; - TQueryOptions query_options; - query_options.query_type = TQueryType::LOAD; - TQueryGlobals query_globals; - static_cast<void>(runtime_state->init(load_id, query_options, query_globals, _exec_env)); - runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1)); - DescriptorTbl* desc_tbl = nullptr; - RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl)); - runtime_state->set_desc_tbl(desc_tbl); - auto file_scan_node = - vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl); - Status status = Status::OK(); - auto sink = stream_load::GroupCommitBlockSink( - runtime_state->obj_pool(), file_scan_node.row_desc(), - fragment_params.fragment.output_exprs, &status); - std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func((int*)0x01, [&](int*) { - if (load_block_queue != nullptr) { - load_block_queue->remove_load_id(load_id); - } - static_cast<void>(file_scan_node.close(runtime_state.get())); - static_cast<void>(sink.close(runtime_state.get(), status)); - }); - RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get())); - RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get())); - std::vector<TScanRangeParams> params_vector; - params_vector.emplace_back(tscan_range_params); - file_scan_node.set_scan_ranges(params_vector); - RETURN_IF_ERROR(file_scan_node.open(runtime_state.get())); - - RETURN_IF_ERROR(status); - RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink)); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get())); - RETURN_IF_ERROR(sink.open(runtime_state.get())); - - // 2. Put the block into block queue. 
- std::unique_ptr<doris::vectorized::Block> _block = - doris::vectorized::Block::create_unique(); - bool first = true; - bool eof = false; - while (!eof) { - // TODO what to do if scan one block error - RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof)); - RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get())); - std::shared_ptr<doris::vectorized::FutureBlock> future_block = - std::make_shared<doris::vectorized::FutureBlock>(); - future_block->swap(*(_block.get())); - future_block->set_info(ctx->schema_version, load_id, first, eof); - // TODO what to do if add one block error - if (load_block_queue == nullptr) { - RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block, - load_block_queue)); - ctx->label = load_block_queue->label; - ctx->txn_id = load_block_queue->txn_id; - } - if (future_block->rows() > 0) { - future_blocks.emplace_back(future_block); - } - RETURN_IF_ERROR(load_block_queue->add_block(future_block)); - first = false; - } - ctx->number_unselected_rows = runtime_state->num_rows_load_unselected(); - ctx->number_filtered_rows = runtime_state->num_rows_load_filtered(); - ctx->error_url = runtime_state->get_error_log_file_path(); - if (!runtime_state->get_error_log_file_path().empty()) { - LOG(INFO) << "id=" << print_id(load_id) - << ", url=" << runtime_state->get_error_log_file_path() - << ", load rows=" << runtime_state->num_rows_load_total() - << ", filter rows=" << runtime_state->num_rows_load_filtered() - << ", unselect rows=" << runtime_state->num_rows_load_unselected() - << ", success rows=" << runtime_state->num_rows_load_success(); - } - } - - int64_t total_rows = 0; - int64_t loaded_rows = 0; - // 3. 
wait to wal - for (const auto& future_block : future_blocks) { - std::unique_lock<doris::Mutex> l(*(future_block->lock)); - if (!future_block->is_handled()) { - future_block->cv->wait(l); - } - // future_block->get_status() - total_rows += future_block->get_total_rows(); - loaded_rows += future_block->get_loaded_rows(); - } - ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows; - ctx->number_loaded_rows = loaded_rows; - ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows; - ctx->promise.set_value(Status::OK()); - VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string() - << ", total rows=" << ctx->number_total_rows - << ", loaded rows=" << ctx->number_loaded_rows - << ", filtered rows=" << ctx->number_filtered_rows - << ", unselected rows=" << ctx->number_unselected_rows; - return Status::OK(); -} - -Status GroupCommitMgr::_get_first_block_load_queue( +Status GroupCommitMgr::get_first_block_load_queue( Review Comment: warning: method 'get_first_block_load_queue' can be made static [readability-convert-member-functions-to-static] be/src/runtime/group_commit_mgr.h:126: ```diff - Status get_first_block_load_queue(int64_t db_id, int64_t table_id, + static Status get_first_block_load_queue(int64_t db_id, int64_t table_id, ``` ########## be/src/runtime/group_commit_mgr.cpp: ########## @@ -194,8 +197,8 @@ Status GroupCommitTable::_create_group_commit_load( std::regex reg("-"); std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); std::stringstream ss; - ss << "insert into table_id(" << table_id << ") WITH LABEL " << label - << " select * from group_commit(\"table_id\"=\"" << table_id << "\")"; + ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label + << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")"; Review Comment: warning: escaped string literal can be written as a raw string literal [modernize-raw-string-literal] ```suggestion << 
R"( select * from group_commit("table_id"=")" << _table_id << "\")"; ``` ########## be/src/vec/sink/group_commit_block_sink.cpp: ########## @@ -77,6 +80,27 @@ Status GroupCommitBlockSink::open(RuntimeState* state) { return vectorized::VExpr::open(_output_vexpr_ctxs, state); } +Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { Review Comment: warning: method 'close' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/group_commit_block_sink.h:47: ```diff - Status close(RuntimeState* state, Status close_status) override; + static Status close(RuntimeState* state, Status close_status) override; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org