github-actions[bot] commented on code in PR #30045:
URL: https://github.com/apache/doris/pull/30045#discussion_r1454779916


##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -0,0 +1,454 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/encodings.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Constructs a cloud tablet: `tablet_meta` is moved into the BaseTablet
+// subobject and `engine` initializes the `_engine` member.
+CloudTablet::CloudTablet(CloudStorageEngine& engine,
+                         TabletMetaSharedPtr tablet_meta)
+        : BaseTablet(std::move(tablet_meta)),
+          _engine(engine) {}
+
+CloudTablet::~CloudTablet() = default; // defaulted: no hand-written cleanup is performed here
+
+// Returns true when the approximate rowset counter is strictly greater than
+// `limit`. The counter is read with relaxed memory ordering.
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+    const auto approx_rowsets = _approximate_num_rowsets.load(std::memory_order_relaxed);
+    return limit < approx_rowsets;
+}
+
+// Resolves `spec_version` into a consistent version path through the
+// timestamped version tracker and, on success, builds the rowset readers for
+// that path into `rs_splits` while still holding the shared meta lock.
+//
+// On failure the status is annotated with the tablet id, logged together with
+// the compaction status JSON, and returned. Its code is rewritten to
+// VERSION_ALREADY_MERGED when no version up to `spec_version.second` is missing.
+//
+// NOTE: `skip_missing_version` is not consulted by this implementation.
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+                                       std::vector<RowSetSplits>* rs_splits,
+                                       bool skip_missing_version) {
+    Versions consistent_path;
+    std::shared_lock meta_rlock(_meta_lock);
+    auto capture_st = _timestamped_version_tracker.capture_consistent_versions(
+            spec_version, &consistent_path);
+    if (capture_st.ok()) {
+        VLOG_DEBUG << "capture consitent versions: " << consistent_path;
+        return capture_rs_readers_unlocked(consistent_path, rs_splits);
+    }
+
+    // Release the read lock first: calc_missed_versions() takes it again on its
+    // own, and the logging below should not run inside the locked range.
+    meta_rlock.unlock();
+
+    // Nothing missing up to the requested end version means the requested
+    // version has been merged, so the error code is reset accordingly.
+    if (calc_missed_versions(spec_version.second).empty()) {
+        capture_st.set_code(VERSION_ALREADY_MERGED);
+    }
+    capture_st.append(" tablet_id=" + std::to_string(tablet_id()));
+
+    // Builds the compaction status JSON; invoked inline in the log statement.
+    const auto compaction_status_json = [this]() {
+        std::string json;
+        get_compaction_status(&json);
+        return json;
+    };
+    LOG(WARNING) << capture_st << '\n' << compaction_status_json();
+    return capture_st;
+}
+
+// Computes the version ranges in [0, spec_version] that are not covered by any
+// rowset meta currently held in the tablet meta.
+//
+// for example:
+//     [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+//
+// When the tablet meta holds no rowset at all, the whole range
+// [0-spec_version] is reported as missed.
+//
+// The rowset versions are snapshotted under a shared `_meta_lock`; the gap
+// computation itself runs on the snapshot, outside the lock.
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+    Versions missed_versions;
+    Versions existing_versions;
+    {
+        std::shared_lock rdlock(_meta_lock);
+        for (const auto& rs : _tablet_meta->all_rs_metas()) {
+            existing_versions.emplace_back(rs->version());
+        }
+    }
+
+    // Guard the empty case explicitly: front()/back() and `end() - 1` below are
+    // undefined on an empty vector. With no rowsets, every version is missed.
+    if (existing_versions.empty()) {
+        missed_versions.emplace_back(0, spec_version);
+        return missed_versions;
+    }
+
+    // sort the existing versions in ascending order
+    std::sort(existing_versions.begin(), existing_versions.end(),
+              [](const Version& a, const Version& b) {
+                  // simple because 2 versions are certainly not overlapping
+                  return a.first < b.first;
+              });
+
+    // Leading hole: versions before the smallest existing start version.
+    auto min_version = existing_versions.front().first;
+    if (min_version > 0) {
+        missed_versions.emplace_back(0, std::min(spec_version, min_version - 1));
+    }
+
+    // Holes between each pair of adjacent existing versions, clipped to spec_version.
+    for (auto it = existing_versions.begin(); it != existing_versions.end() - 1; ++it) {
+        auto prev_v = it->second;
+        if (prev_v >= spec_version) {
+            // Everything up to spec_version has been examined already.
+            return missed_versions;
+        }
+        auto next_v = (it + 1)->first;
+        if (next_v > prev_v + 1) {
+            // there is a hole between versions
+            missed_versions.emplace_back(prev_v + 1, std::min(spec_version, next_v - 1));
+        }
+    }
+
+    // Trailing hole: versions after the largest existing end version.
+    auto max_version = existing_versions.back().second;
+    if (max_version < spec_version) {
+        missed_versions.emplace_back(max_version + 1, spec_version);
+    }
+    return missed_versions;
+}
+
+// Placeholder: always returns a NotSupported status.
+Status CloudTablet::sync_meta() {
+    // TODO(lightman): FileCache
+    auto unsupported = Status::NotSupported("CloudTablet::sync_meta is not implemented");
+    return unsupported;
+}
+
+// There are only two tablet_states RUNNING and NOT_READY in cloud mode.
+// This function will erase the tablet from `CloudTabletMgr` when it can't find
+// this tablet in MS.
+//
+// A positive `query_version` that is already covered by `_max_version` makes
+// the call return OK without contacting the meta manager. That check runs once
+// before and once after taking `_sync_meta_lock`.
+Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
+    RETURN_IF_ERROR(sync_if_not_running());
+
+    // True when a positive `query_version` is already reached by `_max_version`;
+    // the comparison is made under a shared `_meta_lock`.
+    const auto version_already_synced = [this, query_version]() {
+        if (query_version <= 0) {
+            return false;
+        }
+        std::shared_lock rlock(_meta_lock);
+        return _max_version >= query_version;
+    };
+
+    if (version_already_synced()) {
+        return Status::OK();
+    }
+
+    // serially execute sync to reduce unnecessary network overhead
+    std::lock_guard sync_guard(_sync_meta_lock);
+    // Re-check under the sync lock: the version may have become visible while
+    // this thread was waiting for it.
+    if (version_already_synced()) {
+        return Status::OK();
+    }
+
+    auto sync_st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+    if (sync_st.is<ErrorCode::NOT_FOUND>()) {
+        recycle_cached_data();
+    }
+    return sync_st;
+}
+
+// Sync tablet meta and all rowset meta if not running.
+// This could happen when BE didn't finish schema change job and another BE
+// committed this schema change job.
+// It should be a quite rare situation.
+//
+// Returns OK immediately when the tablet is already RUNNING. Otherwise the
+// tablet meta is fetched from the meta manager; if that meta is RUNNING, the
+// local rowset state is reset and the rowsets are synced again.
+Status CloudTablet::sync_if_not_running() {
+    if (tablet_state() == TABLET_RUNNING) {
+        return Status::OK();
+    }
+
+    // Recycles the cached data when `status` carries NOT_FOUND.
+    const auto recycle_if_not_found = [this](Status& status) {
+        if (status.is<ErrorCode::NOT_FOUND>()) {
+            recycle_cached_data();
+        }
+    };
+
+    // Serially execute sync to reduce unnecessary network overhead
+    std::lock_guard sync_guard(_sync_meta_lock);
+
+    {
+        // Check the state again now that the sync lock is held.
+        std::shared_lock rlock(_meta_lock);
+        if (tablet_state() == TABLET_RUNNING) {
+            return Status::OK();
+        }
+    }
+
+    TabletMetaSharedPtr fetched_meta;
+    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &fetched_meta);
+    if (!st.ok()) {
+        recycle_if_not_found(st);
+        return st;
+    }
+
+    if (fetched_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
+        // MoW may go to here when load while schema change
+        return Status::Error<INVALID_TABLET_STATE>("invalid tablet state. tablet_id={}",
+                                                   tablet_id());
+    }
+
+    // Declared outside the locked scope: after the swap below it holds the
+    // previous tracker, which is then destroyed after the write lock is released.
+    TimestampedVersionTracker replaced_tracker;
+    {
+        std::lock_guard wlock(_meta_lock);
+        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
+        _rs_version_map.clear();
+        _stale_rs_version_map.clear();
+        std::swap(_timestamped_version_tracker, replaced_tracker);
+        _tablet_meta->clear_rowsets();
+        _tablet_meta->clear_stale_rowset();
+        _max_version = -1;
+    }
+
+    st = _engine.meta_mgr().sync_tablet_rowsets(this);
+    recycle_if_not_found(st);
+    return st;
+}
+
+void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool 
version_overlap,

Review Comment:
   warning: function 'add_rowsets' exceeds recommended size/complexity 
thresholds [readability-function-size]
   ```cpp
   void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool 
version_overlap,
                     ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/cloud/cloud_tablet.cpp:196:** 81 lines including whitespace and 
comments (threshold 80)
   ```cpp
   void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool 
version_overlap,
                     ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to