This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 247d6fa62d9cce801bfedf157d13f9207dd0d15e Author: morningman <morning...@163.com> AuthorDate: Fri Jan 13 12:01:42 2023 +0800 [refactor](scan-pool) move scan pool from env to scanner scheduler (#15604) The original scan pools are in exec_env. But after new_load_scan_node is enabled by default, the scan pool in exec_env is no longer used: all scan tasks are submitted to the scan pool in scanner_scheduler. BTW, the scan pool is reorganized into 3 kinds: (1) local scan pool: for olap scan node; (2) remote scan pool: for file scan node; (3) limited scan pool: for queries that set a cpu resource limit or have a small limit clause. TODO: Use bthread to unify all IO tasks. Some trivial issues: (1) fix bug that the memtable flush size printed in the log is not right; (2) add RuntimeProfile param in VScanner. --- be/src/exec/olap_scan_node.cpp | 4 +++- be/src/olap/memtable.cpp | 2 +- be/src/olap/memtable_flush_executor.cpp | 4 ++-- be/src/olap/rowset/beta_rowset_writer.cpp | 16 ++++++++-------- be/src/olap/rowset/beta_rowset_writer.h | 2 +- be/src/olap/rowset/rowset_writer.h | 2 +- be/src/runtime/fragment_mgr.cpp | 9 ++++++++- be/src/runtime/query_fragments_ctx.h | 3 ++- be/src/vec/exec/scan/new_es_scan_node.cpp | 5 +++-- be/src/vec/exec/scan/new_es_scanner.cpp | 4 ++-- be/src/vec/exec/scan/new_es_scanner.h | 3 ++- be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 5 +++-- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 4 ++-- be/src/vec/exec/scan/new_jdbc_scanner.h | 2 +- be/src/vec/exec/scan/new_odbc_scan_node.cpp | 3 ++- be/src/vec/exec/scan/new_odbc_scanner.cpp | 4 ++-- be/src/vec/exec/scan/new_odbc_scanner.h | 2 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 5 ++--- be/src/vec/exec/scan/new_olap_scanner.h | 1 - be/src/vec/exec/scan/scanner_scheduler.cpp | 11 +++++++++++ be/src/vec/exec/scan/scanner_scheduler.h | 6 ++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 3 +-- be/src/vec/exec/scan/vfile_scanner.h | 7 ++++--- be/src/vec/exec/scan/vscanner.cpp | 3 ++- be/src/vec/exec/scan/vscanner.h | 6 ++++-- 
be/test/runtime/test_env.cc | 2 -- 26 files changed, 74 insertions(+), 44 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 5bf924d1c1..35f3ef5023 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1508,7 +1508,9 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { * The larger the nice value, the more preferentially obtained query resources * 4. Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries *********************************/ - ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token(); + // after merge #15604, we no long support thread token to non-vec olap scan node, + // so keep thread_token as null + ThreadPoolToken* thread_token = nullptr; PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); _total_assign_num = 0; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 46cc15275e..1f8465c0f3 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -485,7 +485,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) { } else { _collect_vskiplist_results<true>(); vectorized::Block block = _output_mutable_block.to_block(); - RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block)); + RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block, &_flush_size)); _flush_size = block.allocated_bytes(); } return Status::OK(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index c2fe380b1b..5dafa36cc5 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -87,6 +87,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) { MonotonicStopWatch timer; timer.start(); + size_t memory_usage = memtable->memory_usage(); Status s = memtable->flush(); if (!s) { 
LOG(WARNING) << "Flush memtable failed with res = " << s; @@ -101,8 +102,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) { VLOG_CRITICAL << "flush memtable cost: " << timer.elapsed_time() << ", running count: " << _stats.flush_running_count << ", finish count: " << _stats.flush_finish_count - << ", mem size: " << memtable->memory_usage() - << ", disk size: " << memtable->flush_size(); + << ", mem size: " << memory_usage << ", disk size: " << memtable->flush_size(); _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_finish_count++; _stats.flush_running_count--; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 42f57812dc..e64d4f97ee 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -689,7 +689,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus return Status::OK(); } -Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) { +Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, int64* flush_size) { if (block->rows() == 0) { return Status::OK(); } @@ -697,7 +697,7 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) { std::unique_ptr<segment_v2::SegmentWriter> writer; RETURN_NOT_OK(_create_segment_writer(&writer)); RETURN_NOT_OK(_add_block(block, &writer)); - RETURN_NOT_OK(_flush_segment_writer(&writer)); + RETURN_NOT_OK(_flush_segment_writer(&writer, flush_size)); return Status::OK(); } @@ -944,12 +944,12 @@ Status BetaRowsetWriter::_create_segment_writer( std::unique_ptr<segment_v2::SegmentWriter>* writer) { size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) { - LOG(ERROR) << "too many segments in rowset." 
- << " tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id - << " max:" << config::max_segment_num_per_rowset - << " _num_segment:" << _num_segment - << " _segcompacted_point:" << _segcompacted_point - << " _num_segcompacted:" << _num_segcompacted; + LOG(WARNING) << "too many segments in rowset." + << " tablet_id:" << _context.tablet_id << " rowset_id:" << _context.rowset_id + << " max:" << config::max_segment_num_per_rowset + << " _num_segment:" << _num_segment + << " _segcompacted_point:" << _segcompacted_point + << " _num_segcompacted:" << _num_segcompacted; return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); } else { return _do_create_segment_writer(writer, false, -1, -1); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 2d37788930..754491bc82 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -56,7 +56,7 @@ public: // Return the file size flushed to disk in "flush_size" // This method is thread-safe. 
Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override; - Status flush_single_memtable(const vectorized::Block* block) override; + Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) override; RowsetSharedPtr build() override; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 26d933c8d0..dbf7776c8d 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -69,7 +69,7 @@ public: virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) { return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED); } - virtual Status flush_single_memtable(const vectorized::Block* block) { + virtual Status flush_single_memtable(const vectorized::Block* block, int64_t* flush_size) { return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5e0d4f4645..d7992aa9e2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -737,11 +737,15 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, // the thread token will be set if // 1. the cpu_limit is set, or // 2. the limit is very small ( < 1024) + // If the token is set, the scan task will use limited_scan_pool in scanner scheduler. 
+ // Otherwise, the scan task will use local/remote scan pool in scanner scheduler int concurrency = 1; bool is_serial = false; + bool need_token = false; if (params.query_options.__isset.resource_limit && params.query_options.resource_limit.__isset.cpu_limit) { concurrency = params.query_options.resource_limit.cpu_limit; + need_token = true; } else { concurrency = config::doris_scanner_thread_pool_thread_num; } @@ -759,11 +763,14 @@ void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, if (node.limit > 0 && node.limit < 1024) { concurrency = 1; is_serial = true; + need_token = true; break; } } } - fragments_ctx->set_thread_token(concurrency, is_serial); + if (need_token) { + fragments_ctx->set_thread_token(concurrency, is_serial); + } #endif } diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index f3565b9f30..543b9868b5 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -30,6 +30,7 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "util/pretty_printer.h" #include "util/threadpool.h" +#include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/shared_hash_table_controller.h" namespace doris { @@ -76,7 +77,7 @@ public: } void set_thread_token(int concurrency, bool is_serial) { - _thread_token = _exec_env->limited_scan_thread_pool()->new_token( + _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token( is_serial ? 
ThreadPool::ExecutionMode::SERIAL : ThreadPool::ExecutionMode::CONCURRENT, concurrency); diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 6a57e917b4..bd6f676fe5 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -201,8 +201,9 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) { properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build( properties, _column_names, _predicates, _docvalue_context, &doc_value_mode); - NewEsScanner* scanner = new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, - properties, _docvalue_context, doc_value_mode); + NewEsScanner* scanner = + new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id, properties, + _docvalue_context, doc_value_mode, _state->runtime_profile()); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index 358a226d1e..bae49801a0 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -26,8 +26,8 @@ namespace doris::vectorized { NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id, const std::map<std::string, std::string>& properties, const std::map<std::string, std::string>& docvalue_context, - bool doc_value_mode) - : VScanner(state, static_cast<VScanNode*>(parent), limit), + bool doc_value_mode, RuntimeProfile* profile) + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _is_init(false), _es_eof(false), _properties(properties), diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index 4e82d72af9..4b97237340 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -30,7 +30,8 @@ class NewEsScanner : public VScanner { public: 
NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id, const std::map<std::string, std::string>& properties, - const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode); + const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode, + RuntimeProfile* profile); Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index 955a33970d..eaa511f40d 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -50,8 +50,9 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { if (_eos == true) { return Status::OK(); } - NewJdbcScanner* scanner = new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, - _query_string, _table_type); + NewJdbcScanner* scanner = + new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string, + _table_type, _state->runtime_profile()); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); scanners->push_back(static_cast<VScanner*>(scanner)); diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index edfb843733..377ae6c800 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -20,8 +20,8 @@ namespace doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, const TupleId& tuple_id, const std::string& query_string, - TOdbcTableType::type table_type) - : VScanner(state, static_cast<VScanNode*>(parent), limit), + TOdbcTableType::type table_type, RuntimeProfile* profile) + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _is_init(false), _jdbc_eos(false), _tuple_id(tuple_id), diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h 
b/be/src/vec/exec/scan/new_jdbc_scanner.h index 9fa17c4116..e88b33d252 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -27,7 +27,7 @@ class NewJdbcScanner : public VScanner { public: NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, const TupleId& tuple_id, const std::string& query_string, - TOdbcTableType::type table_type); + TOdbcTableType::type table_type, RuntimeProfile* profile); Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index db8f8bfc68..1a7296a866 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -51,7 +51,8 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { if (_eos == true) { return Status::OK(); } - NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node); + NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node, + _state->runtime_profile()); _scanner_pool.add(scanner); RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); scanners->push_back(static_cast<VScanner*>(scanner)); diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index 71b1a8d525..f19b44c294 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -26,8 +26,8 @@ static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner"; namespace doris::vectorized { NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit, - const TOdbcScanNode& odbc_scan_node) - : VScanner(state, static_cast<VScanNode*>(parent), limit), + const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile) + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _is_init(false), 
_odbc_eof(false), _table_name(odbc_scan_node.table_name), diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h index 012fc8b3c1..e238a544fe 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.h +++ b/be/src/vec/exec/scan/new_odbc_scanner.h @@ -26,7 +26,7 @@ namespace doris::vectorized { class NewOdbcScanner : public VScanner { public: NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit, - const TOdbcScanNode& odbc_scan_node); + const TOdbcScanNode& odbc_scan_node, RuntimeProfile* profile); Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index fe1a010df4..cc1f030c6a 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -26,11 +26,10 @@ namespace doris::vectorized { NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, bool need_agg_finalize, const TPaloScanRange& scan_range, RuntimeProfile* profile) - : VScanner(state, static_cast<VScanNode*>(parent), limit), + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1), - _profile(profile) { + _version(-1) { _tablet_schema = std::make_shared<TabletSchema>(); } diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 5f253ea93f..97ba78aa2b 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -79,7 +79,6 @@ private: // ========= profiles ========== int64_t _compressed_bytes_read = 0; int64_t _raw_rows_read = 0; - RuntimeProfile* _profile; bool _profile_updated = false; }; } // namespace vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e67ff364be..63c8fa1355 100644 --- 
a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -80,6 +80,12 @@ Status ScannerScheduler::init(ExecEnv* env) { new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size, "remote_scan")); + ThreadPoolBuilder("LimitedScanThreadPool") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) + .build(&_limited_scan_thread_pool); + _is_init = true; return Status::OK(); } @@ -94,6 +100,11 @@ Status ScannerScheduler::submit(ScannerContext* ctx) { return Status::OK(); } +std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token( + ThreadPool::ExecutionMode mode, int max_concurrency) { + return _limited_scan_thread_pool->new_token(mode, max_concurrency); +} + void ScannerScheduler::_schedule_thread(int queue_id) { BlockingQueue<ScannerContext*>* queue = _pending_queues[queue_id]; while (!_is_closed) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index f8c1a8f3df..a0062500d9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -19,6 +19,7 @@ #include "common/status.h" #include "util/blocking_queue.hpp" +#include "util/threadpool.h" #include "vec/exec/scan/scanner_context.h" namespace doris::vectorized { @@ -50,6 +51,9 @@ public: Status submit(ScannerContext* ctx); + std::unique_ptr<ThreadPoolToken> new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, + int max_concurrency); + private: // scheduling thread function void _schedule_thread(int queue_id); @@ -75,8 +79,10 @@ private: // execution thread pool // _local_scan_thread_pool is for local scan task(typically, olap scanner) // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, etc.) 
+ // _limited_scan_thread_pool is a special pool for queries with resource limit std::unique_ptr<PriorityThreadPool> _local_scan_thread_pool; std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool; + std::unique_ptr<ThreadPool> _limited_scan_thread_pool; // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index d0492ea9c7..c1f85cf01b 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -43,14 +43,13 @@ namespace doris::vectorized { VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit, const TFileScanRange& scan_range, RuntimeProfile* profile, KVCache<std::string>& kv_cache) - : VScanner(state, static_cast<VScanNode*>(parent), limit), + : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), _params(scan_range.params), _ranges(scan_range.ranges), _next_range(0), _cur_reader(nullptr), _cur_reader_eof(false), _mem_pool(std::make_unique<MemPool>()), - _profile(profile), _kv_cache(kv_cache), _strict_mode(false) { if (scan_range.params.__isset.strict_mode) { diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 28c0e3d347..48bbfbf3ad 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -45,6 +45,10 @@ public: Status prepare(VExprContext** vconjunct_ctx_ptr, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + doris::TabletStorageType get_storage_type() override { + return doris::TabletStorageType::STORAGE_TYPE_REMOTE; + } + protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; @@ -106,9 +110,6 @@ protected: // Mem pool used to allocate _src_tuple and _src_tuple_row std::unique_ptr<MemPool> _mem_pool; - // Profile - RuntimeProfile* _profile; - KVCache<std::string>& _kv_cache; bool _scanner_eof = false; diff --git 
a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index e9518fbe91..071509f6ef 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -21,10 +21,11 @@ namespace doris::vectorized { -VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit) +VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile) : _state(state), _parent(parent), _limit(limit), + _profile(profile), _input_tuple_desc(parent->input_tuple_desc()), _output_tuple_desc(parent->output_tuple_desc()) { _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 07ddf65298..e8817670e7 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -38,7 +38,7 @@ struct ScannerCounter { class VScanner { public: - VScanner(RuntimeState* state, VScanNode* parent, int64_t limit); + VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile); virtual ~VScanner() {} @@ -89,7 +89,7 @@ public: int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); } - doris::TabletStorageType get_storage_type() { + virtual doris::TabletStorageType get_storage_type() { return doris::TabletStorageType::STORAGE_TYPE_LOCAL; } @@ -133,6 +133,8 @@ protected: // Set if scan node has sort limit info int64_t _limit = -1; + RuntimeProfile* _profile; + const TupleDescriptor* _input_tuple_desc = nullptr; const TupleDescriptor* _output_tuple_desc = nullptr; const TupleDescriptor* _real_tuple_desc = nullptr; diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc index dc2b53c9f6..17dbc22b48 100644 --- a/be/test/runtime/test_env.cc +++ b/be/test/runtime/test_env.cc @@ -36,7 +36,6 @@ TestEnv::TestEnv() { _exec_env->_thread_mgr = new ThreadResourceMgr(2); _exec_env->_disk_io_mgr = new DiskIoMgr(1, 1, 1, 10); 
_exec_env->disk_io_mgr()->init(-1); - _exec_env->_scan_thread_pool = new PriorityThreadPool(1, 16, "ut_scan"); _exec_env->_result_queue_mgr = new ResultQueueMgr(); // TODO may need rpc support, etc. } @@ -58,7 +57,6 @@ void TestEnv::init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t c TestEnv::~TestEnv() { SAFE_DELETE(_exec_env->_result_queue_mgr); SAFE_DELETE(_exec_env->_buffer_pool); - SAFE_DELETE(_exec_env->_scan_thread_pool); SAFE_DELETE(_exec_env->_disk_io_mgr); SAFE_DELETE(_exec_env->_thread_mgr); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org