This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new dda7604e16 [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112) dda7604e16 is described below commit dda7604e16c0e6fed07708855f47c294b47c8e90 Author: Pxl <952130...@qq.com> AuthorDate: Thu Apr 21 11:02:04 2022 +0800 [Bug][Storage-vectorized] fix code dump on outer join with not nullable column (#9112) --- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 49 ++++++------------- be/src/olap/rowset/segment_v2/segment_iterator.h | 11 ++--- be/src/runtime/thread_mem_tracker_mgr.h | 1 - be/src/vec/core/block.cpp | 9 ++++ be/src/vec/core/block.h | 42 ++++++++-------- be/src/vec/exec/volap_scan_node.cpp | 56 +++++++++++----------- be/src/vec/exec/volap_scan_node.h | 3 ++ 7 files changed, 81 insertions(+), 90 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index e2da9f2bd2..6c342c6742 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -662,8 +662,10 @@ void SegmentIterator::_vec_init_lazy_materialization() { // todo(wb) make a cost-based lazy-materialization framework // check non-pred column type to decide whether using lazy-materialization FieldType type = _schema.column(cid)->type(); - if (_is_all_column_basic_type && (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT - || type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR || type == OLAP_FIELD_TYPE_STRING)) { + if (_is_all_column_basic_type && + (type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT || + type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR || + type == OLAP_FIELD_TYPE_STRING)) { _is_all_column_basic_type = false; } } @@ -747,17 +749,7 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, void SegmentIterator::_init_current_block( vectorized::Block* block, 
std::vector<vectorized::MutableColumnPtr>& current_columns) { - bool is_block_mem_reuse = block->mem_reuse(); - if (is_block_mem_reuse) { - block->clear_column_data(_schema.num_column_ids()); - } else { // pre fill output block here - for (size_t i = 0; i < _schema.num_column_ids(); i++) { - auto cid = _schema.column_id(i); - auto column_desc = _schema.column(cid); - auto data_type = Schema::get_data_type_ptr(*column_desc); - block->insert({nullptr, std::move(data_type), column_desc->name()}); - } - } + block->clear_column_data(_schema.num_column_ids()); for (size_t i = 0; i < _schema.num_column_ids(); i++) { auto cid = _schema.column_id(i); @@ -766,12 +758,8 @@ void SegmentIterator::_init_current_block( if (_is_pred_column[cid]) { //todo(wb) maybe we can release it after output block current_columns[cid]->clear(); } else { // non-predicate column - if (is_block_mem_reuse) { - current_columns[cid] = std::move(*block->get_by_position(i).column).mutate(); - } else { - auto data_type = Schema::get_data_type_ptr(*column_desc); - current_columns[cid] = data_type->create_column(); - } + current_columns[cid] = std::move(*block->get_by_position(i).column).mutate(); + if (column_desc->type() == OLAP_FIELD_TYPE_DATE) { current_columns[cid]->set_date_type(); } else if (column_desc->type() == OLAP_FIELD_TYPE_DATETIME) { @@ -782,7 +770,7 @@ void SegmentIterator::_init_current_block( } } -void SegmentIterator::_output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse) { +void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) { SCOPED_RAW_TIMER(&_opts.stats->output_col_ns); for (auto cid : _non_predicate_columns) { block->replace_by_position(_schema_block_id_map[cid], @@ -907,6 +895,8 @@ void SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column Status SegmentIterator::next_batch(vectorized::Block* block) { bool is_mem_reuse = block->mem_reuse(); + DCHECK(is_mem_reuse); + SCOPED_RAW_TIMER(&_opts.stats->block_load_ns); if 
(UNLIKELY(!_inited)) { RETURN_IF_ERROR(_init(true)); @@ -941,24 +931,15 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // todo(wb) abstract make column where if (!_is_pred_column[cid]) { // non-predicate block->replace_by_position(i, std::move(_current_return_columns[cid])); - } else { // predicate - if (!is_mem_reuse) { - auto column_desc = _schema.column(cid); - auto data_type = Schema::get_data_type_ptr(*column_desc); - block->replace_by_position(i, data_type->create_column()); - } } } - // not sure whether block is clear before enter segmentIter, so clear it here. - if (is_mem_reuse) { - block->clear_column_data(); - } + block->clear_column_data(); return Status::EndOfFile("no more data in segment"); } // when no predicate(include delete condition) is provided, output column directly if (_vec_pred_column_ids.empty() && _short_cir_pred_column_ids.empty()) { - _output_non_pred_columns(block, is_mem_reuse); + _output_non_pred_columns(block); } else { // need predicate evaluation uint16_t selected_size = nrows_read; uint16_t sel_rowid_idx[selected_size]; @@ -970,7 +951,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // So output block directly after vectorization evaluation if (_is_all_column_basic_type) { RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx, - selected_size, is_mem_reuse)); + selected_size)); } else { // step 2: evaluate short ciruit predicate // todo(wb) research whether need to read short predicate after vectorization evaluation @@ -986,7 +967,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // step4: output columns // 4.1 output non-predicate column - _output_non_pred_columns(block, is_mem_reuse); + _output_non_pred_columns(block); // 4.2 get union of short_cir_pred and vec_pred std::set<ColumnId> pred_column_ids; @@ -996,7 +977,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // 4.3 output short circuit and predicate column 
RETURN_IF_ERROR(_output_column_by_sel_idx(block, pred_column_ids, sel_rowid_idx, - selected_size, is_mem_reuse)); + selected_size)); } } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 42a2cafc90..489d19b2a2 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -101,21 +101,20 @@ private: std::vector<vectorized::MutableColumnPtr>& non_pred_vector); void _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t& selected_size); void _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t* selected_size); - void _output_non_pred_columns(vectorized::Block* block, bool is_block_mem_reuse); + void _output_non_pred_columns(vectorized::Block* block); void _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids, std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx, size_t select_size, vectorized::MutableColumns* mutable_columns); template <class Container> Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids, - uint16_t* sel_rowid_idx, uint16_t select_size, - bool is_block_mem_reuse) { + uint16_t* sel_rowid_idx, uint16_t select_size) { SCOPED_RAW_TIMER(&_opts.stats->output_col_ns); for (auto cid : column_ids) { int block_cid = _schema_block_id_map[cid]; - RETURN_IF_ERROR(block->copy_column_data_to_block( - is_block_mem_reuse, _current_return_columns[cid].get(), sel_rowid_idx, - select_size, block_cid, _opts.block_row_max)); + RETURN_IF_ERROR(block->copy_column_data_to_block(_current_return_columns[cid].get(), + sel_rowid_idx, select_size, block_cid, + _opts.block_row_max)); } return Status::OK(); } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 79c152c243..4ca2adba3e 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -103,7 +103,6 @@ public: void 
update_tracker_id(int64_t tracker_id); void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) { - DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()); _mem_trackers[mem_tracker->id()] = mem_tracker; DCHECK(_mem_trackers[mem_tracker->id()]); _untracked_mems[mem_tracker->id()] = 0; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index dc970df517..e304a1973e 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -65,6 +65,15 @@ Block::Block(const ColumnsWithTypeAndName& data_) : data {data_} { initialize_index_by_name(); } +Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size) { + for (const auto slot_desc : slots) { + auto column_ptr = slot_desc->get_empty_mutable_column(); + column_ptr->reserve(block_size); + insert(ColumnWithTypeAndName(std::move(column_ptr), slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } +} + Block::Block(const PBlock& pblock) { const char* buf = nullptr; std::string compression_scratch; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 087f4959cc..ee032900a3 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -29,6 +29,8 @@ #include <vector> #include "gen_cpp/data.pb.h" +#include "runtime/descriptors.h" +#include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/core/block_info.h" #include "vec/core/column_with_type_and_name.h" @@ -70,6 +72,7 @@ public: Block(std::initializer_list<ColumnWithTypeAndName> il); Block(const ColumnsWithTypeAndName& data_); Block(const PBlock& pblock); + Block(const std::vector<SlotDescriptor*>& slots, size_t block_size); /// insert the column at the specified position void insert(size_t position, const ColumnWithTypeAndName& elem); @@ -100,8 +103,7 @@ public: ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; } const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; } - Status 
copy_column_data_to_block(bool is_block_mem_reuse, - doris::vectorized::IColumn* input_col_ptr, + Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr, uint16_t* sel_rowid_idx, uint16_t select_size, int block_cid, size_t batch_size) { // Only the additional deleted filter condition need to materialize column be at the end of the block @@ -111,25 +113,22 @@ public: // `select b from table;` // a column only effective in segment iterator, the block from query engine only contain the b column. // so the `block_cid >= data.size()` is true - if (block_cid >= data.size()) + if (block_cid >= data.size()) { return Status::OK(); + } - if (is_block_mem_reuse) { - auto* raw_res_ptr = this->get_by_position(block_cid).column.get(); - const_cast<doris::vectorized::IColumn*>(raw_res_ptr)->reserve(batch_size); - return input_col_ptr->filter_by_selector( - sel_rowid_idx, select_size, - const_cast<doris::vectorized::IColumn*>(raw_res_ptr)); - } else { - MutableColumnPtr res_col_ptr = data[block_cid].type->create_column(); - res_col_ptr->reserve(batch_size); - auto* raw_res_ptr = res_col_ptr.get(); - RETURN_IF_ERROR(input_col_ptr->filter_by_selector( - sel_rowid_idx, select_size, - const_cast<doris::vectorized::IColumn*>(raw_res_ptr))); - this->replace_by_position(block_cid, std::move(res_col_ptr)); - return Status::OK(); + MutableColumnPtr raw_res_ptr = this->get_by_position(block_cid).column->assume_mutable(); + raw_res_ptr->reserve(batch_size); + + // adapt for outer join change column to nullable + if (raw_res_ptr->is_nullable()) { + auto col_ptr_nullable = + reinterpret_cast<vectorized::ColumnNullable*>(raw_res_ptr.get()); + col_ptr_nullable->get_null_map_column().insert_many_defaults(select_size); + raw_res_ptr = col_ptr_nullable->get_nested_column_ptr(); } + + return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, raw_res_ptr); } void replace_by_position(size_t position, ColumnPtr&& res) { @@ -311,9 +310,8 @@ public: private: void 
erase_impl(size_t position); void initialize_index_by_name(); - bool is_column_data_null(const doris::TypeDescriptor& type_desc, - const StringRef& data_ref, - const IColumn* column_with_type_and_name, int row); + bool is_column_data_null(const doris::TypeDescriptor& type_desc, const StringRef& data_ref, + const IColumn* column_with_type_and_name, int row); void deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor& type_desc, const StringRef& data_ref, const IColumn* column, int row, bool padding_char); @@ -351,7 +349,7 @@ public: size_t rows() const; size_t columns() const { return _columns.size(); } - bool empty() { return rows() == 0; } + bool empty() const { return rows() == 0; } MutableColumns& mutable_columns() { return _columns; } diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 604dda8e72..4dba1e7c28 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -66,25 +66,20 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { auto doris_scanner_row_num = _limit == -1 ? config::doris_scanner_row_num : std::min(static_cast<int64_t>(config::doris_scanner_row_num), _limit); - auto block_size = _limit == -1 ? state->batch_size() - : std::min(static_cast<int64_t>(state->batch_size()), _limit); - auto block_per_scanner = (doris_scanner_row_num + (block_size - 1)) / block_size; + _block_size = _limit == -1 ? 
state->batch_size() + : std::min(static_cast<int64_t>(state->batch_size()), _limit); + auto block_per_scanner = (doris_scanner_row_num + (_block_size - 1)) / _block_size; auto pre_block_count = std::min(_volap_scanners.size(), static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)) * block_per_scanner; for (int i = 0; i < pre_block_count; ++i) { - auto block = new Block; - for (const auto slot_desc : _tuple_desc->slots()) { - auto column_ptr = slot_desc->get_empty_mutable_column(); - column_ptr->reserve(block_size); - block->insert(ColumnWithTypeAndName( - std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name())); - } + auto block = new Block(_tuple_desc->slots(), _block_size); _free_blocks.emplace_back(block); _buffered_bytes += block->allocated_bytes(); } + _block_mem_tracker->consume(_buffered_bytes); // read from scanner @@ -155,7 +150,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { Status status = Status::OK(); bool eos = false; RuntimeState* state = scanner->runtime_state(); - DCHECK(NULL != state); + DCHECK(nullptr != state); if (!scanner->is_open()) { status = scanner->open(); if (!status.ok()) { @@ -206,8 +201,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; - while (!eos && raw_rows_read < raw_rows_threshold && - raw_bytes_read < raw_bytes_threshold && get_free_block) { + while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold && + get_free_block) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -233,7 +228,8 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { std::lock_guard<std::mutex> l(_free_blocks_lock); _free_blocks.emplace_back(block); } else { - if (!blocks.empty() && blocks.back()->rows() + block->rows() <= _runtime_state->batch_size()) { + if (!blocks.empty() && + blocks.back()->rows() + block->rows() <= 
_runtime_state->batch_size()) { MutableBlock(blocks.back()).merge(*block); block->clear_column_data(); std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -339,8 +335,8 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) { for (auto& scan_range : _scan_ranges) { auto tablet_id = scan_range->tablet_id; std::string err; - TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet( - tablet_id, true, &err); + TabletSharedPtr tablet = + StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (tablet == nullptr) { std::stringstream ss; ss << "failed to get tablet: " << tablet_id << ", reason: " << err; @@ -411,7 +407,9 @@ Status VOlapScanNode::close(RuntimeState* state) { _scan_block_added_cv.notify_all(); // join transfer thread - if (_transfer_thread) _transfer_thread->join(); + if (_transfer_thread) { + _transfer_thread->join(); + } // clear some block in queue // TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread, @@ -478,7 +476,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { } // wait for block from queue - Block* materialized_block = NULL; + Block* materialized_block = nullptr; { std::unique_lock<std::mutex> l(_blocks_lock); SCOPED_TIMER(_olap_wait_batch_queue_timer); @@ -493,14 +491,14 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { if (!_materialized_blocks.empty()) { materialized_block = _materialized_blocks.back(); - DCHECK(materialized_block != NULL); + DCHECK(materialized_block != nullptr); _materialized_blocks.pop_back(); _materialized_row_batches_bytes -= materialized_block->allocated_bytes(); } } // return block - if (NULL != materialized_block) { + if (nullptr != materialized_block) { // notify scanner _block_consumed_cv.notify_one(); // get scanner's block memory @@ -536,8 +534,6 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { return 
_status; } -// TODO: we should register the mem cost of new Block in -// alloc block Block* VOlapScanNode::_alloc_block(bool& get_free_block) { { std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -547,15 +543,19 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) { return block; } } + get_free_block = false; - return new Block(); + + auto block = new Block(_tuple_desc->slots(), _block_size); + _buffered_bytes += block->allocated_bytes(); + return block; } int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list<VOlapScanner*> olap_scanners; int assigned_thread_num = _running_thread; size_t max_thread = std::min(_volap_scanners.size(), - static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); + static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); // copy to local { // How many thread can apply to this query @@ -566,7 +566,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per thread_slot_num = _free_blocks.size() / block_per_scanner; thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num); - if (thread_slot_num <= 0) thread_slot_num = 1; + if (thread_slot_num <= 0) { + thread_slot_num = 1; + } } else { std::lock_guard<std::mutex> l(_scan_blocks_lock); if (_scan_blocks.empty()) { @@ -586,9 +588,9 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per auto scanner = _volap_scanners.front(); _volap_scanners.pop_front(); - if (scanner->need_to_close()) + if (scanner->need_to_close()) { scanner->close(state); - else { + } else { olap_scanners.push_back(scanner); _running_thread++; assigned_thread_num++; diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h index 09f77364ed..9557d5c1e7 100644 --- a/be/src/vec/exec/volap_scan_node.h +++ b/be/src/vec/exec/volap_scan_node.h @@ -39,6 +39,7 @@ public: } Status 
get_next(RuntimeState* state, Block* block, bool* eos) override; Status close(RuntimeState* state) override; + private: void transfer_thread(RuntimeState* state); void scanner_thread(VOlapScanner* scanner); @@ -66,6 +67,8 @@ private: std::shared_ptr<MemTracker> _block_mem_tracker; int _max_materialized_blocks; + + size_t _block_size = 0; }; } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org