This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ec85e225061 [enhance](scanner) pass the tablet in `NewOlapScanner`'s ctor (#26167) ec85e225061 is described below commit ec85e22506140fdec4c73e5346e0f57354f80051 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Wed Nov 1 17:50:14 2023 +0800 [enhance](scanner) pass the tablet in `NewOlapScanner`'s ctor (#26167) --- be/src/pipeline/exec/olap_scan_operator.cpp | 24 ++-- be/src/vec/exec/scan/new_olap_scan_node.cpp | 78 ++++++------- be/src/vec/exec/scan/new_olap_scanner.cpp | 164 +++++++++++----------------- be/src/vec/exec/scan/new_olap_scanner.h | 35 +++--- 4 files changed, 130 insertions(+), 171 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 5ea76ddd0d6..b66157d567f 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -247,13 +247,19 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); - auto build_new_scanner = [&](const TPaloScanRange& scan_range, + auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version, const std::vector<OlapScanRange*>& key_ranges) { - std::shared_ptr<vectorized::NewOlapScanner> scanner = - vectorized::NewOlapScanner::create_shared( - state(), this, p._limit_per_scanner, p._olap_scan_node.is_preaggregation, - scan_range, key_ranges, _scanner_profile.get()); - + auto scanner = vectorized::NewOlapScanner::create_shared( + this, vectorized::NewOlapScanner::Params { + state(), + _scanner_profile.get(), + key_ranges, + std::move(tablet), + version, + {}, + p._limit_per_scanner, + p._olap_scan_node.is_preaggregation, + }); RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts)); scanner->set_compound_filters(_compound_filters); scanners->push_back(scanner); @@ -261,7 +267,9 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s }; for (auto& scan_range : _scan_ranges) { auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); - + int64_t version = 0; + std::from_chars(scan_range->version.data(), + scan_range->version.data() + scan_range->version.size(), version); std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; int size_based_scanners_per_tablet = 1; @@ -282,7 +290,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s ++j, ++i) { scanner_ranges.push_back((*ranges)[i].get()); } - RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges)); + RETURN_IF_ERROR(build_new_scanner(tablet, version, scanner_ranges)); } } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 8c95391d48f..45f0ebf2f34 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -457,14 +457,22 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { tablets_read_source.reserve(_scan_ranges.size()); std::vector<std::vector<size_t>> tablet_rs_seg_count; tablet_rs_seg_count.reserve(_scan_ranges.size()); + std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>> tablets_to_scan; + tablets_to_scan.reserve(_scan_ranges.size()); + + for (auto&& scan_range : _scan_ranges) { + auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); + int64_t version = 0; + std::from_chars(scan_range->version.data(), + scan_range->version.data() + scan_range->version.size(), version); + tablets_to_scan.emplace_back(std::move(tablet), version); + } // Split tablet segment by scanner, only use in pipeline in duplicate key // 1. 
if tablet count lower than scanner thread num, count segment num of all tablet ready for scan // TODO: some tablet may do not have segment, may need split segment all case if (_shared_scan_opt && _scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) { - for (auto&& scan_range : _scan_ranges) { - auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); - + for (auto&& [tablet, version] : tablets_to_scan) { is_dup_mow_key = tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == UNIQUE_KEYS && tablet->enable_unique_key_merge_on_write()); @@ -472,10 +480,6 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { break; } - int64_t version = 0; - std::from_chars(scan_range->version.c_str(), - scan_range->version.c_str() + scan_range->version.size(), version); - auto& read_source = tablets_read_source.emplace_back(); { std::shared_lock rdlock(tablet->get_header_lock()); @@ -500,24 +504,32 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { } } + auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version, + const std::vector<OlapScanRange*>& key_ranges, + TabletReader::ReadSource read_source) { + auto scanner = + NewOlapScanner::create_shared(this, NewOlapScanner::Params { + _state, + _scanner_profile.get(), + key_ranges, + std::move(tablet), + version, + std::move(read_source), + _limit_per_scanner, + _olap_scan_node.is_preaggregation, + }); + RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); + scanner->set_compound_filters(_compound_filters); + scanners->push_back(std::move(scanner)); + return Status::OK(); + }; + if (is_dup_mow_key) { - auto build_new_scanner = [&](const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - TabletReader::ReadSource read_source) { - std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, - key_ranges, 
std::move(read_source), _scanner_profile.get()); - - RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); - scanner->set_compound_filters(_compound_filters); - scanners->push_back(std::move(scanner)); - return Status::OK(); - }; // 2. Split segment evenly to each scanner (e.g. each scanner need to scan `avg_segment_count_per_scanner` segments) const auto avg_segment_count_by_scanner = std::max(segment_count / config::doris_scanner_thread_pool_thread_num, (size_t)1); for (int i = 0; i < _scan_ranges.size(); ++i) { - auto& scan_range = _scan_ranges[i]; + auto&& [tablet, version] = tablets_to_scan[i]; std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; int num_ranges = ranges->size(); std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges); @@ -556,7 +568,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { segment_idx_to_scan + need_add_seg_nums}; // only scan need_add_seg_nums RETURN_IF_ERROR(build_new_scanner( - *scan_range, scanner_ranges, + tablet, version, scanner_ranges, {std::move(rs_splits), read_source.delete_predicates})); segment_idx_to_scan += need_add_seg_nums; @@ -565,7 +577,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { avg_segment_count_by_scanner) { split.segment_offsets = {segment_idx_to_scan, rs_seg_count[rowset_idx]}; RETURN_IF_ERROR(build_new_scanner( - *scan_range, scanner_ranges, + tablet, version, scanner_ranges, {std::move(rs_splits), read_source.delete_predicates})); segment_idx_to_scan = 0; @@ -589,26 +601,13 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { // dispose some segment tail if (!rs_splits.empty()) { - static_cast<void>( - build_new_scanner(*scan_range, scanner_ranges, + RETURN_IF_ERROR( + build_new_scanner(tablet, version, scanner_ranges, {std::move(rs_splits), read_source.delete_predicates})); } } } else { - auto build_new_scanner = [&](const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& 
key_ranges) { - std::shared_ptr<NewOlapScanner> scanner = NewOlapScanner::create_shared( - _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation, scan_range, - key_ranges, _scanner_profile.get()); - - RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts)); - scanner->set_compound_filters(_compound_filters); - scanners->push_back(scanner); - return Status::OK(); - }; - for (auto& scan_range : _scan_ranges) { - auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id)); - + for (auto&& [tablet, version] : tablets_to_scan) { std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; int size_based_scanners_per_tablet = 1; @@ -630,7 +629,8 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { ++j, ++i) { scanner_ranges.push_back((*ranges)[i].get()); } - RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges)); + // Construct `ReadSource` in `NewOlapScanner::init` + RETURN_IF_ERROR(build_new_scanner(tablet, version, scanner_ranges, {})); } } } diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index eef7cf8271c..7494124d993 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -63,59 +63,22 @@ namespace doris::vectorized { using ReadSource = TabletReader::ReadSource; -NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, - bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - RuntimeProfile* profile) - : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), - _aggregation(aggregation), - _version(-1), - _scan_range(scan_range), - _key_ranges(key_ranges) { - _tablet_schema = std::make_shared<TabletSchema>(); +template <class T> +NewOlapScanner::NewOlapScanner(T* parent, NewOlapScanner::Params&& params) + : VScanner(params.state, parent, params.limit, params.profile), + 
_key_ranges(std::move(params.key_ranges)), + _tablet_reader_params({ + .tablet = std::move(params.tablet), + .aggregation = params.aggregation, + .version = {0, params.version}, + }) { + _tablet_reader_params.set_read_source(std::move(params.read_source)); _is_init = false; } -NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, - bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - ReadSource read_source, RuntimeProfile* profile) - : VScanner(state, static_cast<VScanNode*>(parent), limit, profile), - _aggregation(aggregation), - _version(-1), - _scan_range(scan_range), - _key_ranges(key_ranges) { - _tablet_reader_params.set_read_source(std::move(read_source)); - _tablet_schema = std::make_shared<TabletSchema>(); - _is_init = false; -} +template NewOlapScanner::NewOlapScanner(NewOlapScanNode*, NewOlapScanner::Params&&); -NewOlapScanner::NewOlapScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, - int64_t limit, bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - RuntimeProfile* profile) - : VScanner(state, local_state, limit, profile), - _aggregation(aggregation), - _version(-1), - _scan_range(scan_range), - _key_ranges(key_ranges) { - _tablet_schema = std::make_shared<TabletSchema>(); - _is_init = false; -} - -NewOlapScanner::NewOlapScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_state, - int64_t limit, bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - ReadSource read_source, RuntimeProfile* profile) - : VScanner(state, local_state, limit, profile), - _aggregation(aggregation), - _version(-1), - _scan_range(scan_range), - _key_ranges(key_ranges) { - _tablet_reader_params.set_read_source(std::move(read_source)); - _tablet_schema = std::make_shared<TabletSchema>(); - _is_init = false; -} +template 
NewOlapScanner::NewOlapScanner(pipeline::OlapScanLocalState*, NewOlapScanner::Params&&); static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, const std::vector<uint32_t>& read_columns) { @@ -147,8 +110,10 @@ Status NewOlapScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con Status NewOlapScanner::init() { _is_init = true; - auto parent = static_cast<NewOlapScanNode*>(_parent); - auto local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state); + auto* parent = static_cast<NewOlapScanNode*>(_parent); + auto* local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state); + auto& tablet = _tablet_reader_params.tablet; + auto& tablet_schema = _tablet_reader_params.tablet_schema; if (_parent) { for (auto& ctx : parent->_common_expr_ctxs_push_down) { VExprContextSPtr context; @@ -171,43 +136,40 @@ Status NewOlapScanner::init() { // it will be very slow when reading data in segment iterator _tablet_reader->set_batch_size(_state->batch_size()); - // Get olap table - TTabletId tablet_id = _scan_range.tablet_id; - _version = strtoul(_scan_range.version.c_str(), nullptr, 10); TabletSchemaSPtr cached_schema; std::string schema_key; { - _tablet = DORIS_TRY(ExecEnv::get_tablet(tablet_id)); TOlapScanNode& olap_scan_node = _parent ? 
parent->_olap_scan_node : local_state->olap_scan_node(); if (olap_scan_node.__isset.schema_version && olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() && olap_scan_node.columns_desc[0].col_unique_id >= 0) { - schema_key = SchemaCache::get_schema_key(tablet_id, olap_scan_node.columns_desc, - olap_scan_node.schema_version, - SchemaCache::Type::TABLET_SCHEMA); + schema_key = SchemaCache::get_schema_key( + tablet->tablet_id(), olap_scan_node.columns_desc, olap_scan_node.schema_version, + SchemaCache::Type::TABLET_SCHEMA); cached_schema = SchemaCache::instance()->get_schema<TabletSchemaSPtr>(schema_key); } if (cached_schema) { - _tablet_schema = cached_schema; + tablet_schema = cached_schema; } else { - _tablet_schema->copy_from(*_tablet->tablet_schema()); + tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->copy_from(*tablet->tablet_schema()); if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() && olap_scan_node.columns_desc[0].col_unique_id >= 0) { // Originally scanner get TabletSchema from tablet object in BE. // To support lightweight schema change for adding / dropping columns, // tabletschema is bounded to rowset and tablet's schema maybe outdated, // so we have to use schema from a query plan witch FE puts it in query plans. 
- _tablet_schema->clear_columns(); + tablet_schema->clear_columns(); for (const auto& column_desc : olap_scan_node.columns_desc) { - _tablet_schema->append_column(TabletColumn(column_desc)); + tablet_schema->append_column(TabletColumn(column_desc)); } if (olap_scan_node.__isset.schema_version) { - _tablet_schema->set_schema_version(olap_scan_node.schema_version); + tablet_schema->set_schema_version(olap_scan_node.schema_version); } } if (olap_scan_node.__isset.indexes_desc) { - _tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); + tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); } } @@ -216,16 +178,16 @@ Status NewOlapScanner::init() { // acquire tablet rowset readers at the beginning of the scan node // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts - Version rd_version(0, _version); ReadSource read_source; { - std::shared_lock rdlock(_tablet->get_header_lock()); - auto st = _tablet->capture_rs_readers(rd_version, &read_source.rs_splits); + std::shared_lock rdlock(tablet->get_header_lock()); + auto st = tablet->capture_rs_readers(_tablet_reader_params.version, + &read_source.rs_splits); if (!st.ok()) { LOG(WARNING) << "fail to init reader.res=" << st; return Status::InternalError( "failed to initialize storage reader. 
tablet_id={} : {}", - _tablet->tablet_id(), st.to_string()); + tablet->tablet_id(), st.to_string()); } } if (!_state->skip_delete_predicate()) { @@ -244,11 +206,11 @@ Status NewOlapScanner::init() { // add read columns in profile if (_state->enable_profile()) { _profile->add_info_string("ReadColumns", - read_columns_to_string(_tablet_schema, _return_columns)); + read_columns_to_string(tablet_schema, _return_columns)); } if (!cached_schema && !schema_key.empty()) { - SchemaCache::instance()->insert_schema(schema_key, _tablet_schema); + SchemaCache::instance()->insert_schema(schema_key, tablet_schema); } return Status::OK(); @@ -286,24 +248,20 @@ Status NewOlapScanner::_init_tablet_reader_params( if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; - _aggregation = true; + _tablet_reader_params.aggregation = true; } else { auto push_down_agg_type = _parent ? _parent->get_push_down_agg_type() : _local_state->get_push_down_agg_type(); - _tablet_reader_params.direct_mode = _aggregation || single_version || + _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version || (push_down_agg_type != TPushAggOp::NONE && push_down_agg_type != TPushAggOp::COUNT_ON_INDEX); } RETURN_IF_ERROR(_init_return_columns()); - _tablet_reader_params.tablet = _tablet; - _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = ReaderType::READER_QUERY; - _tablet_reader_params.aggregation = _aggregation; _tablet_reader_params.push_down_agg_type_opt = _parent ? 
_parent->get_push_down_agg_type() : _local_state->get_push_down_agg_type(); - _tablet_reader_params.version = Version(0, _version); // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, if (_common_expr_ctxs_push_down.empty()) { @@ -345,13 +303,15 @@ Status NewOlapScanner::_init_tablet_reader_params( std::inserter(_tablet_reader_params.function_filters, _tablet_reader_params.function_filters.begin())); + auto& tablet = _tablet_reader_params.tablet; + auto& tablet_schema = _tablet_reader_params.tablet_schema; // Merge the columns in delete predicate that not in latest schema in to current tablet schema for (auto& del_pred : _tablet_reader_params.delete_predicates) { - _tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); + tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); } // Range - for (auto key_range : key_ranges) { + for (auto* key_range : key_ranges) { if (key_range->begin_scan_range.size() == 1 && key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { continue; @@ -374,27 +334,26 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.return_columns = _return_columns; } else { // we need to fetch all key columns to do the right aggregation on storage engine side. 
- for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { + for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) { _tablet_reader_params.return_columns.push_back(i); } for (auto index : _return_columns) { - if (_tablet_schema->column(index).is_key()) { + if (tablet_schema->column(index).is_key()) { continue; - } else { - _tablet_reader_params.return_columns.push_back(index); } + _tablet_reader_params.return_columns.push_back(index); } // expand the sequence column - if (_tablet_schema->has_sequence_col()) { + if (tablet_schema->has_sequence_col()) { bool has_replace_col = false; for (auto col : _return_columns) { - if (_tablet_schema->column(col).aggregation() == + if (tablet_schema->column(col).aggregation() == FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) { has_replace_col = true; break; } } - if (auto sequence_col_idx = _tablet_schema->sequence_col_idx(); + if (auto sequence_col_idx = tablet_schema->sequence_col_idx(); has_replace_col && std::find(_return_columns.begin(), _return_columns.end(), sequence_col_idx) == _return_columns.end()) { _tablet_reader_params.return_columns.push_back(sequence_col_idx); @@ -404,8 +363,8 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.use_page_cache = _state->enable_page_cache(); - if (_tablet->enable_unique_key_merge_on_write() && !_state->skip_delete_bitmap()) { - _tablet_reader_params.delete_bitmap = &_tablet->tablet_meta()->delete_bitmap(); + if (tablet->enable_unique_key_merge_on_write() && !_state->skip_delete_bitmap()) { + _tablet_reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); } if (!_state->skip_storage_engine_merge()) { @@ -414,7 +373,7 @@ Status NewOlapScanner::_init_tablet_reader_params( : ((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node(); // order by table keys optimization for topn // will only read head/tail of data file since it's already sorted by keys - if (olap_scan_node.__isset.sort_info && 
olap_scan_node.sort_info.is_asc_order.size() > 0) { + if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) { _limit = _parent ? ((NewOlapScanNode*)_parent)->_limit_per_scanner : _local_state->limit_per_scanner(); _tablet_reader_params.read_orderby_key = true; @@ -433,7 +392,7 @@ Status NewOlapScanner::_init_tablet_reader_params( // If this is a Two-Phase read query, and we need to delay the release of Rowset // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset - if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { + if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { constexpr static int delayed_s = 60; for (auto rs_reader : _tablet_reader_params.rs_splits) { uint64_t delayed_expired_timestamp = @@ -449,31 +408,32 @@ Status NewOlapScanner::_init_tablet_reader_params( } Status NewOlapScanner::_init_return_columns() { - for (auto slot : _output_tuple_desc->slots()) { + for (auto* slot : _output_tuple_desc->slots()) { if (!slot->is_materialized()) { continue; } if (!slot->need_materialize()) { continue; } + auto& tablet_schema = _tablet_reader_params.tablet_schema; int32_t index = slot->col_unique_id() >= 0 - ? _tablet_schema->field_index(slot->col_unique_id()) - : _tablet_schema->field_index(slot->col_name()); + ? tablet_schema->field_index(slot->col_unique_id()) + : tablet_schema->field_index(slot->col_name()); if (index < 0) { return Status::InternalError( "field name is invalid. 
field={}, field_name_to_index={}, col_unique_id={}", - slot->col_name(), _tablet_schema->get_all_field_names(), slot->col_unique_id()); + slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id()); } _return_columns.push_back(index); - if (slot->is_nullable() && !_tablet_schema->column(index).is_nullable()) { + if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) { _tablet_columns_convert_to_null_set.emplace(index); - } else if (!slot->is_nullable() && _tablet_schema->column(index).is_nullable()) { + } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) { return Status::Error<ErrorCode::INVALID_SCHEMA>( "slot(id: {}, name: {})'s nullable does not match " "column(tablet id: {}, index: {}, name: {}) ", - slot->id(), slot->col_name(), _tablet_schema->table_id(), index, - _tablet_schema->column(index).name()); + slot->id(), slot->col_name(), tablet_schema->table_id(), index, + tablet_schema->column(index).name()); } } @@ -526,8 +486,7 @@ Status NewOlapScanner::close(RuntimeState* state) { // so that it will core _tablet_reader_params.rs_splits.clear(); _tablet_reader.reset(); - auto tablet_id = _scan_range.tablet_id; - LOG(INFO) << "close_tablet_id" << tablet_id; + LOG(INFO) << "close_tablet_id" << _tablet_reader_params.tablet->tablet_id(); RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); } @@ -649,9 +608,10 @@ void NewOlapScanner::_update_counters_before_close() { // Update metrics DorisMetrics::instance()->query_scan_bytes->increment(_compressed_bytes_read); DorisMetrics::instance()->query_scan_rows->increment(_raw_rows_read); - _tablet->query_scan_bytes->increment(_compressed_bytes_read); - _tablet->query_scan_rows->increment(_raw_rows_read); - _tablet->query_scan_count->increment(1); + auto& tablet = _tablet_reader_params.tablet; + tablet->query_scan_bytes->increment(_compressed_bytes_read); + tablet->query_scan_rows->increment(_raw_rows_read); + tablet->query_scan_count->increment(1); } } 
// namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 150e27c6e3a..ccb5572846c 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -54,22 +54,19 @@ class NewOlapScanner : public VScanner { ENABLE_FACTORY_CREATOR(NewOlapScanner); public: - NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, - const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, - RuntimeProfile* profile); - - NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation, - const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges, - TabletReader::ReadSource read_source, RuntimeProfile* profile); - - NewOlapScanner(RuntimeState* state, pipeline::ScanLocalStateBase* parent, int64_t limit, - bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, RuntimeProfile* profile); - - NewOlapScanner(RuntimeState* state, pipeline::ScanLocalStateBase* parent, int64_t limit, - bool aggregation, const TPaloScanRange& scan_range, - const std::vector<OlapScanRange*>& key_ranges, - TabletReader::ReadSource read_source, RuntimeProfile* profile); + struct Params { + RuntimeState* state; + RuntimeProfile* profile; + std::vector<OlapScanRange*> key_ranges; + BaseTabletSPtr tablet; + int64_t version; + TabletReader::ReadSource read_source; + int64_t limit; + bool aggregation; + }; + + template <class T> + NewOlapScanner(T* parent, Params&& params); Status init() override; @@ -97,12 +94,6 @@ private: [[nodiscard]] Status _init_return_columns(); - bool _aggregation; - - TabletSchemaSPtr _tablet_schema; - BaseTabletSPtr _tablet; - int64_t _version; - const TPaloScanRange& _scan_range; std::vector<OlapScanRange*> _key_ranges; TabletReader::ReaderParams _tablet_reader_params; 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org