This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 6848d984dc6 branch-4.1: [fix](cloud) Drain txn lazy committer workers 
before destruction #63876 (#63926)
6848d984dc6 is described below

commit 6848d984dc6fd0b96c5ecd5331de023b9426042a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 1 14:43:49 2026 +0800

    branch-4.1: [fix](cloud) Drain txn lazy committer workers before 
destruction #63876 (#63926)
    
    Cherry-picked from #63876
    
    Co-authored-by: Gavin Chou <[email protected]>
    Co-authored-by: gavinchou <[email protected]>
---
 cloud/src/meta-service/txn_lazy_committer.cpp | 57 +++++++++++++++++++++------
 cloud/src/meta-service/txn_lazy_committer.h   |  9 ++++-
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 130745e5e0b..2d209ba2a7f 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -634,14 +634,20 @@ TxnLazyCommitTask::TxnLazyCommitTask(const std::string& 
instance_id, int64_t txn
     DCHECK(txn_id > 0);
 }
 
+void TxnLazyCommitTask::finish(MetaServiceCode code, std::string msg) {
+    {
+        std::unique_lock lock(mutex_);
+        finished_ = true;
+        code_ = code;
+        msg_ = std::move(msg);
+    }
+    cond_.notify_all();
+}
+
 void TxnLazyCommitTask::commit() {
     StopWatch sw;
     DORIS_CLOUD_DEFER {
-        {
-            std::unique_lock lock(mutex_);
-            this->finished_ = true;
-        }
-        this->cond_.notify_all();
+        finish(code_, msg_);
         g_bvar_txn_lazy_committer_committing_duration << sw.elapsed_us();
     };
 
@@ -966,6 +972,25 @@ TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv> 
txn_kv,
     parallel_commit_pool_->start();
 }
 
+TxnLazyCommitter::~TxnLazyCommitter() {
+    {
+        std::unique_lock<std::mutex> lock(mutex_);
+        stopped_ = true;
+    }
+
+    if (worker_pool_ != nullptr) {
+        worker_pool_->stop();
+    }
+    if (parallel_commit_pool_ != nullptr) {
+        parallel_commit_pool_->stop();
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(mutex_);
+        running_tasks_.clear();
+    }
+}
+
 /**
  * @brief Submit a lazy commit txn task
  * 
@@ -979,6 +1004,12 @@ std::shared_ptr<TxnLazyCommitTask> 
TxnLazyCommitter::submit(const std::string& i
     std::shared_ptr<TxnLazyCommitTask> task;
     {
         std::unique_lock<std::mutex> lock(mutex_);
+        if (stopped_) {
+            task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id, 
txn_kv_, this);
+            task->finish(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer 
is stopped");
+            return task;
+        }
+
         auto iter = running_tasks_.find(txn_id);
         if (iter != running_tasks_.end()) {
             return iter->second;
@@ -987,13 +1018,17 @@ std::shared_ptr<TxnLazyCommitTask> 
TxnLazyCommitter::submit(const std::string& i
         task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id, 
txn_kv_, this);
         running_tasks_.emplace(txn_id, task);
         g_bvar_txn_lazy_committer_submitted << 1;
-    }
 
-    worker_pool_->submit([task]() {
-        task->commit();
-        task->txn_lazy_committer_->remove(task->txn_id_);
-        g_bvar_txn_lazy_committer_finished << 1;
-    });
+        int ret = worker_pool_->submit([task]() {
+            task->commit();
+            task->txn_lazy_committer_->remove(task->txn_id_);
+            g_bvar_txn_lazy_committer_finished << 1;
+        });
+        if (ret != 0) {
+            running_tasks_.erase(txn_id);
+            task->finish(MetaServiceCode::UNDEFINED_ERR, "failed to submit txn 
lazy commit task");
+        }
+    }
     DCHECK(task != nullptr);
     return task;
 }
diff --git a/cloud/src/meta-service/txn_lazy_committer.h 
b/cloud/src/meta-service/txn_lazy_committer.h
index 048ff7cf308..0ac6591029c 100644
--- a/cloud/src/meta-service/txn_lazy_committer.h
+++ b/cloud/src/meta-service/txn_lazy_committer.h
@@ -45,6 +45,11 @@ public:
 private:
     friend class TxnLazyCommitter;
 
+    // Marks the task as finished with the final result and wakes all waiters.
+    // `code` is returned by wait() as the task status, and `msg` carries the
+    // corresponding error detail or an empty string on success.
+    void finish(MetaServiceCode code, std::string msg);
+
     std::pair<MetaServiceCode, std::string> commit_partition(
             int64_t db_id, int64_t partition_id,
             const std::vector<std::pair<std::string, 
doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
@@ -66,6 +71,7 @@ class TxnLazyCommitter {
 public:
     TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv);
     TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv, 
std::shared_ptr<ResourceManager> resource_mgr);
+    ~TxnLazyCommitter();
     std::shared_ptr<TxnLazyCommitTask> submit(const std::string& instance_id, 
int64_t txn_id);
     void remove(int64_t txn_id);
 
@@ -82,5 +88,6 @@ private:
     std::mutex mutex_;
     // <txn_id, TxnLazyCommitTask>
     std::unordered_map<int64_t, std::shared_ptr<TxnLazyCommitTask>> 
running_tasks_;
+    bool stopped_ = false;
 };
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to