This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fcf5c964234 branch-3.0: [fix](group commit) fix group_commit get_block too slow (#51358) (#51375)
fcf5c964234 is described below
commit fcf5c96423459725d1abf0ef7f5891d9cb08e7f6
Author: meiyi <[email protected]>
AuthorDate: Fri May 30 16:09:13 2025 +0800
branch-3.0: [fix](group commit) fix group_commit get_block too slow (#51358) (#51375)
pick https://github.com/apache/doris/pull/51358
---
be/src/pipeline/dependency.cpp | 2 +-
be/src/pipeline/dependency.h | 9 +-
.../pipeline/exec/group_commit_scan_operator.cpp | 8 +-
be/src/pipeline/exec/group_commit_scan_operator.h | 3 +-
be/src/runtime/group_commit_mgr.cpp | 103 +++++++++------------
be/src/runtime/group_commit_mgr.h | 5 +-
.../plans/commands/insert/OlapInsertExecutor.java | 5 +
.../group_commit/test_group_commit_error.groovy | 12 +++
8 files changed, 76 insertions(+), 71 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index ffba01b05b2..b15005e03a8 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -161,7 +161,7 @@ void RuntimeFilterTimerQueue::start() {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->should_be_check_timeout()) {
- if (it->_parent->is_blocked_by(nullptr)) {
+ if (it->force_wait_timeout() ||
it->_parent->is_blocked_by(nullptr)) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 496f11f7877..ea6cacf51b1 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -207,10 +207,11 @@ struct RuntimeFilterTimerQueue;
class RuntimeFilterTimer {
public:
RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
- std::shared_ptr<Dependency> parent)
+ std::shared_ptr<Dependency> parent, bool
force_wait_timeout = false)
: _parent(std::move(parent)),
_registration_time(registration_time),
- _wait_time_ms(wait_time_ms) {}
+ _wait_time_ms(wait_time_ms),
+ _force_wait_timeout(force_wait_timeout) {}
// Called by runtime filter producer.
void call_ready();
@@ -228,6 +229,8 @@ public:
bool should_be_check_timeout();
+ bool force_wait_timeout() { return _force_wait_timeout; }
+
private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<Dependency> _parent = nullptr;
@@ -235,6 +238,8 @@ private:
std::mutex _lock;
int64_t _registration_time;
const int32_t _wait_time_ms;
+ // true only for group_commit_scan_operator
+ bool _force_wait_timeout;
};
struct RuntimeFilterTimerQueue {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp
b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index a3da7f9c3a0..b7dfff79536 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -34,8 +34,7 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block,
&find_node, eos,
-
local_state._get_block_dependency,
-
local_state._timer_dependency));
+
local_state._get_block_dependency));
return Status::OK();
}
@@ -45,16 +44,13 @@ Status GroupCommitLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
auto& p = _parent->cast<GroupCommitOperatorX>();
_get_block_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"GroupCommitGetBlockDependency", true);
- _timer_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
-
"GroupCommitTimerDependency", true);
auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue,
_get_block_dependency);
if (st.ok()) {
DCHECK(load_block_queue != nullptr);
- _timer_dependency->block();
_runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
MonotonicMillis(),
load_block_queue->get_group_commit_interval_ms(),
- _timer_dependency);
+ _get_block_dependency, true);
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
timers.push_back(_runtime_filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h
b/be/src/pipeline/exec/group_commit_scan_operator.h
index bf627df64b0..be07645f57c 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -38,7 +38,7 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
std::vector<Dependency*> dependencies() const override {
- return {_scan_dependency.get(), _get_block_dependency.get(),
_timer_dependency.get()};
+ return {_scan_dependency.get(), _get_block_dependency.get()};
}
private:
@@ -46,7 +46,6 @@ private:
Status _process_conjuncts(RuntimeState* state) override;
std::shared_ptr<Dependency> _get_block_dependency = nullptr;
- std::shared_ptr<Dependency> _timer_dependency = nullptr;
std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer =
nullptr;
};
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 0f22dbf4573..70345acdfb7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,6 +35,16 @@
namespace doris {
+std::string LoadBlockQueue::_get_load_ids() {
+ std::stringstream ss;
+ ss << "[";
+ for (auto& id : _load_ids_to_write_dep) {
+ ss << id.first.to_string() << ", ";
+ }
+ ss << "]";
+ return ss.str();
+}
+
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal,
UniqueId& load_id) {
@@ -53,23 +63,12 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state,
_data_bytes += block->bytes();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
- std::stringstream ss;
- ss << "[";
- for (const auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
- }
- ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
- << "block queue size is " << _block_queue.size() << ",
block rows is "
- << block->rows() << ", block bytes is " <<
block->bytes()
- << ", before add block, all block queues bytes is "
- << before_block_queues_bytes
- << ", after add block, all block queues bytes is "
- << _all_block_queues_bytes->load() << ", txn_id=" <<
txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", load_ids=" << ss.str() << ", runtime_state=" <<
runtime_state
- << ", the block is " << block->dump_data() << ", the
block column size is "
- << block->columns_bytes();
+ << "Cur block rows=" << block->rows() << ", bytes=" <<
block->bytes()
+ << ". all block queues bytes from " <<
before_block_queues_bytes << " to "
+ << _all_block_queues_bytes->load() << ", queue size="
<< _block_queue.size()
+ << ". txn_id=" << txn_id << ", label=" << label
+ << ", instance_id=" << load_instance_id << ",
load_ids=" << _get_load_ids();
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
@@ -83,6 +82,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
config::group_commit_queue_mem_limit) {
DCHECK(_load_ids_to_write_dep.find(load_id) !=
_load_ids_to_write_dep.end());
_load_ids_to_write_dep[load_id]->block();
+ VLOG_DEBUG << "block add_block for load_id=" << load_id
+ << ", memory=" <<
_all_block_queues_bytes->load(std::memory_order_relaxed)
+ << ". inner load_id=" << load_instance_id << ", label="
<< label;
}
}
if (!_need_commit) {
@@ -102,14 +104,14 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state,
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
+ VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id;
}
return Status::OK();
}
Status LoadBlockQueue::get_block(RuntimeState* runtime_state,
vectorized::Block* block,
bool* find_block, bool* eos,
- std::shared_ptr<pipeline::Dependency>
get_block_dep,
- std::shared_ptr<pipeline::Dependency>
timer_dependency) {
+ std::shared_ptr<pipeline::Dependency>
get_block_dep) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
@@ -124,15 +126,6 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
if (!_need_commit && duration >= _group_commit_interval_ms) {
_need_commit = true;
}
- auto get_load_ids = [&]() {
- std::stringstream ss;
- ss << "[";
- for (auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
- }
- ss << "]";
- return ss.str();
- };
if (_block_queue.empty()) {
if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
auto last_print_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -142,12 +135,13 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id="
<< txn_id
<< ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", duration=" << duration << ", load_ids=" <<
get_load_ids();
+ << ", duration=" << duration << ", load_ids=" <<
_get_load_ids();
}
}
+ VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ",
but queue is empty";
if (!_need_commit) {
get_block_dep->block();
- VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
+ VLOG_DEBUG << "block get_block for inner load_id=" <<
load_instance_id;
}
} else {
const BlockData block_data = _block_queue.front();
@@ -157,15 +151,11 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
- << "block queue size is " << _block_queue.size() << ",
block rows is "
- << block->rows() << ", block bytes is " << block->bytes()
- << ", before remove block, all block queues bytes is "
- << before_block_queues_bytes
- << ", after remove block, all block queues bytes is "
- << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", load_ids=" << get_load_ids() << ", the block is " <<
block->dump_data()
- << ", the block column size is " << block->columns_bytes();
+ << "Cur block rows=" << block->rows() << ", bytes=" <<
block->bytes()
+ << ". all block queues bytes from " <<
before_block_queues_bytes << " to "
+ << _all_block_queues_bytes->load() << ", queue size=" <<
_block_queue.size()
+ << ". txn_id=" << txn_id << ", label=" << label
+ << ", instance_id=" << load_instance_id << ", load_ids=" <<
_get_load_ids();
}
if (_block_queue.empty() && _need_commit &&
_load_ids_to_write_dep.empty()) {
*eos = true;
@@ -177,6 +167,8 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
for (auto& id : _load_ids_to_write_dep) {
id.second->set_ready();
}
+ VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
+ << ". inner load_id=" << load_instance_id << ", label=" <<
label;
}
return Status::OK();
}
@@ -189,6 +181,7 @@ Status LoadBlockQueue::remove_load_id(const UniqueId&
load_id) {
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
+ VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner
load_id=" << load_instance_id;
return Status::OK();
}
return Status::NotFound<false>("load_id=" + load_id.to_string() +
@@ -223,35 +216,25 @@ void LoadBlockQueue::_cancel_without_lock(const Status&
st) {
<< ", status=" << st.to_string();
status =
Status::Cancelled("cancel group_commit, label=" + label + ",
status=" + st.to_string());
+ int before_block_queues_bytes = _all_block_queues_bytes->load();
while (!_block_queue.empty()) {
const BlockData& block_data = _block_queue.front().block;
- int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
- std::stringstream ss;
- ss << "[";
- for (const auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
- }
- ss << "]";
- VLOG_DEBUG << "[Group Commit Debug]
(LoadBlockQueue::_cancel_without_block). "
- << "block queue size is " << _block_queue.size() << ",
block rows is "
- << block_data.block->rows() << ", block bytes is " <<
block_data.block->bytes()
- << ", before remove block, all block queues bytes is "
- << before_block_queues_bytes
- << ", after remove block, all block queues bytes is "
- << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", load_ids=" << ss.str() << ", the block is "
- << block_data.block->dump_data() << ", the block column
size is "
- << block_data.block->columns_bytes();
_block_queue.pop_front();
}
+ VLOG_DEBUG << "[Group Commit Debug]
(LoadBlockQueue::_cancel_without_block). "
+ << "all block queues bytes from " << before_block_queues_bytes
<< " to "
+ << _all_block_queues_bytes->load() << ", queue size=" <<
_block_queue.size()
+ << ", txn_id=" << txn_id << ", label=" << label
+ << ", instance_id=" << load_instance_id << ", load_ids=" <<
_get_load_ids();
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
+ VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
+ << ", inner load_id=" << load_instance_id;
}
Status GroupCommitTable::get_first_block_load_queue(
@@ -285,7 +268,7 @@ Status GroupCommitTable::get_first_block_load_queue(
}
}
return Status::InternalError<false>("can not get a block queue for
table_id: " +
- std::to_string(_table_id));
+ std::to_string(_table_id) +
_create_plan_failed_reason);
};
if (try_to_get_matched_queue().ok()) {
@@ -308,7 +291,11 @@ Status GroupCommitTable::get_first_block_load_queue(
}};
auto st = _create_group_commit_load(be_exe_version,
mem_tracker);
if (!st.ok()) {
- LOG(WARNING) << "create group commit load error, st="
<< st.to_string();
+ LOG(WARNING) << "create group commit load error: " <<
st.to_string();
+ _create_plan_failed_reason = ". create group commit
load error: " +
+ st.to_string().substr(0,
300);
+ } else {
+ _create_plan_failed_reason = "";
}
}));
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 2be17400026..dbaade783f4 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -74,8 +74,7 @@ public:
Status add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
- bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep,
- std::shared_ptr<pipeline::Dependency> timer_dependency);
+ bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep);
bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency>
put_block_dep);
@@ -124,6 +123,7 @@ public:
private:
void _cancel_without_lock(const Status& st);
+ std::string _get_load_ids();
// the set of load ids of all blocks in this queue
std::map<UniqueId, std::shared_ptr<pipeline::Dependency>>
_load_ids_to_write_dep;
@@ -196,6 +196,7 @@ private:
std::tuple<std::shared_ptr<pipeline::Dependency>,
std::shared_ptr<pipeline::Dependency>,
int64_t, int64_t>>
_create_plan_deps;
+ std::string _create_plan_failed_reason;
};
class GroupCommitMgr {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 2349b5ccf44..bc4e4774d8e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
@@ -51,6 +52,7 @@ import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
@@ -90,6 +92,9 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
return;
}
try {
+ if
(DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
+ throw new BeginTransactionException("current running txns on
db is larger than limit");
+ }
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()),
labelName,
new TxnCoordinator(TxnSourceType.FE, 0,
diff --git
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 6e9a89aa0f7..cef9bbdbf27 100644
---
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -31,6 +31,18 @@ suite("test_group_commit_error", "nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllFEs("OlapInsertExecutor.beginTransaction.failed")
+ sql """ set group_commit = async_mode """
+ sql """ insert into ${tableName} values (1, 1) """
+ assertTrue(false)
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(e.getMessage().contains("begin transaction failed"))
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
try {
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed")
sql """ set group_commit = async_mode """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]