This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new ca3eb64 push down conditions on unique table value columns to base rowset (#6457) ca3eb64 is described below commit ca3eb6490e6facc8dc8889d4b917208ba93b0c7d Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Thu Aug 26 09:14:49 2021 +0800 push down conditions on unique table value columns to base rowset (#6457) --- be/src/exec/olap_scanner.cpp | 1 + be/src/olap/olap_cond.h | 3 +- be/src/olap/reader.cpp | 46 +++++++++++++++------- be/src/olap/reader.h | 7 ++++ be/src/olap/rowset/beta_rowset_reader.cpp | 13 ++++-- be/src/olap/rowset/rowset_reader_context.h | 3 ++ be/src/olap/rowset/segment_v2/segment_iterator.cpp | 16 +++++--- 7 files changed, 65 insertions(+), 24 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 073c92b..5f977ed 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -182,6 +182,7 @@ Status OlapScanner::_init_params( _params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 && _params.rs_readers[1]->rowset()->start_version() == 2 && !_params.rs_readers[1]->rowset()->rowset_meta()->is_segments_overlapping()); + if (_aggregation || single_version) { _params.return_columns = _return_columns; } else { diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h index 54d699e..84e56be 100644 --- a/be/src/olap/olap_cond.h +++ b/be/src/olap/olap_cond.h @@ -161,6 +161,7 @@ public: } _columns.clear(); } + bool empty() const { return _columns.empty(); } // TODO(yingchun): should do it in constructor void set_tablet_schema(const TabletSchema* schema) { _schema = schema; } @@ -170,7 +171,7 @@ public: // 1. column不属于key列 // 2. column类型是double, float OLAPStatus append_condition(const TCondition& condition); - + // 通过所有列上的删除条件对RowCursor进行过滤 // Return true means this row should be filtered out, otherwise return false bool delete_conditions_eval(const RowCursor& row) const; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 4776732..d5ca432 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -17,9 +17,10 @@ #include "olap/reader.h" +#include <parallel_hashmap/phmap.h> + #include <boost/algorithm/string/case_conv.hpp> #include <charconv> -#include <parallel_hashmap/phmap.h> #include <unordered_set> #include "olap/bloom_filter_predicate.h" @@ -31,8 +32,8 @@ #include "olap/row_block.h" #include "olap/row_cursor.h" #include "olap/rowset/beta_rowset_reader.h" -#include "olap/schema.h" #include "olap/rowset/column_data.h" +#include "olap/schema.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "runtime/mem_pool.h" @@ -303,6 +304,9 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool void Reader::close() { VLOG_NOTICE << "merged rows:" << _merged_rows; _conditions.finalize(); + if (!_all_conditions.empty()) { + _all_conditions.finalize(); + } _delete_handler.finalize(); for (auto pred : _col_predicates) { @@ -393,7 +397,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.return_columns = &_return_columns; _reader_context.seek_columns = &_seek_columns; _reader_context.load_bf_columns = &_load_bf_columns; + _reader_context.load_bf_all_columns = &_load_bf_all_columns; _reader_context.conditions = &_conditions; + _reader_context.all_conditions = &_all_conditions; _reader_context.predicates = &_col_predicates; _reader_context.value_predicates = &_value_col_predicates; _reader_context.lower_bound_keys = &_keys_param.start_keys; @@ -563,11 +569,13 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { std::vector<uint32_t> columns(scan_key_size); std::iota(columns.begin(), columns.end(), 0); - std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns); + std::shared_ptr<Schema> schema = + std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns); for (size_t i = 0; i < start_key_size; ++i) { if (read_params.start_key[i].size() != scan_key_size) { - OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i, read_params.start_key[i].size(), scan_key_size); + OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i, + read_params.start_key[i].size(), scan_key_size); return OLAP_ERR_INPUT_PARAMETER_ERROR; } @@ -594,7 +602,8 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { _keys_param.end_keys.resize(end_key_size, nullptr); for (size_t i = 0; i < end_key_size; ++i) { if (read_params.end_key[i].size() != scan_key_size) { - OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i, read_params.end_key[i].size(), scan_key_size); + OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i, + read_params.end_key[i].size(), scan_key_size); return OLAP_ERR_INPUT_PARAMETER_ERROR; } @@ -603,8 +612,8 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { return OLAP_ERR_MALLOC_ERROR; } - OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(), - read_params.end_key[i].values(), schema); + OLAPStatus res = _keys_param.end_keys[i]->init_scan_key( + _tablet->tablet_schema(), read_params.end_key[i].values(), schema); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); return res; @@ -624,6 +633,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { void Reader::_init_conditions_param(const ReaderParams& read_params) { _conditions.set_tablet_schema(&_tablet->tablet_schema()); + _all_conditions.set_tablet_schema(&_tablet->tablet_schema()); for (const auto& condition : read_params.conditions) { ColumnPredicate* predicate = _parse_to_predicate(condition); if (predicate != nullptr) { @@ -636,6 +646,8 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) { OLAPStatus status = _conditions.append_condition(condition); DCHECK_EQ(OLAP_SUCCESS, status); } + OLAPStatus status = _all_conditions.append_condition(condition); + DCHECK_EQ(OLAP_SUCCESS, status); } } @@ -849,7 +861,8 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o int128_t value = 0; StringParser::ParseResult result; for (auto& cond_val : condition.condition_values) { - value = StringParser::string_to_int<__int128>(cond_val.c_str(), cond_val.size(), &result); + value = StringParser::string_to_int<__int128>(cond_val.c_str(), cond_val.size(), + &result); values.insert(value); } if (condition.condition_op == "*=") { @@ -947,17 +960,22 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o } return predicate; } - void Reader::_init_load_bf_columns(const ReaderParams& read_params) { - // add all columns with condition to _load_bf_columns - for (const auto& cond_column : _conditions.columns()) { + _init_load_bf_columns(read_params, &_conditions, &_load_bf_columns); + _init_load_bf_columns(read_params, &_all_conditions, &_load_bf_all_columns); +} + +void Reader::_init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions, + std::set<uint32_t>* load_bf_columns) { + // add all columns with condition to load_bf_columns + for (const auto& cond_column : conditions->columns()) { if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) { continue; } for (const auto& cond : cond_column.second->conds()) { if (cond->op == OP_EQ || (cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) { - _load_bf_columns.insert(cond_column.first); + load_bf_columns->insert(cond_column.first); } } } @@ -986,7 +1004,7 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) { } for (int i = 0; i < max_equal_index; ++i) { - _load_bf_columns.erase(i); + load_bf_columns->erase(i); } // remove the max_equal_index column when it's not varchar @@ -996,7 +1014,7 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) { } FieldType type = _tablet->tablet_schema().column(max_equal_index).type(); if ((type != OLAP_FIELD_TYPE_VARCHAR && type != OLAP_FIELD_TYPE_STRING)|| max_equal_index + 1 > _tablet->num_short_key_columns()) { - _load_bf_columns.erase(max_equal_index); + load_bf_columns->erase(max_equal_index); } } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 2d62aab..525c2ad 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -162,6 +162,8 @@ private: void _init_seek_columns(); void _init_load_bf_columns(const ReaderParams& read_params); + void _init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions, + std::set<uint32_t>* load_bf_columns); // Direcly read row from rowset and pass to upper caller. No need to do aggregation. // This is usually used for DUPLICATE KEY tables @@ -187,6 +189,7 @@ private: std::shared_ptr<MemTracker> _tracker; std::unique_ptr<MemPool> _predicate_mem_pool; std::set<uint32_t> _load_bf_columns; + std::set<uint32_t> _load_bf_all_columns; std::vector<uint32_t> _return_columns; std::vector<uint32_t> _seek_columns; @@ -195,7 +198,11 @@ private: KeysParam _keys_param; std::vector<bool> _is_lower_keys_included; std::vector<bool> _is_upper_keys_included; + // contains condition on key columns in agg or unique table or all column in dup tables Conditions _conditions; + // contains _conditions and condition on value columns, used for push down + // conditions to base rowset of unique table + Conditions _all_conditions; std::vector<ColumnPredicate*> _col_predicates; std::vector<ColumnPredicate*> _value_col_predicates; DeleteHandler _delete_handler; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index ee13dad..f50eb87 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -79,11 +79,16 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { } // if unique table with rowset [0-x] or [0-1] [2-y] [...], // value column predicates can be pushdown on rowset [0-x] or [2-y] - if (read_context->value_predicates != nullptr && _rowset->keys_type() == UNIQUE_KEYS && + if (_rowset->keys_type() == UNIQUE_KEYS && (_rowset->start_version() == 0 || _rowset->start_version() == 2)) { - read_options.column_predicates.insert(read_options.column_predicates.end(), - read_context->value_predicates->begin(), - read_context->value_predicates->end()); + if (read_context->value_predicates != nullptr) { + read_options.column_predicates.insert(read_options.column_predicates.end(), + read_context->value_predicates->begin(), + read_context->value_predicates->end()); + } + if (read_context->all_conditions != nullptr && !read_context->all_conditions->empty()) { + read_options.conditions = read_context->all_conditions; + } } read_options.use_page_cache = read_context->use_page_cache; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 9d757dd..d1ef819 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -42,8 +42,11 @@ struct RowsetReaderContext { // columns to load bloom filter index // including columns in "=" or "in" conditions const std::set<uint32_t>* load_bf_columns = nullptr; + const std::set<uint32_t>* load_bf_all_columns = nullptr; // column filter conditions by delete sql const Conditions* conditions = nullptr; + // value column predicate in UNIQUE table + const Conditions* all_conditions = nullptr; // column name -> column predicate // adding column_name for predicate to make use of column selectivity const std::vector<ColumnPredicate*>* predicates = nullptr; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index f39eaa0..5e80270 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -90,7 +90,8 @@ private: bool _eof; }; -SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema& schema, std::shared_ptr<MemTracker> parent) +SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema& schema, + std::shared_ptr<MemTracker> parent) : _segment(std::move(segment)), _schema(schema), _column_iterators(_schema.num_columns(), nullptr), @@ -194,11 +195,13 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra // create used column iterator for (auto cid : _seek_schema->column_ids()) { if (_column_iterators[cid] == nullptr) { - RETURN_IF_ERROR(_segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid])); + RETURN_IF_ERROR( + _segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid])); ColumnIteratorOptions iter_opts; iter_opts.stats = _opts.stats; iter_opts.rblock = _rblock.get(); - iter_opts.mem_tracker = MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false); + iter_opts.mem_tracker = + MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false); RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); } } @@ -233,6 +236,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row cids.insert(column_condition.first); } } + // first filter data by bloom filter index // bloom filter index only use CondColumn RowRanges bf_row_ranges = RowRanges::create_single(num_rows()); @@ -322,12 +326,14 @@ Status SegmentIterator::_init_return_column_iterators() { } for (auto cid : _schema.column_ids()) { if (_column_iterators[cid] == nullptr) { - RETURN_IF_ERROR(_segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid])); + RETURN_IF_ERROR( + _segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid])); ColumnIteratorOptions iter_opts; iter_opts.stats = _opts.stats; iter_opts.use_page_cache = _opts.use_page_cache; iter_opts.rblock = _rblock.get(); - iter_opts.mem_tracker = MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false); + iter_opts.mem_tracker = + MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false); RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org