w41ter commented on code in PR #38243:
URL: https://github.com/apache/doris/pull/38243#discussion_r1708714602


##########
cloud/src/recycler/recycler.cpp:
##########
@@ -1888,72 +2019,90 @@ int InstanceRecycler::abort_timeout_txn() {
         }
         ++num_timeout;
 
-        std::unique_ptr<Transaction> txn;
-        TxnErrorCode err = txn_kv_->create_txn(&txn);
-        if (err != TxnErrorCode::TXN_OK) {
-            LOG_ERROR("failed to create txn err={}", err).tag("key", hex(k));
-            return -1;
-        }
-        std::string_view k1 = k;
-        //TxnRunningKeyInfo 0:instance_id  1:db_id  2:txn_id
-        k1.remove_prefix(1); // Remove key space
-        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
-        if (decode_key(&k1, &out) != 0) {
-            LOG_ERROR("failed to decode key").tag("key", hex(k));
-            return -1;
-        }
-        int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
-        int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
-        VLOG_DEBUG << "instance_id=" << instance_id_ << " db_id=" << db_id << 
" txn_id=" << txn_id;
-        // Update txn_info
-        std::string txn_inf_key, txn_inf_val;
-        txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
-        err = txn->get(txn_inf_key, &txn_inf_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            LOG_WARNING("failed to get txn info err={}", err).tag("key", 
hex(txn_inf_key));
-            return -1;
-        }
-        TxnInfoPB txn_info;
-        if (!txn_info.ParseFromString(txn_inf_val)) {
-            LOG_WARNING("failed to parse txn info").tag("key", hex(k));
-            return -1;
-        }
-        txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
-        txn_info.set_finish_time(current_time);
-        txn_info.set_reason("timeout");
-        VLOG_DEBUG << "txn_info=" << txn_info.DebugString();
-        txn_inf_val.clear();
-        if (!txn_info.SerializeToString(&txn_inf_val)) {
-            LOG_WARNING("failed to serialize txn info").tag("key", hex(k));
-            return -1;
-        }
-        txn->put(txn_inf_key, txn_inf_val);
-        VLOG_DEBUG << "txn->put, txn_inf_key=" << hex(txn_inf_key);
-        // Put recycle txn key
-        std::string recyc_txn_key, recyc_txn_val;
-        recycle_txn_key({instance_id_, db_id, txn_id}, &recyc_txn_key);
-        RecycleTxnPB recycle_txn_pb;
-        recycle_txn_pb.set_creation_time(current_time);
-        recycle_txn_pb.set_label(txn_info.label());
-        if (!recycle_txn_pb.SerializeToString(&recyc_txn_val)) {
-            LOG_WARNING("failed to serialize txn recycle info")
-                    .tag("key", hex(k))
-                    .tag("db_id", db_id)
-                    .tag("txn_id", txn_id);
-            return -1;
-        }
-        txn->put(recyc_txn_key, recyc_txn_val);
-        // Remove txn running key
-        txn->remove(k);
-        err = txn->commit();
-        if (err != TxnErrorCode::TXN_OK) {
-            LOG_WARNING("failed to commit txn err={}", err)
-                    .tag("key", hex(k))
-                    .tag("db_id", db_id)
-                    .tag("txn_id", txn_id);
-            return -1;
-        }
-        ++num_abort;
+        do {
+            std::unique_ptr<Transaction> txn;
+            TxnErrorCode err = txn_kv_->create_txn(&txn);
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG_ERROR("failed to create txn err={}", err).tag("key", 
hex(k));
+                return -1;
+            }
+            std::string_view k1 = k;
+            //TxnRunningKeyInfo 0:instance_id  1:db_id  2:txn_id
+            k1.remove_prefix(1); // Remove key space
+            std::vector<std::tuple<std::variant<int64_t, std::string>, int, 
int>> out;
+            if (decode_key(&k1, &out) != 0) {
+                LOG_ERROR("failed to decode key").tag("key", hex(k));
+                return -1;
+            }
+            int64_t db_id = std::get<int64_t>(std::get<0>(out[3]));
+            int64_t txn_id = std::get<int64_t>(std::get<0>(out[4]));
+            VLOG_DEBUG << "instance_id=" << instance_id_ << " db_id=" << db_id
+                       << " txn_id=" << txn_id;
+            // Update txn_info
+            std::string txn_inf_key, txn_inf_val;
+            txn_info_key({instance_id_, db_id, txn_id}, &txn_inf_key);
+            err = txn->get(txn_inf_key, &txn_inf_val);
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG_WARNING("failed to get txn info err={}", err).tag("key", 
hex(txn_inf_key));
+                return -1;
+            }
+            TxnInfoPB txn_info;
+            if (!txn_info.ParseFromString(txn_inf_val)) {
+                LOG_WARNING("failed to parse txn info").tag("key", hex(k));
+                return -1;
+            }
+
+            if (TxnStatusPB::TXN_STATUS_COMMITTED == txn_info.status()) {
+                txn.reset();
+                std::shared_ptr<TxnLazyCommitTask> task =
+                        txn_lazy_committer_->submit(instance_id_, 
txn_info.txn_id());
+                std::pair<MetaServiceCode, std::string> ret = task->wait();
+                if (ret.first != MetaServiceCode::OK) {
+                    LOG(WARNING) << "lazy commit txn failed txn_id=" << txn_id
+                                 << " code=" << ret.first << "msg=" << 
ret.second;
+                    return -1;
+                }
+                continue;
+            } else {
+                DCHECK(txn_info.status() != TxnStatusPB::TXN_STATUS_VISIBLE);
+                txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);

Review Comment:
   The `continue` retries recycling this timed-out txn, but by that point the `TxnLazyCommitTask` has already committed it (status VISIBLE), so this DCHECK will fire.



##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -788,6 +788,164 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, 
const TabletStats& stat
         txn->put(key, val);
     }
 }
+
+void convert_tmp_rowsets(
+        const std::string& instance_id, int64_t txn_id, std::shared_ptr<TxnKv> 
txn_kv,
+        MetaServiceCode& code, std::string& msg, int64_t db_id,
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta,
+        std::unordered_map<int64_t, TabletIndexPB>& tablet_ids) {
+    std::stringstream ss;
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg;
+        return;
+    }
+
+    // partition_id -> VersionPB
+    std::unordered_map<int64_t, VersionPB> partition_versions;

Review Comment:
   `convert_tmp_rowsets` seems to be used only by `TxnLazyCommitter`, and line 986 asserts `DCHECK(partition_versions.size() == 1);`, so the `partition_id -> VersionPB` map may be unnecessary here.



##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -326,8 +342,12 @@ void 
MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
     std::vector<std::optional<std::string>> version_values;
     version_keys.reserve(BATCH_SIZE);
     version_values.reserve(BATCH_SIZE);
+
     while ((code == MetaServiceCode::OK || code == 
MetaServiceCode::KV_TXN_TOO_OLD) &&
            response->versions_size() < response->partition_ids_size()) {
+        response->clear_versions();
+        code = MetaServiceCode::OK;
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to