This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aca8406e319 [refactor](executor)remove scan group #28847
aca8406e319 is described below

commit aca8406e319530030e065a34f19a496fab86e85b
Author: wangbo <wan...@apache.org>
AuthorDate: Fri Dec 22 17:05:50 2023 +0800

    [refactor](executor)remove scan group #28847
---
 be/src/common/config.cpp                   |   2 -
 be/src/common/config.h                     |   2 -
 be/src/runtime/task_group/task_group.cpp   |   5 -
 be/src/runtime/task_group/task_group.h     |   6 -
 be/src/vec/exec/scan/scan_task_queue.cpp   | 221 -----------------------------
 be/src/vec/exec/scan/scan_task_queue.h     |  99 -------------
 be/src/vec/exec/scan/scanner_context.cpp   |   4 -
 be/src/vec/exec/scan/scanner_context.h     |   1 -
 be/src/vec/exec/scan/scanner_scheduler.cpp |  44 ------
 be/src/vec/exec/scan/scanner_scheduler.h   |  13 +-
 10 files changed, 1 insertion(+), 396 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 03eaee7b23c..ecc44a08e47 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -970,8 +970,6 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
 DEFINE_Bool(enable_debug_points, "false");
 
 DEFINE_Int32(pipeline_executor_size, "0");
-DEFINE_Bool(enable_workload_group_for_scan, "false");
-DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
 // 128 MB
 DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e011073d44d..a9508c6e8af 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1024,8 +1024,6 @@ DECLARE_Bool(enable_fuzzy_mode);
 DECLARE_Bool(enable_debug_points);
 
 DECLARE_Int32(pipeline_executor_size);
-DECLARE_Bool(enable_workload_group_for_scan);
-DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms);
 
 // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
 // Will remove after fully test.
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 137f5ea2345..9e86f8b831b 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -33,7 +33,6 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
-#include "vec/exec/scan/scan_task_queue.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris {
@@ -102,7 +101,6 @@ std::string TaskGroupEntity<QueueType>::debug_string() const {
 }
 
 template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-template class TaskGroupEntity<ScanTaskQueue>;
 
 TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
         : _id(tg_info.id),
@@ -112,7 +110,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _enable_memory_overcommit(tg_info.enable_memory_overcommit),
           _cpu_share(tg_info.cpu_share),
           _task_entity(this, "pipeline task entity"),
-          _local_scan_entity(this, "local scan entity"),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
           _cpu_hard_limit(tg_info.cpu_hard_limit) {}
 
@@ -150,8 +147,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
     }
     ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
             tg_info, &_task_entity);
-    ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share(
-            tg_info, &_local_scan_entity);
 }
 
 int64_t TaskGroup::memory_used() {
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index f1c8523664e..04dbf518f0d 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -43,7 +43,6 @@ namespace taskgroup {
 
 class TaskGroup;
 struct TaskGroupInfo;
-class ScanTaskQueue;
 
 template <typename QueueType>
 class TaskGroupEntity {
@@ -88,9 +87,6 @@ private:
 using TaskGroupPipelineTaskEntity = TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
 using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
 
-using TaskGroupScanTaskEntity = TaskGroupEntity<ScanTaskQueue>;
-using TGSTEntityPtr = TaskGroupScanTaskEntity*;
-
 struct TgTrackerLimiterGroup {
     std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
     std::mutex group_lock;
@@ -101,7 +97,6 @@ public:
     explicit TaskGroup(const TaskGroupInfo& tg_info);
 
     TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
-    TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; }
 
     int64_t version() const { return _version; }
 
@@ -155,7 +150,6 @@ private:
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
     TaskGroupPipelineTaskEntity _task_entity;
-    TaskGroupScanTaskEntity _local_scan_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
 };
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp
deleted file mode 100644
index 7c2068e5715..00000000000
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scan_task_queue.h"
-
-#include "pipeline/pipeline_task.h"
-#include "runtime/task_group/task_group.h"
-#include "vec/exec/scan/scanner_context.h"
-
-namespace doris {
-namespace taskgroup {
-static void empty_function() {}
-ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {}
-
-ScanTask::ScanTask(WorkFunction scan_func,
-                   std::shared_ptr<vectorized::ScannerContext> scanner_context,
-                   TGSTEntityPtr scan_entity, int priority)
-        : scan_func(std::move(scan_func)),
-          scanner_context(scanner_context),
-          scan_entity(scan_entity),
-          priority(priority) {}
-
-ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {}
-
-Status ScanTaskQueue::try_push_back(ScanTask scan_task) {
-    if (_queue.try_put(std::move(scan_task))) {
-        VLOG_DEBUG << "try_push_back scan task " << scan_task.scanner_context->ctx_id << " "
-                   << scan_task.priority;
-        return Status::OK();
-    } else {
-        return Status::InternalError("failed to submit scan task to ScanTaskQueue");
-    }
-}
-bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
-    auto r = _queue.blocking_get(scan_task, timeout_ms);
-    if (r) {
-        VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " "
-                   << scan_task->priority;
-    }
-    return r;
-}
-
-ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {}
-ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
-
-void ScanTaskTaskGroupQueue::close() {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    _closed = true;
-    _wait_task.notify_all();
-}
-
-bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    taskgroup::TGSTEntityPtr entity = nullptr;
-    while (entity == nullptr) {
-        if (_closed) {
-            return false;
-        }
-        if (_group_entities.empty()) {
-            _wait_task.wait_for(lock, std::chrono::milliseconds(
-                                              config::workload_group_scan_task_wait_timeout_ms));
-        } else {
-            entity = _next_tg_entity();
-            if (!entity) {
-                _wait_task.wait_for(lock,
-                                    std::chrono::milliseconds(
-                                            config::workload_group_scan_task_wait_timeout_ms));
-            }
-        }
-    }
-    DCHECK(entity->task_size() > 0);
-    if (entity->task_size() == 1) {
-        _dequeue_task_group(entity);
-    }
-    return entity->task_queue()->try_get(
-            scan_task, config::workload_group_scan_task_wait_timeout_ms /* timeout_ms */);
-}
-
-bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
-    auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    auto status = entity->task_queue()->try_push_back(scan_task);
-    if (!status.ok()) {
-        LOG(WARNING) << "try_push_back scan task fail: " << status;
-        return false;
-    }
-    if (_group_entities.find(entity) == _group_entities.end()) {
-        _enqueue_task_group(entity);
-    }
-    _wait_task.notify_one();
-    return true;
-}
-
-void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) {
-    auto* entity = scan_task.scan_entity;
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    auto find_entity = _group_entities.find(entity);
-    bool is_in_queue = find_entity != _group_entities.end();
-    VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string()
-               << ", in queue:" << is_in_queue << ", time_spent: " << time_spent;
-    if (is_in_queue) {
-        _group_entities.erase(entity);
-    }
-    entity->incr_runtime_ns(time_spent);
-    if (is_in_queue) {
-        _group_entities.emplace(entity);
-        _update_min_tg();
-    }
-}
-
-void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                                                 taskgroup::TGSTEntityPtr entity) {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
-    if (is_in_queue) {
-        _group_entities.erase(entity);
-        _total_cpu_share -= entity->cpu_share();
-    }
-    entity->check_and_update_cpu_share(task_group_info);
-    if (is_in_queue) {
-        _group_entities.emplace(entity);
-        _total_cpu_share += entity->cpu_share();
-    }
-}
-
-void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
-    _total_cpu_share += tg_entity->cpu_share();
-    // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns`
-    // should be avoided.
-    /**
-     * If a task group entity leaves task queue for a long time, its v runtime will be very
-     * small. This can cause it to preempt too many execution time. So, in order to avoid this
-     * situation, it is necessary to adjust the task group's v runtime.
-     * */
-    auto old_v_ns = tg_entity->vruntime_ns();
-    auto* min_entity = _min_tg_entity.load();
-    if (min_entity) {
-        auto min_tg_v = min_entity->vruntime_ns();
-        auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
-        uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v;
-        if (new_vruntime_ns > old_v_ns) {
-            VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << new_vruntime_ns;
-            tg_entity->adjust_vruntime_ns(new_vruntime_ns);
-        }
-    } else if (old_v_ns < _min_tg_v_runtime_ns) {
-        VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << _min_tg_v_runtime_ns;
-        tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
-    }
-    _group_entities.emplace(tg_entity);
-    VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string()
-               << ", group entity size: " << _group_entities.size();
-    _update_min_tg();
-}
-
-void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) {
-    _total_cpu_share -= tg_entity->cpu_share();
-    _group_entities.erase(tg_entity);
-    VLOG_DEBUG << "scan task group queue dequeue tg " << tg_entity->debug_string()
-               << ", group entity size: " << _group_entities.size();
-    _update_min_tg();
-}
-
-TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() {
-    taskgroup::TGSTEntityPtr res = nullptr;
-    for (auto* entity : _group_entities) {
-        res = entity;
-        break;
-    }
-    return res;
-}
-
-uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) const {
-    // Scan task does not have time slice, so we use pipeline task's instead.
-    return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() /
-           _total_cpu_share;
-}
-
-void ScanTaskTaskGroupQueue::_update_min_tg() {
-    auto* min_entity = _next_tg_entity();
-    _min_tg_entity = min_entity;
-    if (min_entity) {
-        auto min_v_runtime = min_entity->vruntime_ns();
-        if (min_v_runtime > _min_tg_v_runtime_ns) {
-            _min_tg_v_runtime_ns = min_v_runtime;
-        }
-    }
-}
-
-bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()(
-        const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const {
-    auto lhs_val = lhs_ptr->vruntime_ns();
-    auto rhs_val = rhs_ptr->vruntime_ns();
-    if (lhs_val != rhs_val) {
-        return lhs_val < rhs_val;
-    } else {
-        auto l_share = lhs_ptr->cpu_share();
-        auto r_share = rhs_ptr->cpu_share();
-        if (l_share != r_share) {
-            return l_share < r_share;
-        } else {
-            return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
-        }
-    }
-}
-
-} // namespace taskgroup
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h
deleted file mode 100644
index 18ef18872ed..00000000000
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include "olap/tablet.h"
-#include "runtime/task_group/task_group.h"
-#include "util/blocking_priority_queue.hpp"
-
-namespace doris {
-namespace vectorized {
-class ScannerContext;
-};
-
-namespace taskgroup {
-
-using WorkFunction = std::function<void()>;
-
-// Like PriorityThreadPool::Task
-struct ScanTask {
-    ScanTask();
-    ScanTask(WorkFunction scan_func, std::shared_ptr<vectorized::ScannerContext> scanner_context,
-             TGSTEntityPtr scan_entity, int priority);
-    bool operator<(const ScanTask& o) const { return priority < o.priority; }
-    ScanTask& operator++() {
-        priority += 2;
-        return *this;
-    }
-
-    WorkFunction scan_func;
-    std::shared_ptr<vectorized::ScannerContext> scanner_context = nullptr;
-    TGSTEntityPtr scan_entity;
-    int priority;
-};
-
-// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
-class ScanTaskQueue {
-public:
-    ScanTaskQueue();
-    Status try_push_back(ScanTask);
-    bool try_get(ScanTask* scan_task, uint32_t timeout_ms);
-    int size() { return _queue.get_size(); }
-
-private:
-    BlockingPriorityQueue<ScanTask> _queue;
-};
-
-// Like TaskGroupTaskQueue
-class ScanTaskTaskGroupQueue {
-public:
-    explicit ScanTaskTaskGroupQueue(size_t core_size);
-    ~ScanTaskTaskGroupQueue();
-
-    void close();
-    bool take(ScanTask* scan_task);
-    bool push_back(ScanTask);
-
-    void update_statistics(ScanTask task, int64_t time_spent);
-
-    void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr);
-
-private:
-    TGSTEntityPtr _task_entity(ScanTask& scan_task);
-    void _enqueue_task_group(TGSTEntityPtr);
-    void _dequeue_task_group(TGSTEntityPtr);
-    TGSTEntityPtr _next_tg_entity();
-    uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const;
-    void _update_min_tg();
-
-    // Like cfs rb tree in sched_entity
-    struct TaskGroupSchedEntityComparator {
-        bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const;
-    };
-    using ResouceGroupSet = std::set<taskgroup::TGSTEntityPtr, TaskGroupSchedEntityComparator>;
-    ResouceGroupSet _group_entities;
-    std::condition_variable _wait_task;
-    std::mutex _rs_mutex;
-    bool _closed = false;
-    int _total_cpu_share = 0;
-    std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
-    uint64_t _min_tg_v_runtime_ns = 0;
-    size_t _core_size;
-};
-
-} // namespace taskgroup
-} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 99f645ca9e5..5ad2dbec5b6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -601,10 +601,6 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
     }
 }
 
-taskgroup::TaskGroup* ScannerContext::get_task_group() const {
-    return _state->get_query_ctx()->get_task_group();
-}
-
 template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* parent,
                                              RuntimeState* state);
 template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* state);
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index a64b5444712..ba9c1fdee10 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -167,7 +167,6 @@ public:
         return blocks_num;
     }
 
-    taskgroup::TaskGroup* get_task_group() const;
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
     void reschedule_scanner_ctx();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2e4db75a241..e8d7f8a7139 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -35,7 +35,6 @@
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
-#include "scan_task_queue.h"
 #include "util/async_io.h" // IWYU pragma: keep
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
@@ -88,18 +87,15 @@ void ScannerScheduler::stop() {
 
     _is_closed = true;
 
-    _task_group_local_scan_queue->close();
     _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
     _limited_scan_thread_pool->shutdown();
-    _group_local_scan_thread_pool->shutdown();
 
     _scheduler_pool->wait();
     _local_scan_thread_pool->join();
     _remote_scan_thread_pool->join();
     _limited_scan_thread_pool->wait();
-    _group_local_scan_thread_pool->wait();
 
     LOG(INFO) << "ScannerScheduler stopped";
 }
@@ -136,19 +132,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
                               .set_max_threads(config::doris_scanner_thread_pool_thread_num)
                               .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
                               .build(&_limited_scan_thread_pool));
-
-    // 5. task group local scan
-    _task_group_local_scan_queue = std::make_unique<taskgroup::ScanTaskTaskGroupQueue>(
-            config::doris_scanner_thread_pool_thread_num);
-    static_cast<void>(ThreadPoolBuilder("local_scan_group")
-                              .set_min_threads(config::doris_scanner_thread_pool_thread_num)
-                              .set_max_threads(config::doris_scanner_thread_pool_thread_num)
-                              .build(&_group_local_scan_thread_pool));
-    for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
-        static_cast<void>(_group_local_scan_thread_pool->submit_func([this] {
-            this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get());
-        }));
-    }
     _register_metrics();
     _is_init = true;
     return Status::OK();
@@ -251,13 +234,6 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
                     };
                     SimplifiedScanTask simple_scan_task = {work_func, ctx};
                     ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
-                } else if (ctx->get_task_group() && config::enable_workload_group_for_scan) {
-                    auto work_func = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
-                    };
-                    taskgroup::ScanTask scan_task = {
-                            work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), nice};
-                    ret = _task_group_local_scan_queue->push_back(scan_task);
                 } else {
                     PriorityThreadPool::Task task;
                     task.work_function = [this, scanner = *iter, ctx] {
@@ -427,22 +403,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     ctx->push_back_scanner_and_reschedule(scanner);
 }
 
-void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
-                                                taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
-    while (!_is_closed) {
-        taskgroup::ScanTask scan_task;
-        auto success = scan_queue->take(&scan_task);
-        if (success) {
-            int64_t time_spent = 0;
-            {
-                SCOPED_RAW_TIMER(&time_spent);
-                scan_task.scan_func();
-            }
-            scan_queue->update_statistics(scan_task, time_spent);
-        }
-    }
-}
-
 void ScannerScheduler::_register_metrics() {
     REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
                          [this]() { return _local_scan_thread_pool->get_queue_size(); });
@@ -456,10 +416,6 @@ void ScannerScheduler::_register_metrics() {
                          [this]() { return _limited_scan_thread_pool->get_queue_size(); });
     REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
                          [this]() { return _limited_scan_thread_pool->num_threads(); });
-    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size,
-                         [this]() { return _group_local_scan_thread_pool->get_queue_size(); })
-    REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num,
-                         [this]() { return _group_local_scan_thread_pool->num_threads(); });
 }
 
 void ScannerScheduler::_deregister_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 91d341613df..eb4d1380e39 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -21,7 +21,6 @@
 #include <memory>
 
 #include "common/status.h"
-#include "scan_task_queue.h"
 #include "util/threadpool.h"
 #include "vec/exec/scan/vscanner.h"
 
@@ -31,9 +30,7 @@ class ExecEnv;
 namespace vectorized {
 class VScanner;
 } // namespace vectorized
-namespace taskgroup {
-class ScanTaskTaskGroupQueue;
-}
+
 template <typename T>
 class BlockingQueue;
 } // namespace doris
@@ -72,9 +69,6 @@ public:
 
     std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int max_concurrency);
-    taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
-        return _task_group_local_scan_queue.get();
-    }
 
     int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; }
 
@@ -87,8 +81,6 @@ private:
     void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx,
                        VScannerSPtr scanner);
 
-    void _task_group_scanner_scan(ScannerScheduler* scheduler,
-                                  taskgroup::ScanTaskTaskGroupQueue* scan_queue);
     void _register_metrics();
 
     static void _deregister_metrics();
@@ -115,9 +107,6 @@ private:
     std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
-    std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue> _task_group_local_scan_queue;
-    std::unique_ptr<ThreadPool> _group_local_scan_thread_pool;
-
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
     bool _is_init = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to