zuochunwei commented on a change in pull request #7613:
URL: https://github.com/apache/incubator-doris/pull/7613#discussion_r781958858



##########
File path: be/src/olap/rowset/segment_v2/segment_iterator.cpp
##########
@@ -581,9 +586,358 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
     return Status::OK();
 }
 
+/* ---------------------- for vecterization implementation  
---------------------- */
+
+// todo(wb) need a UT here
+void SegmentIterator::_vec_init_lazy_materialization() {
+    _is_pred_column.resize(_schema.columns().size(), false);
+
+    std::set<ColumnId> pred_column_ids; // including short_cir_pred_col_id_set 
and vec_pred_col_id_set
+    _is_all_column_basic_type = true;
+    bool is_predicate_column_exists = false;
+    bool is_non_predicate_column_exists = false;
+    
+    if (!_col_predicates.empty()) {
+        is_predicate_column_exists = true;
+
+        std::set<ColumnId> short_cir_pred_col_id_set; // using set for 
distinct cid
+        std::set<ColumnId> vec_pred_col_id_set;
+        
+        for (auto predicate : _col_predicates) {
+            auto cid = predicate->column_id();
+            FieldType type = _schema.column(cid)->type();
+            _is_pred_column[cid] = true;
+            pred_column_ids.insert(cid);
+
+            // for date type which can not be executed in a vectorized way, 
using short circuit execution
+            if (type == OLAP_FIELD_TYPE_VARCHAR || type == 
OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_DECIMAL
+                || type == OLAP_FIELD_TYPE_DATE || 
predicate->is_in_predicate()) {
+                short_cir_pred_col_id_set.insert(cid);
+                _short_cir_eval_predicate.push_back(predicate);
+                _is_all_column_basic_type = false;
+            } else {
+                vec_pred_col_id_set.insert(predicate->column_id());
+                if (_pre_eval_block_predicate == nullptr) {
+                    _pre_eval_block_predicate = new AndBlockColumnPredicate();
+                }
+                
reinterpret_cast<MutilColumnBlockPredicate*>(_pre_eval_block_predicate)->add_column_predicate(new
 SingleColumnBlockPredicate(predicate));
+            }
+        }
+
+        std::set<ColumnId> del_cond_id_set;
+        
_opts.delete_condition_predicates.get()->get_all_column_ids(del_cond_id_set);
+        short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), 
del_cond_id_set.end());
+        pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());
+
+        if (_schema.column_ids().size() > pred_column_ids.size()) {
+            for (auto cid : _schema.column_ids()) {
+                if (!_is_pred_column[cid]) {
+                    _non_predicate_columns.push_back(cid);
+                    is_non_predicate_column_exists = true;
+                }
+            }
+        }
+
+        _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), 
vec_pred_col_id_set.cend());
+        _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(), 
short_cir_pred_col_id_set.cend());
+    } else {
+        _is_all_column_basic_type = false;
+        is_non_predicate_column_exists = true;
+        for (auto cid : _schema.column_ids()) {
+            _non_predicate_columns.push_back(cid);
+        }
+    }
+
+    // note(wb) in following cases we disable lazy materialization
+    // case 1: when all column is basic type(is_all_column_basic_type = true) 
+    //   because we think `seek and read` cost > read page cost, lazy 
materialize may cause more `seek and read`, so disable it
+    // case 2: all column is predicate column
+    // case 3: all column is not predicate column
+    // todo(wb) need further research more lazy materialization rule, such as 
get more info from `statistics` for better decision
+    if (_is_all_column_basic_type) {
+        std::set<ColumnId> pred_set(_vec_pred_column_ids.begin(), 
_vec_pred_column_ids.end());
+        std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(), 
_non_predicate_columns.end());
+
+        // when _is_all_column_basic_type = true, _first_read_column_ids 
should keep the same order with _schema.column_ids which stands for return 
column order
+        for (int i = 0; i < _schema.num_column_ids(); i++) {
+            auto cid = _schema.column_ids()[i];
+            if (pred_set.find(cid) != pred_set.end()) {
+                _first_read_column_ids.push_back(cid);    
+            } else if (non_pred_set.find(cid) != non_pred_set.end()) {
+                _first_read_column_ids.push_back(cid);
+                _is_pred_column[cid] = true; // in this case, non-predicate 
column should also be filtered by sel idx, so we regard it as pred columns
+            }
+        }
+
+    } else if (is_predicate_column_exists && !is_non_predicate_column_exists) {
+        _first_read_column_ids.assign(pred_column_ids.cbegin(), 
pred_column_ids.cend());
+    } else if (!is_predicate_column_exists && is_non_predicate_column_exists) {
+        for (auto cid : _non_predicate_columns) {
+            _first_read_column_ids.push_back(cid);
+        }
+    } else {
+        _lazy_materialization_read = true;
+        _first_read_column_ids.assign(pred_column_ids.cbegin(), 
pred_column_ids.cend());
+    }
+
+    // make _schema_block_id_map
+    _schema_block_id_map.resize(_schema.columns().size());
+    for (int i = 0; i < _schema.num_column_ids(); i++) {
+        auto cid = _schema.column_ids()[i];
+        _schema_block_id_map[cid] = i;
+    }
+
+}
+
+Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, 
vectorized::MutableColumns& column_block, size_t nrows) {
+    for (auto cid : column_ids) {
+        auto& column = column_block[cid];
+        size_t rows_read = nrows;
+        RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, 
column));
+        DCHECK_EQ(nrows, rows_read);
+    }
+    return Status::OK();
+}
+
+void SegmentIterator::_init_current_block(vectorized::Block* block, 
std::vector<vectorized::MutableColumnPtr>& current_columns) {
+    bool is_block_mem_reuse= block->mem_reuse();
+    if (is_block_mem_reuse) {
+        size_t column_to_keep = _schema.num_column_ids();
+        for (int i = block->columns() - 1; i >= column_to_keep; i--) {
+            block->erase(i);
+        }
+        block->clear_column_data();
+    } else { // pre fill output block here
+        for (size_t i = 0; i < _schema.num_column_ids(); i++) {
+            auto cid = _schema.column_ids()[i];
+            auto* column_desc = _schema.columns()[cid];
+            auto data_type = Schema::get_data_type_ptr(column_desc->type());
+            if (column_desc->is_nullable()) {
+                block->insert({nullptr, 
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)), 
column_desc->name()});
+            } else {
+                block->insert({nullptr, std::move(data_type), 
column_desc->name()});
+            }
+        }
+    }
+
+    for (size_t i = 0; i < _schema.num_column_ids(); i++) {
+        auto cid = _schema.column_ids()[i];
+        if (_is_pred_column[cid]) {  //todo(wb) maybe we can relase it after 
output block
+            current_columns[cid]->clear();
+        } else { // non-predicate column
+            auto &column_desc = _schema.columns()[cid];
+            if (is_block_mem_reuse) {
+                current_columns[cid] = 
std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                auto data_type = 
Schema::get_data_type_ptr(column_desc->type());
+                if (column_desc->is_nullable()) {
+                    current_columns[cid] = 
doris::vectorized::ColumnNullable::create(
+                        std::move(data_type->create_column()), 
doris::vectorized::ColumnUInt8::create());
+                } else {
+                    current_columns[cid] = data_type->create_column();
+                }
+            }
+            if (column_desc->type() == OLAP_FIELD_TYPE_DATE) {
+                current_columns[cid]->set_date_type();
+            } else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) {
+                current_columns[cid]->set_datetime_type();
+            }
+        }
+    }
+}
+
+void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool 
is_block_mem_reuse) {
+    for (auto cid : _non_predicate_columns) {
+        block->replace_by_position(_schema_block_id_map[cid], 
std::move(_current_return_columns[cid]));
+    }
+ }
+
+void SegmentIterator::_output_column_by_sel_idx(vectorized::Block* block, 
std::vector<ColumnId> columnIds, 
+        uint16_t* sel_rowid_idx, uint16_t select_size, bool 
is_block_mem_reuse) {
+    for (auto cid : columnIds) {
+        auto &column_ptr = _current_return_columns[cid];
+        if (is_block_mem_reuse) {
+            column_ptr->filter_by_selector(sel_rowid_idx, select_size, 
+                &block->get_by_position(_schema_block_id_map[cid]).column);
+        } else {
+            block->replace_by_position(_schema_block_id_map[cid], 
+                (*column_ptr).get_ptr()->filter_by_selector(sel_rowid_idx, 
select_size));
+        }
+    }
+ }
+
+
+Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, 
uint32_t& nrows_read, bool set_block_rowid) {
+    do {
+        uint32_t range_from;
+        uint32_t range_to;
+        bool has_next_range =
+            _range_iter->next_range(nrows_read_limit - nrows_read, 
&range_from, &range_to);
+        if (!has_next_range) {
+            break;
+        }
+        if (_cur_rowid == 0 || _cur_rowid != range_from) {
+            _cur_rowid = range_from;
+            RETURN_IF_ERROR(_seek_columns(_first_read_column_ids, _cur_rowid));
+        }
+        size_t rows_to_read = range_to - range_from;
+        RETURN_IF_ERROR(_read_columns(_first_read_column_ids, 
_current_return_columns, rows_to_read));
+        _cur_rowid += rows_to_read;
+        if (set_block_rowid) {
+            for (uint32_t rid = range_from; rid < range_to; rid++) {
+                _block_rowids[nrows_read++] = rid;
+            }
+        } else {
+            nrows_read += rows_to_read;
+        }
+    } while (nrows_read < nrows_read_limit);
+    return Status::OK();
+}
+
+void SegmentIterator::_evaluate_vectorization_predicate(uint16_t* 
sel_rowid_idx, uint16_t& selected_size) {
+    uint16_t new_size = 0;
+    if (_vec_pred_column_ids.size() == 0) {
+        for (uint32_t i = 0; i < selected_size; ++i) {
+            sel_rowid_idx[new_size++] = i;
+        }
+        return;
+    }
+
+    uint16_t original_size = selected_size;
+    bool ret_flags[selected_size];
+    memset(ret_flags, 1, selected_size);
+    _pre_eval_block_predicate->evaluate_vec(_current_return_columns, 
selected_size, ret_flags);
+    
+    for (uint32_t i = 0; i < selected_size; ++i) {
+        if (ret_flags[i]) {
+            sel_rowid_idx[new_size++] = i;
+        }
+    }
+
+    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
+    selected_size = new_size;
+}
+
+void SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* 
vec_sel_rowid_idx, uint16_t* selected_size_ptr) {
+    if (_short_cir_pred_column_ids.size() == 0) {
+        return;
+    }
+    
+    for (auto column_predicate : _short_cir_eval_predicate) {
+        auto column_id = column_predicate->column_id();
+        auto& short_cir_column = _current_return_columns[column_id];
+        column_predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, 
selected_size_ptr);
+    }
+
+    // evaluate delete condition
+    _opts.delete_condition_predicates->evaluate(_current_return_columns, 
vec_sel_rowid_idx, selected_size_ptr);
+}
+
+void SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids, std::vector<rowid_t>& rowid_vector,
+        uint16_t* sel_rowid_idx, size_t select_size, 
vectorized::MutableColumns* mutable_columns) {
+    size_t start_idx = 0;
+    while (start_idx < select_size) {
+        size_t end_idx = start_idx + 1;
+        while (end_idx < select_size && (rowid_vector[sel_rowid_idx[end_idx - 
1]] == rowid_vector[sel_rowid_idx[end_idx]] - 1)) {
+            end_idx++;
+        }
+        size_t range = end_idx - start_idx;
+        _seek_columns(read_column_ids, rowid_vector[sel_rowid_idx[start_idx]]);
+        _read_columns(read_column_ids, *mutable_columns, range);
+        start_idx += range;
+    }
+}
+
 Status SegmentIterator::next_batch(vectorized::Block* block) {
-    //TODO
-    return Status::NotSupported("not implement now");
+    bool is_mem_reuse = block->mem_reuse();
+    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
+    if (UNLIKELY(!_inited)) {
+        RETURN_IF_ERROR(_init(true));

Review comment:
       condition init is not a good practice although this usage is frequent
   separate init from next_batch because these two methods do absolutely 
different things




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to