github-actions[bot] commented on code in PR #38243: URL: https://github.com/apache/doris/pull/38243#discussion_r1713235462
########## cloud/src/meta-service/meta_service.cpp: ########## @@ -1405,104 +1438,144 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, int64_t req_cc_cnt = request->cumulative_compaction_cnt(); int64_t req_cp = request->cumulative_point(); - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to create txn"; - return; - } + do { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to create txn"; + return; + } - TabletIndexPB idx(request->idx()); - // Get tablet id index from kv - if (!idx.has_table_id() || !idx.has_index_id() || !idx.has_partition_id()) { + TabletIndexPB idx; + // Get tablet id index from kv get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx); - if (code != MetaServiceCode::OK) return; - } - // TODO(plat1ko): Judge if tablet has been dropped (in dropped index/partition) - - TabletStatsPB tablet_stat; - internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, true); - if (code != MetaServiceCode::OK) return; - VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << proto_to_json(tablet_stat); - - int64_t bc_cnt = tablet_stat.base_compaction_cnt(); - int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); - int64_t cp = tablet_stat.cumulative_point(); - - response->mutable_stats()->CopyFrom(tablet_stat); - - int64_t req_start = request->start_version(); - int64_t req_end = request->end_version(); - req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end; - - //========================================================================== - // Find version ranges to be synchronized due to compaction - //========================================================================== - if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) { - code = MetaServiceCode::INVALID_ARGUMENT; - ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt - << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt - << ", req_cp=" << req_cp << ", cp=" << cp; - msg = ss.str(); - return; - } - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, - req_start, req_end); - for (auto [start, end] : versions) { - internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response); if (code != MetaServiceCode::OK) { return; } - } + DCHECK(request->has_idx()); + + std::string ver_val; + std::string ver_key = + partition_version_key({instance_id, idx.db_id(), idx.table_id(), idx.partition_id()}); + err = txn->get(ver_key, &ver_val); + if (TxnErrorCode::TXN_OK != err) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + ss << "failed to get partiton version, tablet_id=" << tablet_id << " key=" << hex(ver_key) + << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } - // get referenced schema - std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema; - for (auto& rowset_meta : *response->mutable_rowset_meta()) { - if (rowset_meta.has_tablet_schema()) { - version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(), - rowset_meta.mutable_tablet_schema()); - rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); + VersionPB version_pb; + if (!version_pb.ParseFromString(ver_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse version pb tablet_id=" << tablet_id << " key=" << hex(ver_key); + msg = ss.str(); + return; } - rowset_meta.set_index_id(idx.index_id()); - } - bool need_read_schema_dict = false; - auto arena = response->GetArena(); - for (auto& rowset_meta : *response->mutable_rowset_meta()) { - if (rowset_meta.has_schema_dict_key_list()) { - need_read_schema_dict = true; + + if (version_pb.has_txn_id()) { + txn.reset(); + std::shared_ptr<TxnLazyCommitTask> task = + txn_lazy_committer_->submit(instance_id, version_pb.txn_id()); + + std::tie(code, msg) = task->wait(); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "advance_last_txn failed last_txn=" << version_pb.txn_id() + << " code=" << code << "msg=" << msg; + return; + } + continue; } - if (rowset_meta.has_tablet_schema()) continue; - if (!rowset_meta.has_schema_version()) { + + // TODO(plat1ko): Judge if tablet has been dropped (in dropped index/partition) + + TabletStatsPB tablet_stat; + internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, true); + if (code != MetaServiceCode::OK) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (code != MetaServiceCode::OK) { return; } ``` ########## cloud/src/meta-service/meta_service.cpp: ########## @@ -1405,104 +1438,144 @@ int64_t req_cc_cnt = request->cumulative_compaction_cnt(); int64_t req_cp = request->cumulative_point(); - std::unique_ptr<Transaction> txn; - TxnErrorCode err = txn_kv_->create_txn(&txn); - if (err != TxnErrorCode::TXN_OK) { - code = cast_as<ErrCategory::CREATE>(err); - msg = "failed to create txn"; - return; - } + do { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to create txn"; + return; + } - TabletIndexPB idx(request->idx()); - // Get tablet id index from kv - if (!idx.has_table_id() || !idx.has_index_id() || !idx.has_partition_id()) { + TabletIndexPB idx; + // Get tablet id index from kv get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, idx); - if (code != MetaServiceCode::OK) return; - } - // TODO(plat1ko): Judge if tablet has been dropped (in dropped index/partition) - - TabletStatsPB tablet_stat; - internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, true); - if (code != MetaServiceCode::OK) return; - VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << proto_to_json(tablet_stat); - - int64_t bc_cnt = tablet_stat.base_compaction_cnt(); - int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); - int64_t cp = tablet_stat.cumulative_point(); - - response->mutable_stats()->CopyFrom(tablet_stat); - - int64_t req_start = request->start_version(); - int64_t req_end = request->end_version(); - req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end; - - //========================================================================== - // Find version ranges to be synchronized due to compaction - //========================================================================== - if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) { - code = MetaServiceCode::INVALID_ARGUMENT; - ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt - << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt - << ", req_cp=" << req_cp << ", cp=" << cp; - msg = ss.str(); - return; - } - auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, - req_start, req_end); - for (auto [start, end] : versions) { - internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response); if (code != MetaServiceCode::OK) { return; } - } + DCHECK(request->has_idx()); + + std::string ver_val; + std::string ver_key = + partition_version_key({instance_id, idx.db_id(), idx.table_id(), idx.partition_id()}); + err = txn->get(ver_key, &ver_val); + if (TxnErrorCode::TXN_OK != err) { + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND + : cast_as<ErrCategory::READ>(err); + ss << "failed to get partiton version, tablet_id=" << tablet_id << " key=" << hex(ver_key) + << " err=" << err; + msg = ss.str(); + LOG(WARNING) << msg; + return; + } - // get referenced schema - std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema; - for (auto& rowset_meta : *response->mutable_rowset_meta()) { - if (rowset_meta.has_tablet_schema()) { - version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(), - rowset_meta.mutable_tablet_schema()); - rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); + VersionPB version_pb; + if (!version_pb.ParseFromString(ver_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse version pb tablet_id=" << tablet_id << " key=" << hex(ver_key); + msg = ss.str(); + return; } - rowset_meta.set_index_id(idx.index_id()); - } - bool need_read_schema_dict = false; - auto arena = response->GetArena(); - for (auto& rowset_meta : *response->mutable_rowset_meta()) { - if (rowset_meta.has_schema_dict_key_list()) { - need_read_schema_dict = true; + + if (version_pb.has_txn_id()) { + txn.reset(); + std::shared_ptr<TxnLazyCommitTask> task = + txn_lazy_committer_->submit(instance_id, version_pb.txn_id()); + + std::tie(code, msg) = task->wait(); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "advance_last_txn failed last_txn=" << version_pb.txn_id() + << " code=" << code << "msg=" << msg; + return; + } + continue; } - if (rowset_meta.has_tablet_schema()) continue; - if (!rowset_meta.has_schema_version()) { + + // TODO(plat1ko): Judge if tablet has been dropped (in dropped index/partition) + + TabletStatsPB tablet_stat; + internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, true); + if (code != MetaServiceCode::OK) return; + VLOG_DEBUG << "tablet_id=" << tablet_id << " stats=" << proto_to_json(tablet_stat); + + int64_t bc_cnt = tablet_stat.base_compaction_cnt(); + int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt(); + int64_t cp = tablet_stat.cumulative_point(); + + response->mutable_stats()->CopyFrom(tablet_stat); + + int64_t req_start = request->start_version(); + int64_t req_end = request->end_version(); + req_end = req_end < 0 ? std::numeric_limits<int64_t>::max() - 1 : req_end; + + //========================================================================== + // Find version ranges to be synchronized due to compaction + //========================================================================== + if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) { code = MetaServiceCode::INVALID_ARGUMENT; - msg = fmt::format( - "rowset_meta must have either schema or schema_version, " - "rowset_version=[{}-{}]", - rowset_meta.start_version(), rowset_meta.end_version()); + ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt + << ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt + << ", req_cp=" << req_cp << ", cp=" << cp; + msg = ss.str(); return; } - if (auto it = version_to_schema.find(rowset_meta.schema_version()); - it != version_to_schema.end()) { - if (arena != nullptr) { - rowset_meta.set_allocated_tablet_schema(it->second); - } else { - rowset_meta.mutable_tablet_schema()->CopyFrom(*it->second); + auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, + req_start, req_end); + for (auto [start, end] : versions) { + internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response); + if (code != MetaServiceCode::OK) { + return; } - } else { - auto key = meta_schema_key({instance_id, idx.index_id(), rowset_meta.schema_version()}); - if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, code, msg)) { + } + + // get referenced schema + std::unordered_map<int32_t, doris::TabletSchemaCloudPB*> version_to_schema; + for (auto& rowset_meta : *response->mutable_rowset_meta()) { + if (rowset_meta.has_tablet_schema()) { + version_to_schema.emplace(rowset_meta.tablet_schema().schema_version(), + rowset_meta.mutable_tablet_schema()); + rowset_meta.set_schema_version(rowset_meta.tablet_schema().schema_version()); + } + rowset_meta.set_index_id(idx.index_id()); + } + bool need_read_schema_dict = false; + auto arena = response->GetArena(); + for (auto& rowset_meta : *response->mutable_rowset_meta()) { + if (rowset_meta.has_schema_dict_key_list()) { + need_read_schema_dict = true; + } + if (rowset_meta.has_tablet_schema()) continue; + if (!rowset_meta.has_schema_version()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format( + "rowset_meta must have either schema or schema_version, " + "rowset_version=[{}-{}]", + rowset_meta.start_version(), rowset_meta.end_version()); return; } - version_to_schema.emplace(rowset_meta.schema_version(), - rowset_meta.mutable_tablet_schema()); + if (auto it = version_to_schema.find(rowset_meta.schema_version()); + it != version_to_schema.end()) { + if (arena != nullptr) { + rowset_meta.set_allocated_tablet_schema(it->second); + } else { + rowset_meta.mutable_tablet_schema()->CopyFrom(*it->second); + } + } else { + auto key = meta_schema_key({instance_id, idx.index_id(), rowset_meta.schema_version()}); + if (!try_fetch_and_parse_schema(txn.get(), rowset_meta, key, code, msg)) { + return; + } + version_to_schema.emplace(rowset_meta.schema_version(), + rowset_meta.mutable_tablet_schema()); + } } - } - if (need_read_schema_dict) { - read_schema_from_dict(code, msg, instance_id, idx.index_id(), txn.get(), - response->mutable_rowset_meta()); - if (code != MetaServiceCode::OK) return; - } + if (need_read_schema_dict) { + read_schema_from_dict(code, msg, instance_id, idx.index_id(), txn.get(), + response->mutable_rowset_meta()); + if (code != MetaServiceCode::OK) return; Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (code != MetaServiceCode::OK) { return; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org