github-actions[bot] commented on code in PR #25560:
URL: https://github.com/apache/doris/pull/25560#discussion_r1363091465


##########
be/src/runtime/group_commit_mgr.cpp:
##########
@@ -515,139 +523,15 @@
     return Status::OK();
 }
 
-Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
-    return _insert_into_thread_pool->submit_func([ctx, this] {
-        Status st = _group_commit_stream_load(ctx);
-        if (!st.ok()) {
-            ctx->promise.set_value(st);
-        }
-    });
-}
-
-Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
-    auto& fragment_params = ctx->put_result.params;
-    auto& tdesc_tbl = fragment_params.desc_tbl;
-    DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
-    DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1);
-    auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
-    auto& nodes = fragment_params.fragment.plan.nodes;
-    DCHECK(nodes.size() > 0);
-    auto& plan_node = nodes.at(0);
-
-    std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
-    {
-        std::shared_ptr<LoadBlockQueue> load_block_queue;
-        // 1. FileScanNode consumes data from the pipe.
-        std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique();
-        TUniqueId load_id;
-        load_id.hi = ctx->id.hi;
-        load_id.lo = ctx->id.lo;
-        TQueryOptions query_options;
-        query_options.query_type = TQueryType::LOAD;
-        TQueryGlobals query_globals;
-        static_cast<void>(runtime_state->init(load_id, query_options, query_globals, _exec_env));
-        runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
-                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1));
-        DescriptorTbl* desc_tbl = nullptr;
-        RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl));
-        runtime_state->set_desc_tbl(desc_tbl);
-        auto file_scan_node =
-                vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl);
-        Status status = Status::OK();
-        auto sink = stream_load::GroupCommitBlockSink(
-                runtime_state->obj_pool(), file_scan_node.row_desc(),
-                fragment_params.fragment.output_exprs, &status);
-        std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func((int*)0x01, [&](int*) {
-            if (load_block_queue != nullptr) {
-                load_block_queue->remove_load_id(load_id);
-            }
-            static_cast<void>(file_scan_node.close(runtime_state.get()));
-            static_cast<void>(sink.close(runtime_state.get(), status));
-        });
-        RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
-        RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
-        std::vector<TScanRangeParams> params_vector;
-        params_vector.emplace_back(tscan_range_params);
-        file_scan_node.set_scan_ranges(params_vector);
-        RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
-
-        RETURN_IF_ERROR(status);
-        RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
-        RETURN_IF_ERROR(sink.open(runtime_state.get()));
-
-        // 2. Put the block into block queue.
-        std::unique_ptr<doris::vectorized::Block> _block =
-                doris::vectorized::Block::create_unique();
-        bool first = true;
-        bool eof = false;
-        while (!eof) {
-            // TODO what to do if scan one block error
-            RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
-            RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
-            std::shared_ptr<doris::vectorized::FutureBlock> future_block =
-                    std::make_shared<doris::vectorized::FutureBlock>();
-            future_block->swap(*(_block.get()));
-            future_block->set_info(ctx->schema_version, load_id, first, eof);
-            // TODO what to do if add one block error
-            if (load_block_queue == nullptr) {
-                RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block,
-                                                            load_block_queue));
-                ctx->label = load_block_queue->label;
-                ctx->txn_id = load_block_queue->txn_id;
-            }
-            if (future_block->rows() > 0) {
-                future_blocks.emplace_back(future_block);
-            }
-            RETURN_IF_ERROR(load_block_queue->add_block(future_block));
-            first = false;
-        }
-        ctx->number_unselected_rows = runtime_state->num_rows_load_unselected();
-        ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
-        ctx->error_url = runtime_state->get_error_log_file_path();
-        if (!runtime_state->get_error_log_file_path().empty()) {
-            LOG(INFO) << "id=" << print_id(load_id)
-                      << ", url=" << runtime_state->get_error_log_file_path()
-                      << ", load rows=" << runtime_state->num_rows_load_total()
-                      << ", filter rows=" << runtime_state->num_rows_load_filtered()
-                      << ", unselect rows=" << runtime_state->num_rows_load_unselected()
-                      << ", success rows=" << runtime_state->num_rows_load_success();
-        }
-    }
-
-    int64_t total_rows = 0;
-    int64_t loaded_rows = 0;
-    // 3. wait to wal
-    for (const auto& future_block : future_blocks) {
-        std::unique_lock<doris::Mutex> l(*(future_block->lock));
-        if (!future_block->is_handled()) {
-            future_block->cv->wait(l);
-        }
-        // future_block->get_status()
-        total_rows += future_block->get_total_rows();
-        loaded_rows += future_block->get_loaded_rows();
-    }
-    ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows;
-    ctx->number_loaded_rows = loaded_rows;
-    ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
-    ctx->promise.set_value(Status::OK());
-    VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
-               << ", total rows=" << ctx->number_total_rows
-               << ", loaded rows=" << ctx->number_loaded_rows
-               << ", filtered rows=" << ctx->number_filtered_rows
-               << ", unselected rows=" << ctx->number_unselected_rows;
-    return Status::OK();
-}
-
-Status GroupCommitMgr::_get_first_block_load_queue(
+Status GroupCommitMgr::get_first_block_load_queue(

Review Comment:
   warning: method 'get_first_block_load_queue' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/group_commit_mgr.h:126:
   ```diff
   -     Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
   +     static Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
   ```
   



##########
be/src/runtime/group_commit_mgr.cpp:
##########
@@ -194,8 +197,8 @@ Status GroupCommitTable::_create_group_commit_load(
     std::regex reg("-");
     std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_");
     std::stringstream ss;
-    ss << "insert into table_id(" << table_id << ") WITH LABEL " << label
-       << " select * from group_commit(\"table_id\"=\"" << table_id << "\")";
+    ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label
+       << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";

Review Comment:
   warning: escaped string literal can be written as a raw string literal [modernize-raw-string-literal]
   
   ```suggestion
          << R"( select * from group_commit("table_id"=")" << _table_id << "\")";
   ```
   



##########
be/src/vec/sink/group_commit_block_sink.cpp:
##########
@@ -77,6 +80,27 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
     return vectorized::VExpr::open(_output_vexpr_ctxs, state);
 }
 
+Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {

Review Comment:
   warning: method 'close' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/sink/group_commit_block_sink.h:47:
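   ```diff
   -     Status close(RuntimeState* state, Status close_status) override;
   +     static Status close(RuntimeState* state, Status close_status) override;
   ```
   
   Note: this automated suggestion cannot be applied as written. `close` is declared `override`, so it is a virtual function, and a static member function cannot be virtual or marked `override`.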
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to