github-actions[bot] commented on code in PR #23443: URL: https://github.com/apache/doris/pull/23443#discussion_r1360065667
########## be/src/olap/tablet.cpp: ########## @@ -3820,4 +3831,194 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in } return Status::OK(); } + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, Review Comment: warning: method 'check_primary_keys_consistency' can be made static [readability-convert-member-functions-to-static] be/src/olap/tablet.cpp:3834: ```diff - } + }static ``` ########## be/src/olap/check_primary_keys_executor.cpp: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/check_primary_keys_executor.h" + +#include "common/config.h" +#include "common/logging.h" +#include "olap/tablet.h" + +namespace doris { +using namespace ErrorCode; + +Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, + const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset, + std::unordered_map<uint32_t, std::string>* pk_entries, + bool with_seq_col) { + { + std::shared_lock rlock(_mutex); + RETURN_IF_ERROR(_status); + } + return _thread_token->submit_func([=, this]() { + auto st = tablet->check_primary_keys_consistency(read_plan, rsid_to_rowset, pk_entries, + with_seq_col); + if (!st.ok()) { + std::lock_guard wlock(_mutex); + if (_status.ok()) { + _status = st; + } + } + }); +} + +Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, Review Comment: warning: method 'submit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, ``` ########## be/src/olap/tablet.cpp: ########## @@ -3820,4 +3831,194 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in } return Status::OK(); } + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset, + std::unordered_map<uint32_t, std::string>* pk_entries, bool with_seq_col) { + size_t count = 0; + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(rowset_iter->second); + CHECK(rowset); + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + + segment_v2::SegmentSharedPtr segment = *it; + LOG(INFO) << fmt::format( + "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", + segment->rowset_id().to_string(), segment->id()); + RETURN_IF_ERROR(segment->load_index()); + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr<segment_v2::IndexedColumnIterator> iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_index->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); + std::string cur_pk_entry = pk_entries->at(pos); + Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); + Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); + int result = 0; + // always ignore the seq col + if (tablet_schema->has_sequence_col()) { + auto seq_col_length = + tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1; + key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_col_length); + if (with_seq_col) { + key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size() - seq_col_length); + } + } + result = key1.compare(key2); + if (result != 0) { + LOG(WARNING) << fmt::format( + "check primary keys consistency failed, pk at pos {} in current " + "block is {}, but in previous conflict segment is {}!", + pos, key2.to_string(), key1.to_string()); + return Status::InternalError("check primary keys consistency failed"); + } + } + count += rows_info.size(); + } + } + if (count != pk_entries->size()) { + return Status::InternalError( + "check primary keys consistency failed, pk_entries.size():{}, but number of keys " + "in read plan is {}", + pk_entries->size(), count); + } + return Status::OK(); +} + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, Review Comment: warning: method 'check_primary_keys_consistency' can be made static [readability-convert-member-functions-to-static] be/src/olap/tablet.cpp:3911: ```diff - } + }static ``` ########## be/src/olap/tablet.cpp: ########## @@ -3820,4 +3831,194 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in } return Status::OK(); } + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset, + std::unordered_map<uint32_t, std::string>* pk_entries, bool with_seq_col) { + size_t count = 0; + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(rowset_iter->second); + CHECK(rowset); + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + + segment_v2::SegmentSharedPtr segment = *it; + LOG(INFO) << fmt::format( + "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", + segment->rowset_id().to_string(), segment->id()); + RETURN_IF_ERROR(segment->load_index()); + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr<segment_v2::IndexedColumnIterator> iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_index->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); + std::string cur_pk_entry = pk_entries->at(pos); + Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); + Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); + int result = 0; + // always ignore the seq col + if (tablet_schema->has_sequence_col()) { + auto seq_col_length = + tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1; + key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_col_length); + if (with_seq_col) { + key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size() - seq_col_length); + } + } + result = key1.compare(key2); + if (result != 0) { + LOG(WARNING) << fmt::format( + "check primary keys consistency failed, pk at pos {} in current " + "block is {}, but in previous conflict segment is {}!", + pos, key2.to_string(), key1.to_string()); + return Status::InternalError("check primary keys consistency failed"); + } + } + count += rows_info.size(); + } + } + if (count != pk_entries->size()) { + return Status::InternalError( + "check primary keys consistency failed, pk_entries.size():{}, but number of keys " + "in read plan is {}", + pk_entries->size(), count); + } + return Status::OK(); +} + +Status Tablet::check_primary_keys_consistency( + const PartialUpdateReadPlan* read_plan, + const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset, + segment_v2::SegmentWriter* segment_writer, + std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns, uint32_t row_pos) { + for (auto& [rowset_id, segment_read_info] : *read_plan) { + for (auto& [segment_id, rows_info] : segment_read_info) { + auto rowset_iter = rsid_to_rowset->find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset->end()); + BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(rowset_iter->second); + CHECK(rowset); + const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); + auto it = std::find_if(segment_cache.get_segments().cbegin(), + segment_cache.get_segments().cend(), + [segment_id](const segment_v2::SegmentSharedPtr& seg) { + return seg->id() == segment_id; + }); + if (it == segment_cache.get_segments().end()) { + return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", + rowset->rowset_id().to_string(), segment_id)); + } + + segment_v2::SegmentSharedPtr segment = *it; + LOG(INFO) << fmt::format( + "[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]", + segment->rowset_id().to_string(), segment->id()); + RETURN_IF_ERROR(segment->load_index()); + auto pk_index = segment->get_primary_key_index(); + std::unique_ptr<segment_v2::IndexedColumnIterator> iter; + RETURN_IF_ERROR(pk_index->new_iterator(&iter)); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_index->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); + + size_t idx = 0; + for (auto [rowid, pos] : rows_info) { + RETURN_IF_ERROR(iter->seek_to_ordinal(rowid)); + size_t num_read = 1; + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); + CHECK(num_read == 1); + std::string prev_pk_entry = index_column->get_data_at(idx++).to_string(); + std::string cur_pk_entry = + segment_writer->full_encode_keys(*key_columns, pos - row_pos); + Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size()); + Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size()); + int result = 0; + // always ignore the seq col + if (tablet_schema->has_sequence_col()) { + auto seq_col_length = + tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1; + key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - seq_col_length); + } + result = key1.compare(key2); + if (result != 0) { + LOG(WARNING) << fmt::format( + "check primary keys consistency failed, pk at pos {} in current " + "block is {}, but in previous conflict segment is {}!", + pos, key2.to_string(), key1.to_string()); + return Status::InternalError("check primary keys consistency failed"); + } + } + } + } + return Status::OK(); +} + +Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan, Review Comment: warning: method 'fetch_pk_entries' can be made static [readability-convert-member-functions-to-static] ```suggestion }static ``` ########## be/src/olap/check_primary_keys_executor.cpp: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/check_primary_keys_executor.h" + +#include "common/config.h" +#include "common/logging.h" +#include "olap/tablet.h" + +namespace doris { +using namespace ErrorCode; + +Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, Review Comment: warning: method 'submit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status CheckPrimaryKeysToken::submit(Tablet* tablet, const PartialUpdateReadPlan* read_plan, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org