github-actions[bot] commented on code in PR #38243:
URL: https://github.com/apache/doris/pull/38243#discussion_r1713235462


##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -1405,104 +1438,144 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
     int64_t req_cc_cnt = request->cumulative_compaction_cnt();
     int64_t req_cp = request->cumulative_point();
 
-    std::unique_ptr<Transaction> txn;
-    TxnErrorCode err = txn_kv_->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::CREATE>(err);
-        msg = "failed to create txn";
-        return;
-    }
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            msg = "failed to create txn";
+            return;
+        }
 
-    TabletIndexPB idx(request->idx());
-    // Get tablet id index from kv
-    if (!idx.has_table_id() || !idx.has_index_id() || !idx.has_partition_id()) 
{
+        TabletIndexPB idx;
+        // Get tablet id index from kv
         get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
-        if (code != MetaServiceCode::OK) return;
-    }
-    // TODO(plat1ko): Judge if tablet has been dropped (in dropped 
index/partition)
-
-    TabletStatsPB tablet_stat;
-    internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat, true);
-    if (code != MetaServiceCode::OK) return;
-    VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << 
proto_to_json(tablet_stat);
-
-    int64_t bc_cnt = tablet_stat.base_compaction_cnt();
-    int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
-    int64_t cp = tablet_stat.cumulative_point();
-
-    response->mutable_stats()->CopyFrom(tablet_stat);
-
-    int64_t req_start = request->start_version();
-    int64_t req_end = request->end_version();
-    req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end;
-
-    
//==========================================================================
-    //      Find version ranges to be synchronized due to compaction
-    
//==========================================================================
-    if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
-        code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" 
<< req_bc_cnt
-           << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", 
cc_cnt=" << cc_cnt
-           << ", req_cp=" << req_cp << ", cp=" << cp;
-        msg = ss.str();
-        return;
-    }
-    auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, 
req_cp, cp,
-                                       req_start, req_end);
-    for (auto [start, end] : versions) {
-        internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, 
code, msg, response);
         if (code != MetaServiceCode::OK) {
             return;
         }
-    }
+        DCHECK(request->has_idx());
+
+        std::string ver_val;
+        std::string ver_key =
+                partition_version_key({instance_id, idx.db_id(), 
idx.table_id(), idx.partition_id()});
+        err = txn->get(ver_key, &ver_val);
+        if (TxnErrorCode::TXN_OK != err) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                        : 
cast_as<ErrCategory::READ>(err);
+            ss << "failed to get partiton version, tablet_id=" << tablet_id << 
" key=" << hex(ver_key)
+            << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    // get referenced schema
-    std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema;
-    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
-        if (rowset_meta.has_tablet_schema()) {
-            
version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(),
-                                      rowset_meta.mutable_tablet_schema());
-            
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
+        VersionPB version_pb;
+        if (!version_pb.ParseFromString(ver_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse version pb tablet_id=" << tablet_id << " 
key=" << hex(ver_key);
+            msg = ss.str();
+            return;
         }
-        rowset_meta.set_index_id(idx.index_id());
-    }
-    bool need_read_schema_dict = false;
-    auto arena = response->GetArena();
-    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
-        if (rowset_meta.has_schema_dict_key_list()) {
-            need_read_schema_dict = true;
+
+        if (version_pb.has_txn_id()) {
+            txn.reset();
+            std::shared_ptr<TxnLazyCommitTask> task =
+                    txn_lazy_committer_->submit(instance_id, 
version_pb.txn_id());
+
+            std::tie(code, msg) = task->wait();
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "advance_last_txn failed last_txn=" << 
version_pb.txn_id()
+                             << " code=" << code << "msg=" << msg;
+                return;
+            }
+            continue;
         }
-        if (rowset_meta.has_tablet_schema()) continue;
-        if (!rowset_meta.has_schema_version()) {
+
+        // TODO(plat1ko): Judge if tablet has been dropped (in dropped 
index/partition)
+
+        TabletStatsPB tablet_stat;
+        internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat, true);
+        if (code != MetaServiceCode::OK) return;

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
           if (code != MetaServiceCode::OK) {
               return;
           }
   ```
   



##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -1405,104 +1438,144 @@
     int64_t req_cc_cnt = request->cumulative_compaction_cnt();
     int64_t req_cp = request->cumulative_point();
 
-    std::unique_ptr<Transaction> txn;
-    TxnErrorCode err = txn_kv_->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::CREATE>(err);
-        msg = "failed to create txn";
-        return;
-    }
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            msg = "failed to create txn";
+            return;
+        }
 
-    TabletIndexPB idx(request->idx());
-    // Get tablet id index from kv
-    if (!idx.has_table_id() || !idx.has_index_id() || !idx.has_partition_id()) 
{
+        TabletIndexPB idx;
+        // Get tablet id index from kv
         get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx);
-        if (code != MetaServiceCode::OK) return;
-    }
-    // TODO(plat1ko): Judge if tablet has been dropped (in dropped 
index/partition)
-
-    TabletStatsPB tablet_stat;
-    internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat, true);
-    if (code != MetaServiceCode::OK) return;
-    VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << 
proto_to_json(tablet_stat);
-
-    int64_t bc_cnt = tablet_stat.base_compaction_cnt();
-    int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
-    int64_t cp = tablet_stat.cumulative_point();
-
-    response->mutable_stats()->CopyFrom(tablet_stat);
-
-    int64_t req_start = request->start_version();
-    int64_t req_end = request->end_version();
-    req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end;
-
-    
//==========================================================================
-    //      Find version ranges to be synchronized due to compaction
-    
//==========================================================================
-    if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
-        code = MetaServiceCode::INVALID_ARGUMENT;
-        ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" 
<< req_bc_cnt
-           << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", 
cc_cnt=" << cc_cnt
-           << ", req_cp=" << req_cp << ", cp=" << cp;
-        msg = ss.str();
-        return;
-    }
-    auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, 
req_cp, cp,
-                                       req_start, req_end);
-    for (auto [start, end] : versions) {
-        internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, 
code, msg, response);
         if (code != MetaServiceCode::OK) {
             return;
         }
-    }
+        DCHECK(request->has_idx());
+
+        std::string ver_val;
+        std::string ver_key =
+                partition_version_key({instance_id, idx.db_id(), 
idx.table_id(), idx.partition_id()});
+        err = txn->get(ver_key, &ver_val);
+        if (TxnErrorCode::TXN_OK != err) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                        : 
cast_as<ErrCategory::READ>(err);
+            ss << "failed to get partiton version, tablet_id=" << tablet_id << 
" key=" << hex(ver_key)
+            << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    // get referenced schema
-    std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema;
-    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
-        if (rowset_meta.has_tablet_schema()) {
-            
version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(),
-                                      rowset_meta.mutable_tablet_schema());
-            
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
+        VersionPB version_pb;
+        if (!version_pb.ParseFromString(ver_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse version pb tablet_id=" << tablet_id << " 
key=" << hex(ver_key);
+            msg = ss.str();
+            return;
         }
-        rowset_meta.set_index_id(idx.index_id());
-    }
-    bool need_read_schema_dict = false;
-    auto arena = response->GetArena();
-    for (auto& rowset_meta : *response->mutable_rowset_meta()) {
-        if (rowset_meta.has_schema_dict_key_list()) {
-            need_read_schema_dict = true;
+
+        if (version_pb.has_txn_id()) {
+            txn.reset();
+            std::shared_ptr<TxnLazyCommitTask> task =
+                    txn_lazy_committer_->submit(instance_id, 
version_pb.txn_id());
+
+            std::tie(code, msg) = task->wait();
+            if (code != MetaServiceCode::OK) {
+                LOG(WARNING) << "advance_last_txn failed last_txn=" << 
version_pb.txn_id()
+                             << " code=" << code << "msg=" << msg;
+                return;
+            }
+            continue;
         }
-        if (rowset_meta.has_tablet_schema()) continue;
-        if (!rowset_meta.has_schema_version()) {
+
+        // TODO(plat1ko): Judge if tablet has been dropped (in dropped 
index/partition)
+
+        TabletStatsPB tablet_stat;
+        internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, 
tablet_stat, true);
+        if (code != MetaServiceCode::OK) return;
+        VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << 
proto_to_json(tablet_stat);
+
+        int64_t bc_cnt = tablet_stat.base_compaction_cnt();
+        int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
+        int64_t cp = tablet_stat.cumulative_point();
+
+        response->mutable_stats()->CopyFrom(tablet_stat);
+
+        int64_t req_start = request->start_version();
+        int64_t req_end = request->end_version();
+        req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : 
req_end;
+
+        
//==========================================================================
+        //      Find version ranges to be synchronized due to compaction
+        
//==========================================================================
+        if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
             code = MetaServiceCode::INVALID_ARGUMENT;
-            msg = fmt::format(
-                    "rowset_meta must have either schema or schema_version, "
-                    "rowset_version=[{}-{}]",
-                    rowset_meta.start_version(), rowset_meta.end_version());
+            ss << "no valid compaction_cnt or cumulative_point given. 
req_bc_cnt=" << req_bc_cnt
+            << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", 
cc_cnt=" << cc_cnt
+            << ", req_cp=" << req_cp << ", cp=" << cp;
+            msg = ss.str();
             return;
         }
-        if (auto it = version_to_schema.find(rowset_meta.schema_version());
-            it != version_to_schema.end()) {
-            if (arena != nullptr) {
-                rowset_meta.set_allocated_tablet_schema(it->second);
-            } else {
-                rowset_meta.mutable_tablet_schema()->CopyFrom(*it->second);
+        auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, 
cc_cnt, req_cp, cp,
+                                        req_start, req_end);
+        for (auto [start, end] : versions) {
+            internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, 
code, msg, response);
+            if (code != MetaServiceCode::OK) {
+                return;
             }
-        } else {
-            auto key = meta_schema_key({instance_id, idx.index_id(), 
rowset_meta.schema_version()});
-            if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, code, 
msg)) {
+        }
+
+        // get referenced schema
+        std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> 
version_to_schema;
+        for (auto& rowset_meta : *response->mutable_rowset_meta()) {
+            if (rowset_meta.has_tablet_schema()) {
+                
version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(),
+                                        rowset_meta.mutable_tablet_schema());
+                
rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version());
+            }
+            rowset_meta.set_index_id(idx.index_id());
+        }
+        bool need_read_schema_dict = false;
+        auto arena = response->GetArena();
+        for (auto& rowset_meta : *response->mutable_rowset_meta()) {
+            if (rowset_meta.has_schema_dict_key_list()) {
+                need_read_schema_dict = true;
+            }
+            if (rowset_meta.has_tablet_schema()) continue;
+            if (!rowset_meta.has_schema_version()) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = fmt::format(
+                        "rowset_meta must have either schema or 
schema_version, "
+                        "rowset_version=[{}-{}]",
+                        rowset_meta.start_version(), 
rowset_meta.end_version());
                 return;
             }
-            version_to_schema.emplace(rowset_meta.schema_version(),
-                                      rowset_meta.mutable_tablet_schema());
+            if (auto it = version_to_schema.find(rowset_meta.schema_version());
+                it != version_to_schema.end()) {
+                if (arena != nullptr) {
+                    rowset_meta.set_allocated_tablet_schema(it->second);
+                } else {
+                    rowset_meta.mutable_tablet_schema()->CopyFrom(*it->second);
+                }
+            } else {
+                auto key = meta_schema_key({instance_id, idx.index_id(), 
rowset_meta.schema_version()});
+                if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, 
code, msg)) {
+                    return;
+                }
+                version_to_schema.emplace(rowset_meta.schema_version(),
+                                        rowset_meta.mutable_tablet_schema());
+            }
         }
-    }
 
-    if (need_read_schema_dict) {
-        read_schema_from_dict(code, msg, instance_id, idx.index_id(), 
txn.get(),
-                              response->mutable_rowset_meta());
-        if (code != MetaServiceCode::OK) return;
-    }
+        if (need_read_schema_dict) {
+            read_schema_from_dict(code, msg, instance_id, idx.index_id(), 
txn.get(),
+                                response->mutable_rowset_meta());
+            if (code != MetaServiceCode::OK) return;

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
               if (code != MetaServiceCode::OK) {
                   return;
               }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to