This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push: new 7fbca548ab VMergeIterator should use nullable info from scanner instead of schema (#10797) 7fbca548ab is described below commit 7fbca548abb3d7d84ba0ca0c2ebc29b5177ec0a5 Author: starocean999 <40539150+starocean...@users.noreply.github.com> AuthorDate: Thu Jul 14 12:22:28 2022 +0800 VMergeIterator should use nullable info from scanner instead of schema (#10797) --- be/src/olap/rowset/beta_rowset_reader.cpp | 7 +- be/src/olap/rowset/rowset_reader_context.h | 3 + be/src/vec/olap/block_reader.cpp | 1 + be/src/vec/olap/vgeneric_iterators.cpp | 110 +++++++++++++++++------------ be/src/vec/olap/vgeneric_iterators.h | 4 +- 5 files changed, 78 insertions(+), 47 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index e314c4c1ab..2d8f79a8da 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -117,8 +117,11 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { // merge or union segment iterator RowwiseIterator* final_iterator; if (config::enable_storage_vectorization && read_context->is_vec) { - if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { - final_iterator = vectorized::new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique); + if (read_context->need_ordered_result && + _rowset->rowset_meta()->is_segments_overlapping()) { + final_iterator = vectorized::new_merge_iterator( + iterators, _parent_tracker, read_context->sequence_id_idx, + read_context->is_unique, read_context->tablet_columns_convert_to_null_set); } else { final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker); } diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 0ae42f6cf4..0bdb903e2e 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -64,6 +64,9 @@ struct RowsetReaderContext { int batch_size = 1024; bool is_vec = false; bool is_unique = false; + + // need pass this info to VMergeIterator + std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; }; } // namespace doris diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 2d5c542379..19b1192444 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -52,6 +52,7 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, _reader_context.batch_size = _batch_size; _reader_context.is_vec = true; + _reader_context.tablet_columns_convert_to_null_set = _tablet_columns_convert_to_null_set; for (auto& rs_reader : rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); OLAPStatus res = _vcollect_iter.add_child(rs_reader); diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 2440b09888..13b4acf30c 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -60,28 +60,28 @@ public: size_t data_len = 0; const auto* col_schema = _schema.column(j); switch (col_schema->type()) { - case OLAP_FIELD_TYPE_SMALLINT: - *(int16_t*)data = _rows_returned + j; - data_len = sizeof(int16_t); - break; - case OLAP_FIELD_TYPE_INT: - *(int32_t*)data = _rows_returned + j; - data_len = sizeof(int32_t); - break; - case OLAP_FIELD_TYPE_BIGINT: - *(int64_t*)data = _rows_returned + j; - data_len = sizeof(int64_t); - break; - case OLAP_FIELD_TYPE_FLOAT: - *(float*)data = _rows_returned + j; - data_len = sizeof(float); - break; - case OLAP_FIELD_TYPE_DOUBLE: - *(double*)data = _rows_returned + j; - data_len = sizeof(double); - break; - default: - break; + case OLAP_FIELD_TYPE_SMALLINT: + *(int16_t*)data = _rows_returned + j; + data_len = sizeof(int16_t); + break; + case OLAP_FIELD_TYPE_INT: + *(int32_t*)data = _rows_returned + j; + data_len = sizeof(int32_t); + break; + case OLAP_FIELD_TYPE_BIGINT: + *(int64_t*)data = _rows_returned + j; + data_len = sizeof(int64_t); + break; + case OLAP_FIELD_TYPE_FLOAT: + *(float*)data = _rows_returned + j; + data_len = sizeof(float); + break; + case OLAP_FIELD_TYPE_DOUBLE: + *(double*)data = _rows_returned + j; + data_len = sizeof(double); + break; + default: + break; } vi.insert_data(data, data_len); @@ -91,8 +91,7 @@ public: ++_rows_returned; } - if (row_idx > 0) - return Status::OK(); + if (row_idx > 0) return Status::OK(); return Status::EndOfFile("End of VAutoIncrementIterator"); } @@ -120,12 +119,14 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) { // } class VMergeIteratorContext { public: - VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique) + VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique, + const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) : _iter(iter), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _num_columns(iter->schema().num_column_ids()), - _num_key_columns(iter->schema().num_key_columns()) {} + _num_key_columns(iter->schema().num_key_columns()), + _tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {} VMergeIteratorContext(const VMergeIteratorContext&) = delete; VMergeIteratorContext(VMergeIteratorContext&&) = delete; @@ -137,8 +138,7 @@ public: _iter = nullptr; } - Status block_reset() - { + Status block_reset() { if (!_block) { const Schema& schema = _iter->schema(); const auto& column_ids = schema.column_ids(); @@ -149,11 +149,21 @@ public: return Status::RuntimeError("invalid data type"); } if (column_desc->is_nullable()) { - data_type = std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)); + data_type = + std::make_shared<vectorized::DataTypeNullable>(std::move(data_type)); } auto column = data_type->create_column(); column->reserve(_block_row_max); - _block.insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); + + if (_tablet_columns_convert_to_null_set && + _tablet_columns_convert_to_null_set->find(column_ids[i]) != + _tablet_columns_convert_to_null_set->end()) { + column = make_nullable(std::move(column))->assume_mutable(); + data_type = make_nullable(data_type); + } + + _block.insert( + ColumnWithTypeAndName(std::move(column), data_type, column_desc->name())); } } else { _block.clear_column_data(); @@ -233,6 +243,7 @@ private: int _block_row_max = 4096; int _num_columns; int _num_key_columns; + const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { @@ -281,8 +292,13 @@ Status VMergeIteratorContext::_load_next_block() { class VMergeIterator : public RowwiseIterator { public: // VMergeIterator takes the ownership of input iterators - VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) : - _origin_iters(iters),_sequence_id_idx(sequence_id_idx), _is_unique(is_unique) { + VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, + int sequence_id_idx, bool is_unique, + const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) + : _origin_iters(iters), + _sequence_id_idx(sequence_id_idx), + _is_unique(is_unique), + _tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false); } @@ -313,15 +329,16 @@ private: } }; - using VMergeHeap = std::priority_queue<VMergeIteratorContext*, - std::vector<VMergeIteratorContext*>, - VMergeContextComparator>; + using VMergeHeap = + std::priority_queue<VMergeIteratorContext*, std::vector<VMergeIteratorContext*>, + VMergeContextComparator>; VMergeHeap _merge_heap; int block_row_max = 0; int _sequence_id_idx = -1; bool _is_unique = false; + const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; }; Status VMergeIterator::init(const StorageReadOptions& opts) { @@ -331,7 +348,8 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { _schema = &(*_origin_iters.begin())->schema(); for (auto iter : _origin_iters) { - auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx, _is_unique); + auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx, _is_unique, + _tablet_columns_convert_to_null_set); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; @@ -348,8 +366,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { Status VMergeIterator::next_batch(vectorized::Block* block) { while (block->rows() < block_row_max) { - if (_merge_heap.empty()) - break; + if (_merge_heap.empty()) break; auto ctx = _merge_heap.top(); _merge_heap.pop(); @@ -386,7 +403,8 @@ public: } ~VUnionIterator() override { - std::for_each(_origin_iters.begin(), _origin_iters.end(), std::default_delete<RowwiseIterator>()); + std::for_each(_origin_iters.begin(), _origin_iters.end(), + std::default_delete<RowwiseIterator>()); } Status init(const StorageReadOptions& opts) override; @@ -432,15 +450,19 @@ Status VUnionIterator::next_batch(vectorized::Block* block) { return Status::EndOfFile("End of VUnionIterator"); } - -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) { +RowwiseIterator* new_merge_iterator( + std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, + int sequence_id_idx, bool is_unique, + const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) { if (inputs.size() == 1) { return *(inputs.begin()); } - return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique); + return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique, + tablet_columns_convert_to_null_set); } -RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) { +RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, + std::shared_ptr<MemTracker> parent) { if (inputs.size() == 1) { return *(inputs.begin()); } @@ -451,6 +473,6 @@ RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_ro return new VAutoIncrementIterator(schema, num_rows); } -} +} // namespace vectorized } // namespace doris diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index 063d07da51..0b9af85ef5 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -27,7 +27,9 @@ namespace vectorized { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique); +RowwiseIterator* new_merge_iterator( + std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, + const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org