This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1f236a5 [BUG] Fix core when schema change (#5018) 1f236a5 is described below commit 1f236a533900c63360d8f20df83e51b19d3cd519 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Fri Dec 4 09:53:19 2020 +0800 [BUG] Fix core when schema change (#5018) --- be/src/olap/collect_iterator.cpp | 17 ++++++------- be/src/olap/collect_iterator.h | 2 -- be/src/olap/reader.cpp | 53 +++++++++++++++++++++++++++++++--------- be/src/olap/reader.h | 8 +++--- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index fba50f3..c3e0ebd 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -40,12 +40,11 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader)); RETURN_NOT_OK(child->init()); if (child->current_row() == nullptr) { - return OLAP_SUCCESS; + return OLAP_ERR_DATA_EOF; } LevelIterator* child_ptr = child.release(); _children.push_back(child_ptr); - _rs_readers.push_back(rs_reader); return OLAP_SUCCESS; } @@ -53,28 +52,28 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { // status will be used as the base rowset, and the other rowsets will be merged first and // then merged with the base rowset. void CollectIterator::build_heap() { - DCHECK(_rs_readers.size() == _children.size()); + DCHECK(_reader->_rs_readers.size() == _children.size()); _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; if (_children.empty()) { _inner_iter.reset(nullptr); return; } else if (_merge) { - DCHECK(!_rs_readers.empty()); + DCHECK(!_reader->_rs_readers.empty()); // build merge heap with two children, a base rowset as level0iterator and // other cumulative rowsets as a level1iterator if (_children.size() > 1) { // find base rowset(max rownum), - RowsetReaderSharedPtr base_reader = _rs_readers[0]; + RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0]; int base_reader_idx = 0; - for (size_t i = 1; i < _rs_readers.size(); ++i) { - if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() > + for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) { + if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() > base_reader->rowset()->rowset_meta()->num_rows()) { - base_reader = _rs_readers[i]; + base_reader = _reader->_rs_readers[i]; base_reader_idx = i; } } std::vector<LevelIterator*> cumu_children; - for (size_t i = 0; i < _rs_readers.size(); ++i) { + for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) { if (i != base_reader_idx) { cumu_children.push_back(_children[i]); } diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h index 97a4834..173dba4 100644 --- a/be/src/olap/collect_iterator.h +++ b/be/src/olap/collect_iterator.h @@ -164,8 +164,6 @@ private: // Hold reader point to access read params, such as fetch conditions. Reader* _reader = nullptr; - std::vector<RowsetReaderSharedPtr> _rs_readers; - }; } // namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 43598e5..75f9789 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -90,7 +90,7 @@ std::string Reader::KeysParam::to_string() const { return ss.str(); } -Reader::Reader() { +Reader::Reader() : _collect_iter(new CollectIterator()) { _tracker.reset(new MemTracker(-1)); _predicate_mem_pool.reset(new MemPool(_tracker.get())); } @@ -119,14 +119,28 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { << ", version:" << read_params.version; return res; } - - if (_rs_readers.size() == 1 && - !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) { - _next_row_func = &Reader::_dup_key_next_row; + // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation + bool has_delete_rowset = false; + int nonoverlapping_count = 0; + for (auto rs_reader : _rs_readers) { + if (rs_reader->rowset()->rowset_meta()->delete_flag()) { + has_delete_rowset = true; + break; + } + if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 && + !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) { + if (++nonoverlapping_count > 1) { + break; + } + } + } + if (nonoverlapping_count == 1 && !has_delete_rowset) { + _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row + : &Reader::_direct_next_row; } else { switch (_tablet->keys_type()) { case KeysType::DUP_KEYS: - _next_row_func = &Reader::_dup_key_next_row; + _next_row_func = &Reader::_direct_next_row; break; case KeysType::UNIQUE_KEYS: _next_row_func = &Reader::_unique_key_next_row; @@ -143,8 +157,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return OLAP_SUCCESS; } -OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, - bool* eof) { +OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, + bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; @@ -158,6 +172,22 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O } return OLAP_SUCCESS; } +OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, + ObjectPool* agg_pool, bool* eof) { + if (UNLIKELY(_next_key == nullptr)) { + *eof = true; + return OLAP_SUCCESS; + } + init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); + auto res = _collect_iter->next(&_next_key, &_next_delete_flag); + if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { + return res; + } + if (_need_agg_finalize) { + agg_finalize_row(_value_cids, row_cursor, mem_pool); + } + return OLAP_SUCCESS; +} OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { @@ -254,8 +284,6 @@ void Reader::close() { for (auto pred : _col_predicates) { delete pred; } - - delete _collect_iter; } OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { @@ -352,7 +380,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { LOG(WARNING) << "failed to add child to iterator"; return res; } - _rs_readers.push_back(rs_reader); + if (res == OLAP_SUCCESS) { + _rs_readers.push_back(rs_reader); + } } _collect_iter->build_heap(); _next_key = _collect_iter->current_row(&_next_delete_flag); @@ -390,7 +420,6 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) { _init_seek_columns(); - _collect_iter = new CollectIterator(); _collect_iter->init(this); if (_tablet->tablet_schema().has_sequence_col()) { diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 0cd7142..9b2c911 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -145,8 +145,10 @@ private: void _init_load_bf_columns(const ReaderParams& read_params); - OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, - bool* eof); + OLAPStatus _direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, + bool* eof); + OLAPStatus _direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, + ObjectPool* agg_pool, bool* eof); OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof); OLAPStatus _unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, @@ -183,7 +185,7 @@ private: bool _has_sequence_col = false; int32_t _sequence_col_idx = -1; const RowCursor* _next_key = nullptr; - CollectIterator* _collect_iter = nullptr; + std::unique_ptr<CollectIterator> _collect_iter; std::vector<uint32_t> _key_cids; std::vector<uint32_t> _value_cids; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org