This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff1012da3e9 [fix](be) Fix time-sharing executor queued task count (#63568)
ff1012da3e9 is described below
commit ff1012da3e9b708814898c458613c5a31acce415
Author: Raiden <[email protected]>
AuthorDate: Tue May 26 11:59:51 2026 +0800
[fix](be) Fix time-sharing executor queued task count (#63568)
### What problem does this PR solve?
Issue Number: N/A
Related PR: N/A
Problem Summary:
`TimeSharingTaskExecutor` uses `_total_queued_tasks` for queue-size
metrics and capacity checks. When queued splits were removed before
execution (for example, because their task was cancelled or removed),
the splits were dropped from the split queue but `_total_queued_tasks`
was not decremented.
After repeated removals, `_total_queued_tasks` could become larger than
the real queue size. This made the executor report a non-zero queue size
even when there were no active or queued splits, and later submissions
could be rejected as if the queue were full.
This PR keeps queue offer/remove operations consistent by updating
`_total_queued_tasks` together with the split queue and token state.
### Release note
Fix a bug where the time-sharing scan executor queue size could become
inaccurate after queued splits were removed before execution.
### Check List (For Author)
- Test
- [x] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason
- Behavior changed:
- [ ] No.
- [x] Yes. Removing queued splits before execution now also decrements
the executor's queued-task count, so queue-size metrics and capacity
checks reflect the real number of queued splits.
- Does this need documentation?
- [x] No.
- [ ] Yes.
### Check List (For the Reviewer who merges this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
---
.../time_sharing/time_sharing_task_executor.cpp | 44 ++++++++++--
.../time_sharing/time_sharing_task_executor.h | 9 +++
.../time_sharing_task_executor_test.cpp | 78 +++++++++++++++++++++-
3 files changed, 123 insertions(+), 8 deletions(-)
diff --git
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 32636a2cf8f..20217e6cc3c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -276,7 +276,7 @@ TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
}
{
std::unique_lock<std::mutex> l(_lock);
- _tokenless->_entries->remove_all(splits_to_destroy);
+ _remove_queued_splits_unlocked(splits_to_destroy);
}
}
@@ -421,7 +421,7 @@ Status
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
DCHECK(state == SplitThreadPoolToken::State::IDLE ||
state == SplitThreadPoolToken::State::RUNNING);
split->submit_time_watch().start();
- _tokenless->_entries->offer(std::move(split));
+ _offer_split_unlocked(std::move(split));
if (state == SplitThreadPoolToken::State::IDLE) {
_tokenless->transition(SplitThreadPoolToken::State::RUNNING);
}
@@ -433,8 +433,6 @@ Status
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
// 1. If it is a SERIAL token, and there are unsubmitted tasks, submit
them to the queue.
// 2. If it is a CONCURRENT token, and there are still unsubmitted
tasks, and the upper limit of concurrency is not reached,
// then submitted to the queue.
- _total_queued_tasks++;
-
// Wake up an idle thread for this task. Choosing the thread at the front
of
// the list ensures LIFO semantics as idling threads are also added to the
front.
//
@@ -570,7 +568,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
lock.unlock();
l.lock();
if (_tokenless->state() ==
SplitThreadPoolToken::State::RUNNING) {
- _tokenless->_entries->offer(split);
+ split->submit_time_watch().reset();
+ _offer_split_unlocked(split);
}
l.unlock();
} else {
@@ -586,7 +585,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
split->reset_level_priority();
std::unique_lock<std::mutex> l(_lock);
if (_tokenless->state() ==
SplitThreadPoolToken::State::RUNNING) {
- _tokenless->_entries->offer(split);
+ split->submit_time_watch().reset();
+ _offer_split_unlocked(split);
}
} else {
LOG(WARNING) << "blocked split is failed,
split_id: "
@@ -770,7 +770,7 @@ Status
TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
}
{
std::unique_lock<std::mutex> l(_lock);
- _tokenless->_entries->remove_all(splits_to_destroy);
+ _remove_queued_splits_unlocked(splits_to_destroy);
}
}
@@ -846,6 +846,36 @@ Status
TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
return _do_submit(prioritized_split);
}
+void
TimeSharingTaskExecutor::_offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner>
split) {
+ _tokenless->_entries->offer(std::move(split));
+ ++_total_queued_tasks;
+}
+
+void TimeSharingTaskExecutor::_remove_queued_splits_unlocked(
+ const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+ if (splits.empty()) {
+ return;
+ }
+
+ const size_t queue_size_before = _tokenless->_entries->size();
+ _tokenless->_entries->remove_all(splits);
+ const size_t queue_size_after = _tokenless->_entries->size();
+ DCHECK_GE(queue_size_before, queue_size_after);
+
+ const auto removed = static_cast<int>(queue_size_before -
queue_size_after);
+ DCHECK_GE(_total_queued_tasks, removed);
+ _total_queued_tasks -= removed;
+
+ if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING &&
+ _tokenless->_active_threads == 0 && _tokenless->_entries->size() == 0)
{
+ _tokenless->transition(SplitThreadPoolToken::State::IDLE);
+ }
+
+ if (_total_queued_tasks == 0 && _active_threads == 0) {
+ _idle_cond.notify_all();
+ }
+}
+
void
TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner>
split,
const Status& status) {
_completed_splits_per_level[split->priority().level()]++;
diff --git
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
index ba38ddb04da..13a42a1385c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
@@ -294,6 +294,15 @@ private:
// // Submits a task to be run via token.
Status _do_submit(std::shared_ptr<PrioritizedSplitRunner> split);
+ // Offer a split to the executor queue and keep _total_queued_tasks
consistent.
+ // REQUIRES: _lock is held.
+ void _offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split);
+
+ // Remove queued splits and keep _total_queued_tasks/token state
consistent.
+ // REQUIRES: _lock is held.
+ void _remove_queued_splits_unlocked(
+ const std::vector<std::shared_ptr<PrioritizedSplitRunner>>&
splits);
+
//NOTE: not thread safe, caller should keep it thread-safe by using lock
Status _try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
diff --git
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index b6f90daf79b..1636b04c425 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -314,13 +314,37 @@ private:
Status _status;
};
+class QueueOnlySplitRunner : public SplitRunner {
+public:
+ Status init() override { return Status::OK(); }
+
+ Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds)
override {
+ _started = true;
+ _finished = true;
+ return SharedListenableFuture<Void>::create_ready();
+ }
+
+ void close(const Status& status) override {}
+
+ bool is_finished() override { return _finished.load(); }
+
+ Status finished_status() override { return Status::OK(); }
+
+ std::string get_info() const override { return "queue_only_split"; }
+
+ bool is_started() const { return _started.load(); }
+
+private:
+ std::atomic<bool> _started {false};
+ std::atomic<bool> _finished {false};
+};
+
class TimeSharingTaskExecutorTest : public testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
-private:
template <typename Container>
void assert_split_states(int end_index, const Container& splits) {
for (int i = 0; i <= end_index; ++i) {
@@ -348,6 +372,58 @@ private:
}
};
+TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count)
{
+ auto ticker = std::make_shared<TestingTicker>();
+
+ TimeSharingTaskExecutor::ThreadConfig thread_config;
+ thread_config.thread_name = "leak_repro";
+ thread_config.workload_group = "normal";
+ thread_config.max_thread_num = 0;
+ thread_config.min_thread_num = 0;
+ thread_config.max_queue_size = 2;
+ TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+ ASSERT_TRUE(executor.init().ok());
+ ASSERT_TRUE(executor.start().ok());
+
+ try {
+ for (int i = 0; i < thread_config.max_queue_size; ++i) {
+ auto task_handle = TEST_TRY(executor.create_task(
+ TaskId("removed_task_" + std::to_string(i)), []() { return
0.0; }, 1,
+ std::chrono::milliseconds(1), std::optional<int>(1)));
+ auto split = std::make_shared<QueueOnlySplitRunner>();
+
+ auto enqueue_result = executor.enqueue_splits(task_handle, false,
{split});
+ ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+ EXPECT_EQ(executor.waiting_splits_size(), 1);
+
+ ASSERT_TRUE(executor.remove_task(task_handle).ok());
+ EXPECT_FALSE(split->is_started());
+ EXPECT_EQ(executor.waiting_splits_size(), 0);
+ EXPECT_EQ(executor.get_queue_size(), 0);
+ }
+
+ EXPECT_EQ(executor.num_active_threads(), 0);
+ EXPECT_EQ(executor.waiting_splits_size(), 0);
+ EXPECT_EQ(executor.get_queue_size(), 0);
+
+ auto task_handle = TEST_TRY(executor.create_task(
+ TaskId("next_task"), []() { return 0.0; }, 1,
std::chrono::milliseconds(1),
+ std::optional<int>(1)));
+ auto split = std::make_shared<QueueOnlySplitRunner>();
+
+ auto enqueue_result = executor.enqueue_splits(task_handle, false,
{split});
+ ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+ EXPECT_EQ(executor.waiting_splits_size(), 1);
+ EXPECT_EQ(executor.get_queue_size(), 1);
+
+ static_cast<void>(executor.remove_task(task_handle));
+ } catch (...) {
+ executor.stop();
+ throw;
+ }
+ executor.stop();
+}
+
TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
auto ticker = std::make_shared<TestingTicker>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]