This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new 7fbca548ab VMergeIterator should use nullable info from scanner instead of schema (#10797)
7fbca548ab is described below

commit 7fbca548abb3d7d84ba0ca0c2ebc29b5177ec0a5
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Thu Jul 14 12:22:28 2022 +0800

    VMergeIterator should use nullable info from scanner instead of schema (#10797)
---
 be/src/olap/rowset/beta_rowset_reader.cpp  |   7 +-
 be/src/olap/rowset/rowset_reader_context.h |   3 +
 be/src/vec/olap/block_reader.cpp           |   1 +
 be/src/vec/olap/vgeneric_iterators.cpp     | 110 +++++++++++++++++------------
 be/src/vec/olap/vgeneric_iterators.h       |   4 +-
 5 files changed, 78 insertions(+), 47 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index e314c4c1ab..2d8f79a8da 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -117,8 +117,11 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     // merge or union segment iterator
     RowwiseIterator* final_iterator;
     if (config::enable_storage_vectorization && read_context->is_vec) {
-        if (read_context->need_ordered_result && 
_rowset->rowset_meta()->is_segments_overlapping()) {
-            final_iterator = vectorized::new_merge_iterator(iterators, 
_parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
+        if (read_context->need_ordered_result &&
+            _rowset->rowset_meta()->is_segments_overlapping()) {
+            final_iterator = vectorized::new_merge_iterator(
+                    iterators, _parent_tracker, read_context->sequence_id_idx,
+                    read_context->is_unique, 
read_context->tablet_columns_convert_to_null_set);
         } else {
             final_iterator = vectorized::new_union_iterator(iterators, 
_parent_tracker);
         }
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 0ae42f6cf4..0bdb903e2e 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,9 @@ struct RowsetReaderContext {
     int batch_size = 1024;
     bool is_vec = false;
     bool is_unique = false;
+
+    // need pass this info to VMergeIterator
+    std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 2d5c542379..19b1192444 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -52,6 +52,7 @@ OLAPStatus BlockReader::_init_collect_iter(const 
ReaderParams& read_params,
 
     _reader_context.batch_size = _batch_size;
     _reader_context.is_vec = true;
+    _reader_context.tablet_columns_convert_to_null_set = 
_tablet_columns_convert_to_null_set;
     for (auto& rs_reader : rs_readers) {
         RETURN_NOT_OK(rs_reader->init(&_reader_context));
         OLAPStatus res = _vcollect_iter.add_child(rs_reader);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 2440b09888..13b4acf30c 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -60,28 +60,28 @@ public:
                 size_t data_len = 0;
                 const auto* col_schema = _schema.column(j);
                 switch (col_schema->type()) {
-                    case OLAP_FIELD_TYPE_SMALLINT:
-                        *(int16_t*)data = _rows_returned + j;
-                        data_len = sizeof(int16_t);
-                        break; 
-                    case OLAP_FIELD_TYPE_INT:
-                        *(int32_t*)data = _rows_returned + j;
-                        data_len = sizeof(int32_t);
-                        break;
-                    case OLAP_FIELD_TYPE_BIGINT:
-                        *(int64_t*)data = _rows_returned + j;
-                        data_len = sizeof(int64_t);
-                        break;
-                    case OLAP_FIELD_TYPE_FLOAT: 
-                        *(float*)data = _rows_returned + j;
-                        data_len = sizeof(float);
-                        break;
-                    case OLAP_FIELD_TYPE_DOUBLE: 
-                        *(double*)data = _rows_returned + j;
-                        data_len = sizeof(double);
-                        break;
-                    default:
-                        break;
+                case OLAP_FIELD_TYPE_SMALLINT:
+                    *(int16_t*)data = _rows_returned + j;
+                    data_len = sizeof(int16_t);
+                    break;
+                case OLAP_FIELD_TYPE_INT:
+                    *(int32_t*)data = _rows_returned + j;
+                    data_len = sizeof(int32_t);
+                    break;
+                case OLAP_FIELD_TYPE_BIGINT:
+                    *(int64_t*)data = _rows_returned + j;
+                    data_len = sizeof(int64_t);
+                    break;
+                case OLAP_FIELD_TYPE_FLOAT:
+                    *(float*)data = _rows_returned + j;
+                    data_len = sizeof(float);
+                    break;
+                case OLAP_FIELD_TYPE_DOUBLE:
+                    *(double*)data = _rows_returned + j;
+                    data_len = sizeof(double);
+                    break;
+                default:
+                    break;
                 }
 
                 vi.insert_data(data, data_len);
@@ -91,8 +91,7 @@ public:
             ++_rows_returned;
         }
 
-        if (row_idx > 0)
-            return Status::OK();
+        if (row_idx > 0) return Status::OK();
         return Status::EndOfFile("End of VAutoIncrementIterator");
     }
 
@@ -120,12 +119,14 @@ Status VAutoIncrementIterator::init(const 
StorageReadOptions& opts) {
 //      }
 class VMergeIteratorContext {
 public:
-    VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool 
is_unique)
+    VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool 
is_unique,
+                          const std::unordered_set<uint32_t>* 
tablet_columns_convert_to_null_set)
             : _iter(iter),
               _sequence_id_idx(sequence_id_idx),
               _is_unique(is_unique),
               _num_columns(iter->schema().num_column_ids()),
-              _num_key_columns(iter->schema().num_key_columns()) {}
+              _num_key_columns(iter->schema().num_key_columns()),
+              
_tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {}
 
     VMergeIteratorContext(const VMergeIteratorContext&) = delete;
     VMergeIteratorContext(VMergeIteratorContext&&) = delete;
@@ -137,8 +138,7 @@ public:
         _iter = nullptr;
     }
 
-    Status block_reset()
-    {
+    Status block_reset() {
         if (!_block) {
             const Schema& schema = _iter->schema();
             const auto& column_ids = schema.column_ids();
@@ -149,11 +149,21 @@ public:
                     return Status::RuntimeError("invalid data type");
                 }
                 if (column_desc->is_nullable()) {
-                    data_type = 
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
+                    data_type =
+                            
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
                 }
                 auto column = data_type->create_column();
                 column->reserve(_block_row_max);
-                _block.insert(ColumnWithTypeAndName(std::move(column), 
data_type, column_desc->name()));
+
+                if (_tablet_columns_convert_to_null_set &&
+                    _tablet_columns_convert_to_null_set->find(column_ids[i]) !=
+                            _tablet_columns_convert_to_null_set->end()) {
+                    column = 
make_nullable(std::move(column))->assume_mutable();
+                    data_type = make_nullable(data_type);
+                }
+
+                _block.insert(
+                        ColumnWithTypeAndName(std::move(column), data_type, 
column_desc->name()));
             }
         } else {
             _block.clear_column_data();
@@ -233,6 +243,7 @@ private:
     int _block_row_max = 4096;
     int _num_columns;
     int _num_key_columns;
+    const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = 
nullptr;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -281,8 +292,13 @@ Status VMergeIteratorContext::_load_next_block() {
 class VMergeIterator : public RowwiseIterator {
 public:
     // VMergeIterator takes the ownership of input iterators
-    VMergeIterator(std::vector<RowwiseIterator*>& iters, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) : 
-        _origin_iters(iters),_sequence_id_idx(sequence_id_idx), 
_is_unique(is_unique) {
+    VMergeIterator(std::vector<RowwiseIterator*>& iters, 
std::shared_ptr<MemTracker> parent,
+                   int sequence_id_idx, bool is_unique,
+                   const std::unordered_set<uint32_t>* 
tablet_columns_convert_to_null_set)
+            : _origin_iters(iters),
+              _sequence_id_idx(sequence_id_idx),
+              _is_unique(is_unique),
+              
_tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, 
false);
     }
@@ -313,15 +329,16 @@ private:
         }
     };
 
-    using VMergeHeap = std::priority_queue<VMergeIteratorContext*, 
-                                        std::vector<VMergeIteratorContext*>,
-                                        VMergeContextComparator>;
+    using VMergeHeap =
+            std::priority_queue<VMergeIteratorContext*, 
std::vector<VMergeIteratorContext*>,
+                                VMergeContextComparator>;
 
     VMergeHeap _merge_heap;
 
     int block_row_max = 0;
     int _sequence_id_idx = -1;
     bool _is_unique = false;
+    const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = 
nullptr;
 };
 
 Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -331,7 +348,8 @@ Status VMergeIterator::init(const StorageReadOptions& opts) 
{
     _schema = &(*_origin_iters.begin())->schema();
 
     for (auto iter : _origin_iters) {
-        auto ctx = std::make_unique<VMergeIteratorContext>(iter, 
_sequence_id_idx, _is_unique);
+        auto ctx = std::make_unique<VMergeIteratorContext>(iter, 
_sequence_id_idx, _is_unique,
+                                                           
_tablet_columns_convert_to_null_set);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
@@ -348,8 +366,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) 
{
 
 Status VMergeIterator::next_batch(vectorized::Block* block) {
     while (block->rows() < block_row_max) {
-        if (_merge_heap.empty())
-            break;
+        if (_merge_heap.empty()) break;
 
         auto ctx = _merge_heap.top();
         _merge_heap.pop();
@@ -386,7 +403,8 @@ public:
     }
 
     ~VUnionIterator() override {
-        std::for_each(_origin_iters.begin(), _origin_iters.end(), 
std::default_delete<RowwiseIterator>());
+        std::for_each(_origin_iters.begin(), _origin_iters.end(),
+                      std::default_delete<RowwiseIterator>());
     }
 
     Status init(const StorageReadOptions& opts) override;
@@ -432,15 +450,19 @@ Status VUnionIterator::next_batch(vectorized::Block* 
block) {
     return Status::EndOfFile("End of VUnionIterator");
 }
 
-
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
+RowwiseIterator* new_merge_iterator(
+        std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> 
parent,
+        int sequence_id_idx, bool is_unique,
+        const std::unordered_set<uint32_t>* 
tablet_columns_convert_to_null_set) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique);
+    return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique,
+                              tablet_columns_convert_to_null_set);
 }
 
-RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent) {
+RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs,
+                                    std::shared_ptr<MemTracker> parent) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
@@ -451,6 +473,6 @@ RowwiseIterator* new_auto_increment_iterator(const Schema& 
schema, size_t num_ro
     return new VAutoIncrementIterator(schema, num_rows);
 }
 
-}
+} // namespace vectorized
 
 } // namespace doris
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 063d07da51..0b9af85ef5 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,9 @@ namespace vectorized {
 //
 // Inputs iterators' ownership is taken by created merge iterator. And client
 // should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
+RowwiseIterator* new_merge_iterator(
+        std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> 
parent, int sequence_id_idx, bool is_unique,
+        const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set 
= nullptr);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to