liaoxin01 commented on code in PR #20153:
URL: https://github.com/apache/doris/pull/20153#discussion_r1211026968


##########
be/src/olap/tablet.cpp:
##########
@@ -3264,4 +3240,188 @@ bool Tablet::is_enable_binlog() {
 void Tablet::set_binlog_config(BinlogConfig binlog_config) {
     tablet_meta()->set_binlog_config(std::move(binlog_config));
 }
+
+// Computes, within one rowset, the delete bitmap entries caused by the same
+// primary key appearing in more than one of its segments.
+//
+// The primary key indexes of all `segments` are merged by a
+// MergeIndexDeleteBitmapCalculator; every RowLocation it yields is added to
+// `delete_bitmap` under the key (rowset_id, loc.segment_id, 0).
+//
+// Caller should hold meta_lock.
+//
+// @param rowset        rowset that owns `segments`; only its rowset id is used here
+// @param segments      segments of `rowset`; with fewer than two there is nothing
+//                      to merge across and OK is returned immediately
+// @param delete_bitmap bitmap that receives the row ids yielded by the calculator
+// @return the first non-OK status from loading an index, creating an index
+//         iterator, or advancing the calculator; OK otherwise
+Status Tablet::calc_delete_bitmap_between_segments(
+        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+        DeleteBitmapPtr delete_bitmap) {
+    size_t const num_segments = segments.size();
+    if (num_segments < 2) {
+        return Status::OK();
+    }
+
+    OlapStopWatch watch;
+    auto const rowset_id = rowset->rowset_id();
+    std::vector<MergeIndexDeleteBitmapCalculatorContext> contexts;
+    // exactly one context per segment; avoid regrowth while building them
+    contexts.reserve(num_segments);
+    for (const auto& segment : segments) {
+        RETURN_IF_ERROR(segment->load_index());
+        auto pk_idx = segment->get_primary_key_index();
+        std::unique_ptr<segment_v2::IndexedColumnIterator> index;
+        // new_iterator reports failure through its Status; do not drop it and
+        // go on with an unset iterator
+        RETURN_IF_ERROR(pk_idx->new_iterator(&index));
+        auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
+                pk_idx->type_info()->type(), 1, 0);
+        contexts.emplace_back(std::move(index), index_type, segment->id(), pk_idx->num_rows());
+    }
+
+    // keys carry the sequence column value as a trailing suffix when the table has one
+    size_t seq_col_length = 0;
+    if (_schema->has_sequence_col()) {
+        auto seq_col_idx = _schema->sequence_col_idx();
+        seq_col_length = _schema->column(seq_col_idx).length();
+    }
+
+    MergeIndexDeleteBitmapCalculator calculator(contexts, seq_col_length);
+    RowLocation loc;
+    while (true) {
+        auto st = calculator.next(loc);
+        if (st.is<ErrorCode::END_OF_FILE>()) {
+            break;
+        }
+        RETURN_IF_ERROR(st);
+        delete_bitmap->add({rowset_id, loc.segment_id, 0}, loc.row_id);
+    }
+
+    LOG(INFO) << fmt::format(
+            "calculate delete bitmap between segments with merged pkindex in {} us",
+            watch.get_elapse_time_us());
+    return Status::OK();
+}
+
+// Exposes the primary key under the cursor through `slice`.
+// Returns EndOfFile once `_cur_row_id` has reached `_num_rows`. When the cursor has
+// moved past the batch currently buffered in `_index_column`, the iterator is
+// re-positioned at `_cur_row_id` and the next batch is loaded before reading.
+// `slice` points at the data returned by `_index_column->get_data_at` — presumably
+// only valid until the next batch replaces `_index_column`; TODO confirm.
+Status MergeIndexDeleteBitmapCalculatorContext::get_current_key(Slice& slice) {
+    bool const exhausted = _cur_row_id >= _num_rows;
+    if (exhausted) {
+        return Status::EndOfFile("Reach the end of file");
+    }
+
+    bool const batch_consumed = _cur_pos >= _block_size;
+    if (batch_consumed) {
+        RETURN_IF_ERROR(_iter->seek_to_ordinal(_cur_row_id));
+        RETURN_IF_ERROR(_next_batch(_cur_row_id));
+    }
+
+    auto const cell = _index_column->get_data_at(_cur_pos);
+    slice = Slice(cell.data, cell.size);
+    return Status::OK();
+}
+
+// Steps the cursor one row forward (both the position inside the buffered batch
+// and the row ordinal). Returns EndOfFile when that step moved past the last row
+// of the segment; the cursor is advanced in either case. No batch is read here —
+// get_current_key() reloads one when the cursor leaves the buffered batch.
+Status MergeIndexDeleteBitmapCalculatorContext::advance() {
+    _cur_row_id += 1;
+    _cur_pos += 1;
+    if (_cur_row_id < _num_rows) {
+        return Status::OK();
+    }
+    return Status::EndOfFile("Reach the end of file");
+}
+
+// Repositions the cursor at the first row whose key is >= `key`, as located by
+// `_iter->seek_at_or_after`. A NOT_FOUND from the iterator is reported as
+// EndOfFile; any other error status is returned unchanged.
+// If the target row still lies inside the batch buffered in `_index_column`, only
+// `_cur_pos` / `_cur_row_id` are moved; otherwise `_next_batch` reads a new batch
+// starting at the target ordinal, where the seek has already left `_iter`.
+// NOTE(review): the requirement that the target lies strictly after the current row
+// is only checked by DCHECK (debug builds) — confirm every caller guarantees it.
+Status MergeIndexDeleteBitmapCalculatorContext::seek_at_or_after(Slice const& 
key) {
+    auto st = _iter->seek_at_or_after(&key, &_excat_match);
+    if (st.is<ErrorCode::NOT_FOUND>()) {
+        return Status::EndOfFile("Reach the end of file");
+    }
+    RETURN_IF_ERROR(st);
+    auto current_ordinal = _iter->get_current_ordinal();
+    DCHECK(current_ordinal > _cur_row_id)
+            << fmt::format("current_ordinal: {} should be greater than 
_cur_row_id: {}",
+                           current_ordinal, _cur_row_id);
+    // if key is still in the block read before,
+    // in other words, if `_cur_pos + current_ordinal - _cur_row_id < 
_block_size` holds
+    // we can seek simply by moving the pointers, aka. _cur_pos and _cur_row_id
+    // NOTE(review): if these counters are unsigned (declared elsewhere), a target before
+    // the buffered batch would wrap this sum to a huge value and take the reload path.
+    if (_cur_pos + current_ordinal - _cur_row_id < _block_size) {
+        _cur_pos = _cur_pos + current_ordinal - _cur_row_id;
+        _cur_row_id = current_ordinal;
+        return Status::OK();
+    }
+    // otherwise, we have to read the data starts from `current_ordinal`
+    return _next_batch(current_ordinal);
+}
+
+// Loads the next batch of primary keys, starting at ordinal `row_id`, into a
+// freshly created `_index_column`, and points the cursor at its first entry.
+// At most `_max_batch_size` keys are read (fewer near the end of the segment).
+// Precondition: `_iter` is already positioned at `row_id` — the caller seeks it.
+// @return the iterator's error status if reading fails, OK otherwise
+Status MergeIndexDeleteBitmapCalculatorContext::_next_batch(size_t row_id) {
+    // _iter should be seeked before calling this function
+    DCHECK(row_id < _num_rows) << fmt::format("row_id: {} should be less than _num_rows: {}",
+                                              row_id, _num_rows);
+    _index_column = _index_type->create_column();
+    auto remaining = _num_rows - row_id;
+    size_t num_to_read = std::min(_max_batch_size, remaining);
+    size_t num_read = num_to_read;
+    RETURN_IF_ERROR(_iter->next_batch(&num_read, _index_column));
+    // the second value printed is `num_read`, so label it as such in the diagnostic
+    DCHECK(num_to_read == num_read) << fmt::format(
+            "num_to_read: {} should be equal to num_read: {}", num_to_read, num_read);
+    _block_size = num_read;
+    _cur_pos = 0;
+    _cur_row_id = row_id;
+    return Status::OK();
+}
+
+// Ordering of contexts for a std::priority_queue: returns true when `rhs` should be
+// popped before `lhs`. The current keys of both contexts are compared with Slice::compare:
+//  - no sequence column (`_sequence_length == 0`): smaller key first; for equal keys,
+//    the context with the greater segment id first;
+//  - with a sequence column (last `_sequence_length` bytes of each key): smaller key
+//    prefix first; for equal prefixes, greater sequence bytes first; then greater
+//    segment id first.
+// Assumes every key is at least `_sequence_length` bytes long — TODO confirm.
+// NOTE(review): RETURN_IF_ERROR returns a Status from this bool-returning function.
+// Depending on Status's conversion operators (not visible here) this either fails to
+// compile or turns a read error into an ordinary comparison result, which can break
+// the strict weak ordering the heap relies on. Consider caching each context's
+// current key so that comparing never needs a fallible read.
+bool MergeIndexDeleteBitmapCalculatorContext::Comparator::operator()(
+        MergeIndexDeleteBitmapCalculatorContext* lhs,
+        MergeIndexDeleteBitmapCalculatorContext* rhs) const {
+    // std::priority_queue is a max heap, and function should return the 
result of `lhs < rhs`
+    // so if the result of the function is true, rhs will be popped before lhs
+    Slice key1, key2;
+    RETURN_IF_ERROR(lhs->get_current_key(key1));
+    RETURN_IF_ERROR(rhs->get_current_key(key2));
+    if (_sequence_length == 0) {
+        auto cmp_result = key1.compare(key2);
+        // when key1 is the same as key2,
+        // we want the one with greater segment id to be popped first
+        return cmp_result ? (cmp_result > 0) : (lhs->segment_id() < 
rhs->segment_id());
+    }
+    // smaller key popped first
+    auto key1_without_seq = Slice(key1.get_data(), key1.get_size() - 
_sequence_length);
+    auto key2_without_seq = Slice(key2.get_data(), key2.get_size() - 
_sequence_length);
+    auto cmp_result = key1_without_seq.compare(key2_without_seq);
+    if (cmp_result != 0) {
+        return cmp_result > 0;
+    }
+    // greater sequence value popped first
+    auto key1_sequence_val =
+            Slice(key1.get_data() + key1.get_size() - _sequence_length, 
_sequence_length);
+    auto key2_sequence_val =
+            Slice(key2.get_data() + key2.get_size() - _sequence_length, 
_sequence_length);
+    cmp_result = key1_sequence_val.compare(key2_sequence_val);
+    if (cmp_result != 0) {
+        return cmp_result < 0;
+    }
+    // greater segment id popped first
+    return lhs->segment_id() < rhs->segment_id();
+}
+
+// Returns true when `lhs` and `rhs` are equal after dropping the trailing
+// `_sequence_length` bytes (the sequence column value; presumably 0 bytes when the
+// table has no sequence column) from each of them.
+// Assumes both keys are at least `_sequence_length` bytes long — TODO confirm.
+bool MergeIndexDeleteBitmapCalculatorContext::Comparator::is_key_same(Slice const& lhs,
+                                                                      Slice const& rhs) const {
+    Slice const lhs_prefix(lhs.get_data(), lhs.get_size() - _sequence_length);
+    Slice const rhs_prefix(rhs.get_data(), rhs.get_size() - _sequence_length);
+    return 0 == lhs_prefix.compare(rhs_prefix);
+}
+
+Status MergeIndexDeleteBitmapCalculator::next(RowLocation& loc) {
+    // get the position of a out-of-date row
+    while (!_heap.empty()) {
+        auto cur_ctx = _heap.top();
+        _heap.pop();
+        Slice cur_key;
+        RETURN_IF_ERROR(cur_ctx->get_current_key(cur_key));
+        if (_comparator.is_key_same(cur_key, _last_key)) {
+            loc.segment_id = cur_ctx->segment_id();
+            loc.row_id = cur_ctx->row_id();
+            auto st = cur_ctx->advance();
+            if (st.ok()) {
+                _heap.push(cur_ctx);
+            } else if (!st.is<ErrorCode::END_OF_FILE>()) {
+                return st;
+            }
+            return Status::OK();
+        }
+        if (_heap.empty()) {
+            break;
+        }
+        _last_key = cur_key.to_string();
+        auto nxt_ctx = _heap.top();
+        Slice nxt_key;
+        RETURN_IF_ERROR(nxt_ctx->get_current_key(nxt_key));
+        Status st = _comparator.is_key_same(cur_key, nxt_key) ? 
cur_ctx->advance()

Review Comment:
   Does `cur_key` also need to be added to the delete bitmap here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to