This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 40c53931e5 [fix](vec) VMergeIterator add key same label for agg table (#14722) 40c53931e5 is described below commit 40c53931e57be494a71591bc1d32fef2450207fd Author: xueweizhang <zxw520bl...@163.com> AuthorDate: Mon Jan 2 22:54:21 2023 +0800 [fix](vec) VMergeIterator add key same label for agg table (#14722) --- be/src/olap/olap_define.h | 2 +- be/src/vec/core/block.cpp | 13 +++++++++++++ be/src/vec/core/block.h | 15 +++++++++++++++ be/src/vec/olap/block_reader.cpp | 11 ++++++++++- be/src/vec/olap/block_reader.h | 2 ++ be/src/vec/olap/vgeneric_iterators.cpp | 9 ++++++++- be/src/vec/olap/vgeneric_iterators.h | 18 ++++++++++++++++++ be/test/olap/segcompaction_test.cpp | 1 + 8 files changed, 68 insertions(+), 3 deletions(-) diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index 40a6dd9dee..a9c92c69e2 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -212,4 +212,4 @@ private: \ #define BUILD_VERSION "Unknown" #endif -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 571a391eb0..817a2ac643 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -198,6 +198,9 @@ void Block::erase_impl(size_t position) { ++it; } } + if (position < row_same_bit.size()) { + row_same_bit.erase(row_same_bit.begin() + position); + } } void Block::erase(const String& name) { @@ -339,6 +342,9 @@ void Block::set_num_rows(size_t length) { elem.column = elem.column->cut(0, length); } } + if (length < row_same_bit.size()) { + row_same_bit.resize(length); + } } } @@ -353,6 +359,9 @@ void Block::skip_num_rows(int64_t& length) { elem.column = elem.column->cut(length, origin_rows - length); } } + if (length < row_same_bit.size()) { + row_same_bit.assign(row_same_bit.begin() + length, row_same_bit.end()); + } } } @@ -593,6 +602,7 @@ DataTypes Block::get_data_types() const { void 
Block::clear() { data.clear(); index_by_name.clear(); + row_same_bit.clear(); } void Block::clear_column_data(int column_size) noexcept { @@ -607,17 +617,20 @@ void Block::clear_column_data(int column_size) noexcept { DCHECK_EQ(d.column->use_count(), 1); (*std::move(d.column)).assume_mutable()->clear(); } + row_same_bit.clear(); } void Block::swap(Block& other) noexcept { data.swap(other.data); index_by_name.swap(other.index_by_name); + row_same_bit.swap(other.row_same_bit); } void Block::swap(Block&& other) noexcept { clear(); data = std::move(other.data); initialize_index_by_name(); + row_same_bit = std::move(other.row_same_bit); } void Block::update_hash(SipHash& hash) const { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 803fb82f97..b4a73a896f 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -62,6 +62,7 @@ private: Container data; IndexByName index_by_name; + std::vector<bool> row_same_bit; int64_t _decompress_time_ns = 0; int64_t _decompressed_bytes = 0; @@ -349,6 +350,20 @@ public: int64_t get_decompressed_bytes() const { return _decompressed_bytes; } int64_t get_compress_time() const { return _compress_time_ns; } + void set_same_bit(std::vector<bool>::const_iterator begin, + std::vector<bool>::const_iterator end) { + row_same_bit.insert(row_same_bit.end(), begin, end); + + DCHECK_EQ(row_same_bit.size(), rows()); + } + + bool get_same_bit(size_t position) { + if (position >= row_same_bit.size()) { + return false; + } + return row_same_bit[position]; + } + private: void erase_impl(size_t position); void initialize_index_by_name(); diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index be064d26ce..43336b1861 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -213,7 +213,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP return res; } - if (!_next_row.is_same) { + if (!_get_next_row_same()) { if (target_block_row == 
_batch_size) { break; } @@ -413,6 +413,15 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, } } +bool BlockReader::_get_next_row_same() { + if (_next_row.is_same) { + return true; + } else { + auto block = _next_row.block.get(); + return block->get_same_bit(_next_row.row_pos); + } +} + ColumnPredicate* BlockReader::_parse_to_predicate(const FunctionFilter& function_filter) { int32_t index = _tablet_schema->field_index(function_filter._col_name); if (index < 0) { diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 18e3e0e232..f727930022 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -79,6 +79,8 @@ private: void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true); + bool _get_next_row_same(); + VCollectIterator _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 7997a3d43d..2d6d54f762 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -124,6 +124,7 @@ bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const { if (_is_unique) { result ? set_skip(true) : rhs.set_skip(true); } + result ? 
set_same(true) : rhs.set_same(true); return result; } @@ -148,6 +149,8 @@ void VMergeIteratorContext::copy_rows(Block* block, bool advanced) { d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); } + const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same(); + dst.set_same_bit(tmp_pre_ctx_same_bit.begin(), tmp_pre_ctx_same_bit.begin() + _cur_batch_num); _cur_batch_num = 0; } @@ -158,8 +161,9 @@ void VMergeIteratorContext::copy_rows(BlockView* view, bool advanced) { size_t start = _index_in_block - _cur_batch_num + 1 - advanced; DCHECK(start >= 0); + const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same(); for (size_t i = 0; i < _cur_batch_num; ++i) { - view->push_back({_block, static_cast<int>(start + i), false}); + view->push_back({_block, static_cast<int>(start + i), tmp_pre_ctx_same_bit[i]}); } _cur_batch_num = 0; @@ -255,11 +259,14 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) { if (valid()) { RETURN_IF_ERROR(advance()); } + _pre_ctx_same_bit.reserve(_block_row_max); + _pre_ctx_same_bit.assign(_block_row_max, false); return Status::OK(); } Status VMergeIteratorContext::advance() { _skip = false; + _same = false; // NOTE: we increase _index_in_block directly to valid one check do { _index_in_block++; diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index 210f47022a..e5bb36d0d3 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -120,6 +120,21 @@ public: void set_skip(bool skip) const { _skip = skip; } + bool is_same() const { return _same; } + + void set_same(bool same) const { _same = same; } + + const std::vector<bool>& get_pre_ctx_same() const { return _pre_ctx_same_bit; } + + void set_pre_ctx_same(VMergeIteratorContext* ctx) const { + int64_t index = ctx->get_cur_batch() - 1; + DCHECK(index >= 0); + DCHECK_LT(index, _pre_ctx_same_bit.size()); + _pre_ctx_same_bit[index] = ctx->is_same(); + } + + size_t get_cur_batch() const { return 
_cur_batch_num; } + void add_cur_batch() { _cur_batch_num++; } void reset_cur_batch() { _cur_batch_num = 0; } @@ -137,6 +152,7 @@ private: bool _is_reverse = false; bool _valid = false; mutable bool _skip = false; + mutable bool _same = false; size_t _index_in_block = -1; // 4096 minus 16 + 16 bytes padding that in padding pod array int _block_row_max = 4064; @@ -151,6 +167,7 @@ private: std::shared_ptr<Block> _block; // used to store data still on block view std::list<std::shared_ptr<Block>> _block_list; + mutable std::vector<bool> _pre_ctx_same_bit; }; class VMergeIterator : public RowwiseIterator { @@ -221,6 +238,7 @@ private: } pre_ctx = ctx; } + pre_ctx->set_pre_ctx_same(ctx); if (UNLIKELY(_record_rowids)) { _block_row_locations[row_idx] = ctx->current_row_location(); } diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index 5ac39db339..78fff2dd6e 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -310,6 +310,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { } s = rowset_writer->flush(); EXPECT_EQ(Status::OK(), s); + sleep(1); } num_segments = 1; rows_per_segment = 6400; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org