This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push: new fbd1f1b9f7 [Bugfix](compaction) fix uniq key compaction bug (#10946) fbd1f1b9f7 is described below commit fbd1f1b9f747b2777a156e15b43cd1a8d1616e98 Author: yixiutt <102007456+yixi...@users.noreply.github.com> AuthorDate: Mon Jul 18 14:40:37 2022 +0800 [Bugfix](compaction) fix uniq key compaction bug (#10946) When a rowset has multiple segments in unique-key compaction, the segments' rows are merged in generic_iterator but merged_rows is not increased. Compaction then fails in check_correctness, leaving the tablet with too many versions, which leads to a -235 load error. Co-authored-by: yixiutt <yi...@selectdb.com> --- be/src/olap/generic_iterators.cpp | 11 +++++++---- be/src/olap/generic_iterators.h | 2 +- be/src/olap/reader.cpp | 1 + be/src/olap/rowset/beta_rowset_reader.cpp | 6 +++--- be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/vec/olap/vgeneric_iterators.cpp | 8 +++++++- be/src/vec/olap/vgeneric_iterators.h | 1 + 7 files changed, 21 insertions(+), 9 deletions(-) diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index ba4cc5559d..2765096cb1 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -213,8 +213,8 @@ Status MergeIteratorContext::_load_next_block() { class MergeIterator : public RowwiseIterator { public: // MergeIterator takes the ownership of input iterators - MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) - : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) { + MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows) + : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merged_rows(merged_rows), 
_merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", std::move(parent), false); } @@ -241,6 +241,7 @@ private: bool _is_unique; std::unique_ptr<Schema> _schema; + uint64_t* _merged_rows; struct MergeContextComparator { MergeContextComparator(int idx, bool is_unique) @@ -314,6 +315,8 @@ Status MergeIterator::next_batch(RowBlockV2* block) { RowBlockRow dst_row = block->row(row_idx++); // copy current row to block copy_row(&dst_row, ctx->current_row(), block->pool()); + } else if (_merged_rows != nullptr) { + (*_merged_rows)++; } RETURN_IF_ERROR(ctx->advance()); @@ -390,11 +393,11 @@ Status UnionIterator::next_batch(RowBlockV2* block) { return Status::EndOfFile("End of UnionIterator"); } -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) { +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows) { if (inputs.size() == 1) { return *(inputs.begin()); } - return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique); + return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique, merged_rows); } RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) { diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h index 1a6ca90030..9cf40ad2fb 100644 --- a/be/src/olap/generic_iterators.h +++ b/be/src/olap/generic_iterators.h @@ -25,7 +25,7 @@ namespace doris { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. 
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique); +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 819e84a9a1..9b0b7ceec0 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -226,6 +226,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; *valid_rs_readers = *rs_readers; + _reader_context.merged_rows = &_merged_rows; return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 2d8f79a8da..69127afe7a 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -120,14 +120,14 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { final_iterator = vectorized::new_merge_iterator( - iterators, _parent_tracker, read_context->sequence_id_idx, - read_context->is_unique, read_context->tablet_columns_convert_to_null_set); + iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique, + read_context->merged_rows, read_context->tablet_columns_convert_to_null_set); } else { final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker); } } else { if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { - final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique); + final_iterator = new_merge_iterator(iterators, _parent_tracker, 
read_context->sequence_id_idx, read_context->is_unique, read_context->merged_rows); } else { final_iterator = new_union_iterator(iterators, _parent_tracker); } diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 0bdb903e2e..374e5da857 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -64,6 +64,7 @@ struct RowsetReaderContext { int batch_size = 1024; bool is_vec = false; bool is_unique = false; + uint64_t* merged_rows = nullptr; // need pass this info to VMergeIterator std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 13b4acf30c..82faf8c14e 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -294,10 +294,12 @@ public: // VMergeIterator takes the ownership of input iterators VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, + uint64_t* merged_rows, const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) : _origin_iters(iters), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), + _merged_rows(merged_rows), _tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false); @@ -338,6 +340,7 @@ private: int block_row_max = 0; int _sequence_id_idx = -1; bool _is_unique = false; + uint64_t* _merged_rows; const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; }; @@ -374,6 +377,8 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { if (!ctx->need_skip()) { // copy current row to block ctx->copy_row(block); + } else if(_merged_rows != nullptr){ + (*_merged_rows)++; } RETURN_IF_ERROR(ctx->advance()); @@ -453,11 +458,12 @@ 
Status VUnionIterator::next_batch(vectorized::Block* block) { RowwiseIterator* new_merge_iterator( std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, + uint64_t* merged_rows, const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) { if (inputs.size() == 1) { return *(inputs.begin()); } - return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique, + return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique, merged_rows, tablet_columns_convert_to_null_set); } diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index 0b9af85ef5..d1ebc0cd25 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -29,6 +29,7 @@ namespace vectorized { // should delete returned iterator after usage. RowwiseIterator* new_merge_iterator( std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, + uint64_t* merged_rows, const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr); // Create a union iterator for input iterators. Union iterator will read --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org