github-actions[bot] commented on code in PR #23443:
URL: https://github.com/apache/doris/pull/23443#discussion_r1360065667


##########
be/src/olap/tablet.cpp:
##########
@@ -3820,4 +3831,194 @@ Status 
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
     }
     return Status::OK();
 }
+
+Status Tablet::check_primary_keys_consistency(
+        const PartialUpdateReadPlan* read_plan,

Review Comment:
   warning: method 'check_primary_keys_consistency' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/tablet.cpp:3834:
   ```diff
   -   }
   +   }static 
   ```
   



##########
be/src/olap/check_primary_keys_executor.cpp:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/check_primary_keys_executor.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "olap/tablet.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+Status CheckPrimaryKeysToken::submit(Tablet* tablet, const 
PartialUpdateReadPlan* read_plan,
+                                     const std::map<RowsetId, 
RowsetSharedPtr>* rsid_to_rowset,
+                                     std::unordered_map<uint32_t, 
std::string>* pk_entries,
+                                     bool with_seq_col) {
+    {
+        std::shared_lock rlock(_mutex);
+        RETURN_IF_ERROR(_status);
+    }
+    return _thread_token->submit_func([=, this]() {
+        auto st = tablet->check_primary_keys_consistency(read_plan, 
rsid_to_rowset, pk_entries,
+                                                         with_seq_col);
+        if (!st.ok()) {
+            std::lock_guard wlock(_mutex);
+            if (_status.ok()) {
+                _status = st;
+            }
+        }
+    });
+}
+
+Status CheckPrimaryKeysToken::submit(Tablet* tablet, const 
PartialUpdateReadPlan* read_plan,

Review Comment:
   warning: method 'submit' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status CheckPrimaryKeysToken::submit(Tablet* tablet, const 
PartialUpdateReadPlan* read_plan,
   ```
   



##########
be/src/olap/tablet.cpp:
##########
@@ -3820,4 +3831,194 @@ Status 
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
     }
     return Status::OK();
 }
+
+Status Tablet::check_primary_keys_consistency(
+        const PartialUpdateReadPlan* read_plan,
+        const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
+        std::unordered_map<uint32_t, std::string>* pk_entries, bool 
with_seq_col) {
+    size_t count = 0;
+    for (auto& [rowset_id, segment_read_info] : *read_plan) {
+        for (auto& [segment_id, rows_info] : segment_read_info) {
+            auto rowset_iter = rsid_to_rowset->find(rowset_id);
+            CHECK(rowset_iter != rsid_to_rowset->end());
+            BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(rowset_iter->second);
+            CHECK(rowset);
+            const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+            SegmentCacheHandle segment_cache;
+            RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true));
+            auto it = std::find_if(segment_cache.get_segments().cbegin(),
+                                   segment_cache.get_segments().cend(),
+                                   [segment_id](const 
segment_v2::SegmentSharedPtr& seg) {
+                                       return seg->id() == segment_id;
+                                   });
+            if (it == segment_cache.get_segments().end()) {
+                return Status::NotFound(fmt::format("rowset {} 's segemnt not 
found, seg_id {}",
+                                                    
rowset->rowset_id().to_string(), segment_id));
+            }
+
+            segment_v2::SegmentSharedPtr segment = *it;
+            LOG(INFO) << fmt::format(
+                    
"[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]",
+                    segment->rowset_id().to_string(), segment->id());
+            RETURN_IF_ERROR(segment->load_index());
+            auto pk_index = segment->get_primary_key_index();
+            std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
+            RETURN_IF_ERROR(pk_index->new_iterator(&iter));
+            auto index_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                    pk_index->type_info()->type(), 1, 0);
+            auto index_column = index_type->create_column();
+
+            size_t idx = 0;
+            for (auto [rowid, pos] : rows_info) {
+                RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
+                size_t num_read = 1;
+                RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
+                CHECK(num_read == 1);
+                std::string prev_pk_entry = 
index_column->get_data_at(idx++).to_string();
+                std::string cur_pk_entry = pk_entries->at(pos);
+                Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size());
+                Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size());
+                int result = 0;
+                // always ignore the seq col
+                if (tablet_schema->has_sequence_col()) {
+                    auto seq_col_length =
+                            
tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1;
+                    key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - 
seq_col_length);
+                    if (with_seq_col) {
+                        key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size() 
- seq_col_length);
+                    }
+                }
+                result = key1.compare(key2);
+                if (result != 0) {
+                    LOG(WARNING) << fmt::format(
+                            "check primary keys consistency failed, pk at pos 
{} in current "
+                            "block is {}, but in previous conflict segment is 
{}!",
+                            pos, key2.to_string(), key1.to_string());
+                    return Status::InternalError("check primary keys 
consistency failed");
+                }
+            }
+            count += rows_info.size();
+        }
+    }
+    if (count != pk_entries->size()) {
+        return Status::InternalError(
+                "check primary keys consistency failed, pk_entries.size():{}, 
but number of keys "
+                "in read plan is {}",
+                pk_entries->size(), count);
+    }
+    return Status::OK();
+}
+
+Status Tablet::check_primary_keys_consistency(
+        const PartialUpdateReadPlan* read_plan,

Review Comment:
   warning: method 'check_primary_keys_consistency' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/tablet.cpp:3911:
   ```diff
   -   }
   +   }static 
   ```
   



##########
be/src/olap/tablet.cpp:
##########
@@ -3820,4 +3831,194 @@ Status 
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
     }
     return Status::OK();
 }
+
+Status Tablet::check_primary_keys_consistency(
+        const PartialUpdateReadPlan* read_plan,
+        const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
+        std::unordered_map<uint32_t, std::string>* pk_entries, bool 
with_seq_col) {
+    size_t count = 0;
+    for (auto& [rowset_id, segment_read_info] : *read_plan) {
+        for (auto& [segment_id, rows_info] : segment_read_info) {
+            auto rowset_iter = rsid_to_rowset->find(rowset_id);
+            CHECK(rowset_iter != rsid_to_rowset->end());
+            BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(rowset_iter->second);
+            CHECK(rowset);
+            const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+            SegmentCacheHandle segment_cache;
+            RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true));
+            auto it = std::find_if(segment_cache.get_segments().cbegin(),
+                                   segment_cache.get_segments().cend(),
+                                   [segment_id](const 
segment_v2::SegmentSharedPtr& seg) {
+                                       return seg->id() == segment_id;
+                                   });
+            if (it == segment_cache.get_segments().end()) {
+                return Status::NotFound(fmt::format("rowset {} 's segemnt not 
found, seg_id {}",
+                                                    
rowset->rowset_id().to_string(), segment_id));
+            }
+
+            segment_v2::SegmentSharedPtr segment = *it;
+            LOG(INFO) << fmt::format(
+                    
"[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]",
+                    segment->rowset_id().to_string(), segment->id());
+            RETURN_IF_ERROR(segment->load_index());
+            auto pk_index = segment->get_primary_key_index();
+            std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
+            RETURN_IF_ERROR(pk_index->new_iterator(&iter));
+            auto index_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                    pk_index->type_info()->type(), 1, 0);
+            auto index_column = index_type->create_column();
+
+            size_t idx = 0;
+            for (auto [rowid, pos] : rows_info) {
+                RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
+                size_t num_read = 1;
+                RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
+                CHECK(num_read == 1);
+                std::string prev_pk_entry = 
index_column->get_data_at(idx++).to_string();
+                std::string cur_pk_entry = pk_entries->at(pos);
+                Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size());
+                Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size());
+                int result = 0;
+                // always ignore the seq col
+                if (tablet_schema->has_sequence_col()) {
+                    auto seq_col_length =
+                            
tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1;
+                    key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - 
seq_col_length);
+                    if (with_seq_col) {
+                        key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size() 
- seq_col_length);
+                    }
+                }
+                result = key1.compare(key2);
+                if (result != 0) {
+                    LOG(WARNING) << fmt::format(
+                            "check primary keys consistency failed, pk at pos 
{} in current "
+                            "block is {}, but in previous conflict segment is 
{}!",
+                            pos, key2.to_string(), key1.to_string());
+                    return Status::InternalError("check primary keys 
consistency failed");
+                }
+            }
+            count += rows_info.size();
+        }
+    }
+    if (count != pk_entries->size()) {
+        return Status::InternalError(
+                "check primary keys consistency failed, pk_entries.size():{}, 
but number of keys "
+                "in read plan is {}",
+                pk_entries->size(), count);
+    }
+    return Status::OK();
+}
+
+Status Tablet::check_primary_keys_consistency(
+        const PartialUpdateReadPlan* read_plan,
+        const std::map<RowsetId, RowsetSharedPtr>* rsid_to_rowset,
+        segment_v2::SegmentWriter* segment_writer,
+        std::vector<vectorized::IOlapColumnDataAccessor*>* key_columns, 
uint32_t row_pos) {
+    for (auto& [rowset_id, segment_read_info] : *read_plan) {
+        for (auto& [segment_id, rows_info] : segment_read_info) {
+            auto rowset_iter = rsid_to_rowset->find(rowset_id);
+            CHECK(rowset_iter != rsid_to_rowset->end());
+            BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(rowset_iter->second);
+            CHECK(rowset);
+            const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+            SegmentCacheHandle segment_cache;
+            RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true));
+            auto it = std::find_if(segment_cache.get_segments().cbegin(),
+                                   segment_cache.get_segments().cend(),
+                                   [segment_id](const 
segment_v2::SegmentSharedPtr& seg) {
+                                       return seg->id() == segment_id;
+                                   });
+            if (it == segment_cache.get_segments().end()) {
+                return Status::NotFound(fmt::format("rowset {} 's segemnt not 
found, seg_id {}",
+                                                    
rowset->rowset_id().to_string(), segment_id));
+            }
+
+            segment_v2::SegmentSharedPtr segment = *it;
+            LOG(INFO) << fmt::format(
+                    
"[Tablet::check_primary_keys_consistency][rowset_id:{}][segment_id:{}]",
+                    segment->rowset_id().to_string(), segment->id());
+            RETURN_IF_ERROR(segment->load_index());
+            auto pk_index = segment->get_primary_key_index();
+            std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
+            RETURN_IF_ERROR(pk_index->new_iterator(&iter));
+            auto index_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                    pk_index->type_info()->type(), 1, 0);
+            auto index_column = index_type->create_column();
+
+            size_t idx = 0;
+            for (auto [rowid, pos] : rows_info) {
+                RETURN_IF_ERROR(iter->seek_to_ordinal(rowid));
+                size_t num_read = 1;
+                RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
+                CHECK(num_read == 1);
+                std::string prev_pk_entry = 
index_column->get_data_at(idx++).to_string();
+                std::string cur_pk_entry =
+                        segment_writer->full_encode_keys(*key_columns, pos - 
row_pos);
+                Slice key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size());
+                Slice key2 = Slice(cur_pk_entry.data(), cur_pk_entry.size());
+                int result = 0;
+                // always ignore the seq col
+                if (tablet_schema->has_sequence_col()) {
+                    auto seq_col_length =
+                            
tablet_schema->column(tablet_schema->sequence_col_idx()).length() + 1;
+                    key1 = Slice(prev_pk_entry.data(), prev_pk_entry.size() - 
seq_col_length);
+                }
+                result = key1.compare(key2);
+                if (result != 0) {
+                    LOG(WARNING) << fmt::format(
+                            "check primary keys consistency failed, pk at pos 
{} in current "
+                            "block is {}, but in previous conflict segment is 
{}!",
+                            pos, key2.to_string(), key1.to_string());
+                    return Status::InternalError("check primary keys 
consistency failed");
+                }
+            }
+        }
+    }
+    return Status::OK();
+}
+
+Status Tablet::fetch_pk_entries(const PartialUpdateReadPlan* read_plan,

Review Comment:
   warning: method 'fetch_pk_entries' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
     }static 
   ```
   



##########
be/src/olap/check_primary_keys_executor.cpp:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/check_primary_keys_executor.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "olap/tablet.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+Status CheckPrimaryKeysToken::submit(Tablet* tablet, const 
PartialUpdateReadPlan* read_plan,

Review Comment:
   warning: method 'submit' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status CheckPrimaryKeysToken::submit(Tablet* tablet, const 
PartialUpdateReadPlan* read_plan,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to