This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f236a5  [BUG] Fix core when schema change (#5018)
1f236a5 is described below

commit 1f236a533900c63360d8f20df83e51b19d3cd519
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Fri Dec 4 09:53:19 2020 +0800

    [BUG] Fix core when schema change (#5018)
---
 be/src/olap/collect_iterator.cpp | 17 ++++++-------
 be/src/olap/collect_iterator.h   |  2 --
 be/src/olap/reader.cpp           | 53 +++++++++++++++++++++++++++++++---------
 be/src/olap/reader.h             |  8 +++---
 4 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index fba50f3..c3e0ebd 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -40,12 +40,11 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
     std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader));
     RETURN_NOT_OK(child->init());
     if (child->current_row() == nullptr) {
-        return OLAP_SUCCESS;
+        return OLAP_ERR_DATA_EOF;
     }
 
     LevelIterator* child_ptr = child.release();
     _children.push_back(child_ptr);
-    _rs_readers.push_back(rs_reader);
     return OLAP_SUCCESS;
 }
 
@@ -53,28 +52,28 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
 // status will be used as the base rowset, and the other rowsets will be merged first and
 // then merged with the base rowset.
 void CollectIterator::build_heap() {
-    DCHECK(_rs_readers.size() == _children.size());
+    DCHECK(_reader->_rs_readers.size() == _children.size());
     _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
     if (_children.empty()) {
         _inner_iter.reset(nullptr);
         return;
     } else if (_merge) {
-        DCHECK(!_rs_readers.empty());
+        DCHECK(!_reader->_rs_readers.empty());
         // build merge heap with two children, a base rowset as level0iterator and
         // other cumulative rowsets as a level1iterator
         if (_children.size() > 1) {
             // find base rowset(max rownum),
-            RowsetReaderSharedPtr base_reader = _rs_readers[0];
+            RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0];
             int base_reader_idx = 0;
-            for (size_t i = 1; i < _rs_readers.size(); ++i) {
-                if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
+            for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) {
+                if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
                     base_reader->rowset()->rowset_meta()->num_rows()) {
-                    base_reader = _rs_readers[i];
+                    base_reader = _reader->_rs_readers[i];
                     base_reader_idx = i;
                 }
             }
             std::vector<LevelIterator*> cumu_children;
-            for (size_t i = 0; i < _rs_readers.size(); ++i) {
+            for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) {
                 if (i != base_reader_idx) {
                     cumu_children.push_back(_children[i]);
                 }
diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h
index 97a4834..173dba4 100644
--- a/be/src/olap/collect_iterator.h
+++ b/be/src/olap/collect_iterator.h
@@ -164,8 +164,6 @@ private:
 
     // Hold reader point to access read params, such as fetch conditions.
     Reader* _reader = nullptr;
-    std::vector<RowsetReaderSharedPtr> _rs_readers;
-
 };
 
 } // namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 43598e5..75f9789 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -90,7 +90,7 @@ std::string Reader::KeysParam::to_string() const {
     return ss.str();
 }
 
-Reader::Reader() {
+Reader::Reader() : _collect_iter(new CollectIterator()) {
     _tracker.reset(new MemTracker(-1));
     _predicate_mem_pool.reset(new MemPool(_tracker.get()));
 }
@@ -119,14 +119,28 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
                      << ", version:" << read_params.version;
         return res;
     }
-
-    if (_rs_readers.size() == 1 &&
-        !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) {
-        _next_row_func = &Reader::_dup_key_next_row;
+    // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
+    bool has_delete_rowset = false;
+    int nonoverlapping_count = 0;
+    for (auto rs_reader : _rs_readers) {
+        if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
+            has_delete_rowset = true;
+            break;
+        }
+        if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 &&
+            !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
+            if (++nonoverlapping_count > 1) {
+                break;
+            }
+        }
+    }
+    if (nonoverlapping_count == 1 && !has_delete_rowset) {
+        _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row
+                                                          : &Reader::_direct_next_row;
     } else {
         switch (_tablet->keys_type()) {
         case KeysType::DUP_KEYS:
-            _next_row_func = &Reader::_dup_key_next_row;
+            _next_row_func = &Reader::_direct_next_row;
             break;
         case KeysType::UNIQUE_KEYS:
             _next_row_func = &Reader::_unique_key_next_row;
@@ -143,8 +157,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
-                                     bool* eof) {
+OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
+                                    bool* eof) {
     if (UNLIKELY(_next_key == nullptr)) {
         *eof = true;
         return OLAP_SUCCESS;
@@ -158,6 +172,22 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O
     }
     return OLAP_SUCCESS;
 }
+OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
+                                            ObjectPool* agg_pool, bool* eof) {
+    if (UNLIKELY(_next_key == nullptr)) {
+        *eof = true;
+        return OLAP_SUCCESS;
+    }
+    init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool);
+    auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
+    if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
+        return res;
+    }
+    if (_need_agg_finalize) {
+        agg_finalize_row(_value_cids, row_cursor, mem_pool);
+    }
+    return OLAP_SUCCESS;
+}
 
 OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
                                      bool* eof) {
@@ -254,8 +284,6 @@ void Reader::close() {
     for (auto pred : _col_predicates) {
         delete pred;
     }
-
-    delete _collect_iter;
 }
 
 OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
@@ -352,7 +380,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
             LOG(WARNING) << "failed to add child to iterator";
             return res;
         }
-        _rs_readers.push_back(rs_reader);
+        if (res == OLAP_SUCCESS) {
+            _rs_readers.push_back(rs_reader);
+        }
     }
     _collect_iter->build_heap();
     _next_key = _collect_iter->current_row(&_next_delete_flag);
@@ -390,7 +420,6 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
 
     _init_seek_columns();
 
-    _collect_iter = new CollectIterator();
     _collect_iter->init(this);
 
     if (_tablet->tablet_schema().has_sequence_col()) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 0cd7142..9b2c911 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -145,8 +145,10 @@ private:
 
     void _init_load_bf_columns(const ReaderParams& read_params);
 
-    OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
-                                 bool* eof);
+    OLAPStatus _direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
+                                bool* eof);
+    OLAPStatus _direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
+                                        ObjectPool* agg_pool, bool* eof);
     OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
                                  bool* eof);
     OLAPStatus _unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
@@ -183,7 +185,7 @@ private:
     bool _has_sequence_col = false;
     int32_t _sequence_col_idx = -1;
     const RowCursor* _next_key = nullptr;
-    CollectIterator* _collect_iter = nullptr;
+    std::unique_ptr<CollectIterator> _collect_iter;
     std::vector<uint32_t> _key_cids;
     std::vector<uint32_t> _value_cids;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to