This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fcf5c964234 branch-3.0: [fix](group commit) fix group_commit get_block too slow (#51358) (#51375)
fcf5c964234 is described below

commit fcf5c96423459725d1abf0ef7f5891d9cb08e7f6
Author: meiyi <[email protected]>
AuthorDate: Fri May 30 16:09:13 2025 +0800

    branch-3.0: [fix](group commit) fix group_commit get_block too slow (#51358) (#51375)
    
    pick https://github.com/apache/doris/pull/51358
---
 be/src/pipeline/dependency.cpp                     |   2 +-
 be/src/pipeline/dependency.h                       |   9 +-
 .../pipeline/exec/group_commit_scan_operator.cpp   |   8 +-
 be/src/pipeline/exec/group_commit_scan_operator.h  |   3 +-
 be/src/runtime/group_commit_mgr.cpp                | 103 +++++++++------------
 be/src/runtime/group_commit_mgr.h                  |   5 +-
 .../plans/commands/insert/OlapInsertExecutor.java  |   5 +
 .../group_commit/test_group_commit_error.groovy    |  12 +++
 8 files changed, 76 insertions(+), 71 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index ffba01b05b2..b15005e03a8 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -161,7 +161,7 @@ void RuntimeFilterTimerQueue::start() {
                 if (it.use_count() == 1) {
                     // `use_count == 1` means this runtime filter has been 
released
                 } else if (it->should_be_check_timeout()) {
-                    if (it->_parent->is_blocked_by(nullptr)) {
+                    if (it->force_wait_timeout() || 
it->_parent->is_blocked_by(nullptr)) {
                         // This means runtime filter is not ready, so we call 
timeout or continue to poll this timer.
                         int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
                         if (ms_since_registration > it->wait_time_ms()) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 496f11f7877..ea6cacf51b1 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -207,10 +207,11 @@ struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
 public:
     RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
-                       std::shared_ptr<Dependency> parent)
+                       std::shared_ptr<Dependency> parent, bool 
force_wait_timeout = false)
             : _parent(std::move(parent)),
               _registration_time(registration_time),
-              _wait_time_ms(wait_time_ms) {}
+              _wait_time_ms(wait_time_ms),
+              _force_wait_timeout(force_wait_timeout) {}
 
     // Called by runtime filter producer.
     void call_ready();
@@ -228,6 +229,8 @@ public:
 
     bool should_be_check_timeout();
 
+    bool force_wait_timeout() { return _force_wait_timeout; }
+
 private:
     friend struct RuntimeFilterTimerQueue;
     std::shared_ptr<Dependency> _parent = nullptr;
@@ -235,6 +238,8 @@ private:
     std::mutex _lock;
     int64_t _registration_time;
     const int32_t _wait_time_ms;
+    // true only for group_commit_scan_operator
+    bool _force_wait_timeout;
 };
 
 struct RuntimeFilterTimerQueue {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index a3da7f9c3a0..b7dfff79536 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -34,8 +34,7 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
     SCOPED_TIMER(local_state.exec_time_counter());
     bool find_node = false;
     RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, 
&find_node, eos,
-                                                            
local_state._get_block_dependency,
-                                                            
local_state._timer_dependency));
+                                                            
local_state._get_block_dependency));
     return Status::OK();
 }
 
@@ -45,16 +44,13 @@ Status GroupCommitLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     auto& p = _parent->cast<GroupCommitOperatorX>();
     _get_block_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                       
"GroupCommitGetBlockDependency", true);
-    _timer_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                                  
"GroupCommitTimerDependency", true);
     auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
             p._table_id, state->fragment_instance_id(), load_block_queue, 
_get_block_dependency);
     if (st.ok()) {
         DCHECK(load_block_queue != nullptr);
-        _timer_dependency->block();
         _runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
                 MonotonicMillis(), 
load_block_queue->get_group_commit_interval_ms(),
-                _timer_dependency);
+                _get_block_dependency, true);
         std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
         timers.push_back(_runtime_filter_timer);
         
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h b/be/src/pipeline/exec/group_commit_scan_operator.h
index bf627df64b0..be07645f57c 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -38,7 +38,7 @@ public:
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
     std::vector<Dependency*> dependencies() const override {
-        return {_scan_dependency.get(), _get_block_dependency.get(), 
_timer_dependency.get()};
+        return {_scan_dependency.get(), _get_block_dependency.get()};
     }
 
 private:
@@ -46,7 +46,6 @@ private:
     Status _process_conjuncts(RuntimeState* state) override;
 
     std::shared_ptr<Dependency> _get_block_dependency = nullptr;
-    std::shared_ptr<Dependency> _timer_dependency = nullptr;
     std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer = 
nullptr;
 };
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 0f22dbf4573..70345acdfb7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,6 +35,16 @@
 
 namespace doris {
 
+std::string LoadBlockQueue::_get_load_ids() {
+    std::stringstream ss;
+    ss << "[";
+    for (auto& id : _load_ids_to_write_dep) {
+        ss << id.first.to_string() << ", ";
+    }
+    ss << "]";
+    return ss.str();
+}
+
 Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                                  std::shared_ptr<vectorized::Block> block, 
bool write_wal,
                                  UniqueId& load_id) {
@@ -53,23 +63,12 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
             _data_bytes += block->bytes();
             int before_block_queues_bytes = _all_block_queues_bytes->load();
             _all_block_queues_bytes->fetch_add(block->bytes(), 
std::memory_order_relaxed);
-            std::stringstream ss;
-            ss << "[";
-            for (const auto& id : _load_ids_to_write_dep) {
-                ss << id.first.to_string() << ", ";
-            }
-            ss << "]";
             VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
-                       << "block queue size is " << _block_queue.size() << ", 
block rows is "
-                       << block->rows() << ", block bytes is " << 
block->bytes()
-                       << ", before add block, all block queues bytes is "
-                       << before_block_queues_bytes
-                       << ", after add block, all block queues bytes is "
-                       << _all_block_queues_bytes->load() << ", txn_id=" << 
txn_id
-                       << ", label=" << label << ", instance_id=" << 
load_instance_id
-                       << ", load_ids=" << ss.str() << ", runtime_state=" << 
runtime_state
-                       << ", the block is " << block->dump_data() << ", the 
block column size is "
-                       << block->columns_bytes();
+                       << "Cur block rows=" << block->rows() << ", bytes=" << 
block->bytes()
+                       << ". all block queues bytes from " << 
before_block_queues_bytes << " to  "
+                       << _all_block_queues_bytes->load() << ", queue size=" 
<< _block_queue.size()
+                       << ". txn_id=" << txn_id << ", label=" << label
+                       << ", instance_id=" << load_instance_id << ", 
load_ids=" << _get_load_ids();
         }
         if (write_wal || config::group_commit_wait_replay_wal_finish) {
             auto st = _v_wal_writer->write_wal(block.get());
@@ -83,6 +82,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                     config::group_commit_queue_mem_limit) {
             DCHECK(_load_ids_to_write_dep.find(load_id) != 
_load_ids_to_write_dep.end());
             _load_ids_to_write_dep[load_id]->block();
+            VLOG_DEBUG << "block add_block for load_id=" << load_id
+                       << ", memory=" << 
_all_block_queues_bytes->load(std::memory_order_relaxed)
+                       << ". inner load_id=" << load_instance_id << ", label=" 
<< label;
         }
     }
     if (!_need_commit) {
@@ -102,14 +104,14 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
     }
     for (auto read_dep : _read_deps) {
         read_dep->set_ready();
+        VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id;
     }
     return Status::OK();
 }
 
 Status LoadBlockQueue::get_block(RuntimeState* runtime_state, 
vectorized::Block* block,
                                  bool* find_block, bool* eos,
-                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep,
-                                 std::shared_ptr<pipeline::Dependency> 
timer_dependency) {
+                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep) {
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
@@ -124,15 +126,6 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     if (!_need_commit && duration >= _group_commit_interval_ms) {
         _need_commit = true;
     }
-    auto get_load_ids = [&]() {
-        std::stringstream ss;
-        ss << "[";
-        for (auto& id : _load_ids_to_write_dep) {
-            ss << id.first.to_string() << ", ";
-        }
-        ss << "]";
-        return ss.str();
-    };
     if (_block_queue.empty()) {
         if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
             auto last_print_duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -142,12 +135,13 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
                 _last_print_time = std::chrono::steady_clock::now();
                 LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
                           << ", label=" << label << ", instance_id=" << 
load_instance_id
-                          << ", duration=" << duration << ", load_ids=" << 
get_load_ids();
+                          << ", duration=" << duration << ", load_ids=" << 
_get_load_ids();
             }
         }
+        VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", 
but queue is empty";
         if (!_need_commit) {
             get_block_dep->block();
-            VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
+            VLOG_DEBUG << "block get_block for inner load_id=" << 
load_instance_id;
         }
     } else {
         const BlockData block_data = _block_queue.front();
@@ -157,15 +151,11 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         int before_block_queues_bytes = _all_block_queues_bytes->load();
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
-                   << "block queue size is " << _block_queue.size() << ", 
block rows is "
-                   << block->rows() << ", block bytes is " << block->bytes()
-                   << ", before remove block, all block queues bytes is "
-                   << before_block_queues_bytes
-                   << ", after remove block, all block queues bytes is "
-                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
-                   << ", label=" << label << ", instance_id=" << 
load_instance_id
-                   << ", load_ids=" << get_load_ids() << ", the block is " << 
block->dump_data()
-                   << ", the block column size is " << block->columns_bytes();
+                   << "Cur block rows=" << block->rows() << ", bytes=" << 
block->bytes()
+                   << ". all block queues bytes from " << 
before_block_queues_bytes << " to  "
+                   << _all_block_queues_bytes->load() << ", queue size=" << 
_block_queue.size()
+                   << ". txn_id=" << txn_id << ", label=" << label
+                   << ", instance_id=" << load_instance_id << ", load_ids=" << 
_get_load_ids();
     }
     if (_block_queue.empty() && _need_commit && 
_load_ids_to_write_dep.empty()) {
         *eos = true;
@@ -177,6 +167,8 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
         for (auto& id : _load_ids_to_write_dep) {
             id.second->set_ready();
         }
+        VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
+                   << ". inner load_id=" << load_instance_id << ", label=" << 
label;
     }
     return Status::OK();
 }
@@ -189,6 +181,7 @@ Status LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
         for (auto read_dep : _read_deps) {
             read_dep->set_ready();
         }
+        VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner 
load_id=" << load_instance_id;
         return Status::OK();
     }
     return Status::NotFound<false>("load_id=" + load_id.to_string() +
@@ -223,35 +216,25 @@ void LoadBlockQueue::_cancel_without_lock(const Status& 
st) {
               << ", status=" << st.to_string();
     status =
             Status::Cancelled("cancel group_commit, label=" + label + ", 
status=" + st.to_string());
+    int before_block_queues_bytes = _all_block_queues_bytes->load();
     while (!_block_queue.empty()) {
         const BlockData& block_data = _block_queue.front().block;
-        int before_block_queues_bytes = _all_block_queues_bytes->load();
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
-        std::stringstream ss;
-        ss << "[";
-        for (const auto& id : _load_ids_to_write_dep) {
-            ss << id.first.to_string() << ", ";
-        }
-        ss << "]";
-        VLOG_DEBUG << "[Group Commit Debug] 
(LoadBlockQueue::_cancel_without_block). "
-                   << "block queue size is " << _block_queue.size() << ", 
block rows is "
-                   << block_data.block->rows() << ", block bytes is " << 
block_data.block->bytes()
-                   << ", before remove block, all block queues bytes is "
-                   << before_block_queues_bytes
-                   << ", after remove block, all block queues bytes is "
-                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
-                   << ", label=" << label << ", instance_id=" << 
load_instance_id
-                   << ", load_ids=" << ss.str() << ", the block is "
-                   << block_data.block->dump_data() << ", the block column 
size is "
-                   << block_data.block->columns_bytes();
         _block_queue.pop_front();
     }
+    VLOG_DEBUG << "[Group Commit Debug] 
(LoadBlockQueue::_cancel_without_block). "
+               << "all block queues bytes from " << before_block_queues_bytes 
<< " to "
+               << _all_block_queues_bytes->load() << ", queue size=" << 
_block_queue.size()
+               << ", txn_id=" << txn_id << ", label=" << label
+               << ", instance_id=" << load_instance_id << ", load_ids=" << 
_get_load_ids();
     for (auto& id : _load_ids_to_write_dep) {
         id.second->set_always_ready();
     }
     for (auto read_dep : _read_deps) {
         read_dep->set_ready();
     }
+    VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
+               << ", inner load_id=" << load_instance_id;
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
@@ -285,7 +268,7 @@ Status GroupCommitTable::get_first_block_load_queue(
             }
         }
         return Status::InternalError<false>("can not get a block queue for 
table_id: " +
-                                            std::to_string(_table_id));
+                                            std::to_string(_table_id) + 
_create_plan_failed_reason);
     };
 
     if (try_to_get_matched_queue().ok()) {
@@ -308,7 +291,11 @@ Status GroupCommitTable::get_first_block_load_queue(
                     }};
                     auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
                     if (!st.ok()) {
-                        LOG(WARNING) << "create group commit load error, st=" 
<< st.to_string();
+                        LOG(WARNING) << "create group commit load error: " << 
st.to_string();
+                        _create_plan_failed_reason = ". create group commit 
load error: " +
+                                                     st.to_string().substr(0, 
300);
+                    } else {
+                        _create_plan_failed_reason = "";
                     }
                 }));
     }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 2be17400026..dbaade783f4 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -74,8 +74,7 @@ public:
     Status add_block(RuntimeState* runtime_state, 
std::shared_ptr<vectorized::Block> block,
                      bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, 
bool* find_block,
-                     bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep,
-                     std::shared_ptr<pipeline::Dependency> timer_dependency);
+                     bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep);
     bool contain_load_id(const UniqueId& load_id);
     Status add_load_id(const UniqueId& load_id,
                        const std::shared_ptr<pipeline::Dependency> 
put_block_dep);
@@ -124,6 +123,7 @@ public:
 
 private:
     void _cancel_without_lock(const Status& st);
+    std::string _get_load_ids();
 
     // the set of load ids of all blocks in this queue
     std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> 
_load_ids_to_write_dep;
@@ -196,6 +196,7 @@ private:
                        std::tuple<std::shared_ptr<pipeline::Dependency>,
                                   std::shared_ptr<pipeline::Dependency>, 
int64_t, int64_t>>
             _create_plan_deps;
+    std::string _create_plan_failed_reason;
 };
 
 class GroupCommitMgr {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 2349b5ccf44..bc4e4774d8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -51,6 +52,7 @@ import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TOlapTableLocationParam;
 import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -90,6 +92,9 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
             return;
         }
         try {
+            if 
(DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
+                throw new BeginTransactionException("current running txns on 
db is larger than limit");
+            }
             this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
                     database.getId(), ImmutableList.of(table.getId()), 
labelName,
                     new TxnCoordinator(TxnSourceType.FE, 0,
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 6e9a89aa0f7..cef9bbdbf27 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -31,6 +31,18 @@ suite("test_group_commit_error", "nonConcurrent") {
 
     GetDebugPoint().clearDebugPointsForAllBEs()
     GetDebugPoint().clearDebugPointsForAllFEs()
+    try {
+        
GetDebugPoint().enableDebugPointForAllFEs("OlapInsertExecutor.beginTransaction.failed")
+        sql """ set group_commit = async_mode """
+        sql """ insert into ${tableName} values (1, 1) """
+        assertTrue(false)
+    } catch (Exception e) {
+        logger.info("failed: " + e.getMessage())
+        assertTrue(e.getMessage().contains("begin transaction failed"))
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+
     try {
         
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed")
         sql """ set group_commit = async_mode """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to