This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b31648a6462 [fix](cloud) compaction and schema change potential data 
race when retrying prepare rowset (#51048)
b31648a6462 is described below

commit b31648a6462663c835fa8e0181d01e76fa5aa5fc
Author: Luwei <lu...@selectdb.com>
AuthorDate: Thu Jun 12 22:08:41 2025 +0800

    [fix](cloud) compaction and schema change potential data race when retrying 
prepare rowset (#51048)
    
    related PR #51129
---
 be/src/cloud/cloud_delete_task.cpp       |   2 +-
 be/src/cloud/cloud_delta_writer.cpp      |   2 +-
 be/src/cloud/cloud_meta_mgr.cpp          |   6 +-
 be/src/cloud/cloud_meta_mgr.h            |   4 +-
 be/src/cloud/cloud_rowset_builder.cpp    |   2 +-
 be/src/cloud/cloud_schema_change_job.cpp |   4 +-
 be/src/olap/compaction.cpp               |   5 +-
 cloud/src/common/config.h                |   2 +
 cloud/src/meta-service/meta_service.cpp  |  75 ++++++-
 cloud/test/meta_service_test.cpp         | 337 +++++++++++++++++++++++++++++++
 gensrc/proto/cloud.proto                 |   2 +
 11 files changed, 429 insertions(+), 12 deletions(-)

diff --git a/be/src/cloud/cloud_delete_task.cpp 
b/be/src/cloud/cloud_delete_task.cpp
index 5698fb632cd..cf7a6a371bc 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -94,7 +94,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, 
const TPushReq& requ
     RETURN_IF_ERROR(rowset_writer->build(rowset));
     rowset->rowset_meta()->set_delete_predicate(std::move(del_pred));
 
-    auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta());
+    auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
 
     // Update tablet stats
     tablet->fetch_add_approximate_num_rowsets(1);
diff --git a/be/src/cloud/cloud_delta_writer.cpp 
b/be/src/cloud/cloud_delta_writer.cpp
index f186bc90f12..6fac6a13873 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -113,7 +113,7 @@ Status CloudDeltaWriter::commit_rowset() {
         RETURN_IF_ERROR(_rowset_builder->init());
         RETURN_IF_ERROR(_rowset_builder->build_rowset());
     }
-    return _engine.meta_mgr().commit_rowset(*rowset_meta());
+    return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
 }
 
 Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 73c9d0d72a8..245eca2d542 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -908,7 +908,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* 
tablet, int64_t old_
     return Status::OK();
 }
 
-Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const 
std::string& job_id,
                                     RowsetMetaSharedPtr* existed_rs_meta) {
     VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
                << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << 
rs_meta.txn_id();
@@ -920,6 +920,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& 
rs_meta,
     CreateRowsetResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_txn_id(rs_meta.txn_id());
+    req.set_tablet_job_id(job_id);
 
     RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
     doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), 
std::move(doris_rs_meta));
@@ -937,7 +938,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& 
rs_meta,
     return st;
 }
 
-Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
+Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const 
std::string& job_id,
                                    RowsetMetaSharedPtr* existed_rs_meta) {
     VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
                << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << 
rs_meta.txn_id();
@@ -950,6 +951,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& 
rs_meta,
     CreateRowsetResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_txn_id(rs_meta.txn_id());
+    req.set_tablet_job_id(job_id);
 
     RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
     doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), 
std::move(rs_meta_pb));
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 421e0b28bf3..3467262c1d4 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -69,10 +69,10 @@ public:
             CloudTablet* tablet, std::unique_lock<bthread::Mutex>& lock /* 
_sync_meta_lock */,
             const SyncOptions& options = {}, SyncRowsetStats* sync_stats = 
nullptr);
 
-    Status prepare_rowset(const RowsetMeta& rs_meta,
+    Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
                           std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
 
-    Status commit_rowset(const RowsetMeta& rs_meta,
+    Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
                          std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
 
     Status update_tmp_rowset(const RowsetMeta& rs_meta);
diff --git a/be/src/cloud/cloud_rowset_builder.cpp 
b/be/src/cloud/cloud_rowset_builder.cpp
index 389b6c7c682..d135292a781 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -81,7 +81,7 @@ Status CloudRowsetBuilder::init() {
 
     _calc_delete_bitmap_token = 
_engine.calc_delete_bitmap_executor()->create_token();
 
-    
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta()));
+    
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(),
 ""));
 
     _is_init = true;
     return Status::OK();
diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index 0fa47d76cbf..66daee8fa79 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -313,7 +313,7 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
 
         RowsetMetaSharedPtr existed_rs_meta;
         auto st = 
_cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
-                                                                  
&existed_rs_meta);
+                                                                  _job_id, 
&existed_rs_meta);
         if (!st.ok()) {
             if (st.is<ALREADY_EXIST>()) {
                 LOG(INFO) << "Rowset " << rs_reader->version() << " has 
already existed in tablet "
@@ -348,7 +348,7 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
                                          st.to_string());
         }
 
-        st = 
_cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(),
+        st = 
_cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), 
_job_id,
                                                             &existed_rs_meta);
         if (!st.ok()) {
             if (st.is<ALREADY_EXIST>()) {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 89782771168..026daa38c97 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1434,7 +1434,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t 
permits) {
     // Currently, updates are only made in the time_series.
     update_compaction_level();
 
-    
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));
+    
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(),
 _uuid));
 
     // 4. modify rowsets in memory
     RETURN_IF_ERROR(modify_rowsets());
@@ -1511,7 +1511,8 @@ Status 
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
                             compaction_type() == 
ReaderType::READER_BASE_COMPACTION);
     ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
     _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, 
_is_vertical));
-    
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
+    RETURN_IF_ERROR(
+            
_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get(), 
_uuid));
     return Status::OK();
 }
 
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 6bc2d73f700..f021e2c20c2 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -227,6 +227,8 @@ CONF_mBool(enable_distinguish_hdfs_path, "true");
 // If enabled, the txn status will be checked when preapre/commit rowset
 CONF_mBool(enable_load_txn_status_check, "true");
 
+CONF_mBool(enable_tablet_job_check, "true");
+
 // Declare a selection strategy for those servers have many ips.
 // Note that there should at most one ip match this list.
 // this is a list in semicolon-delimited format, in CIDR notation,
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 3dcf166c0b2..73522541a08 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -38,6 +38,7 @@
 #include <ios>
 #include <limits>
 #include <memory>
+#include <ostream>
 #include <sstream>
 #include <string>
 #include <tuple>
@@ -990,6 +991,60 @@ static void fill_schema_from_dict(MetaServiceCode& code, 
std::string& msg,
     existed_rowset_meta->CopyFrom(metas.Get(0));
 }
 
+bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& 
msg,
+                       const std::string& instance_id, int64_t tablet_id,
+                       const std::string& rowset_id, const std::string& 
job_id) {
+    TabletIndexPB tablet_idx;
+    get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
+    if (code != MetaServiceCode::OK) {
+        return false;
+    }
+
+    std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), 
tablet_idx.index_id(),
+                                          tablet_idx.partition_id(), 
tablet_id});
+    std::string job_val;
+    auto err = txn->get(job_key, &job_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        std::stringstream ss;
+        ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : 
"internal error,")
+           << " instance_id=" << instance_id << " tablet_id=" << tablet_id
+           << " rowset_id=" << rowset_id << " err=" << err;
+        msg = ss.str();
+        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::STALE_PREPARE_ROWSET
+                                                      : 
cast_as<ErrCategory::READ>(err);
+        return false;
+    }
+
+    TabletJobInfoPB job_pb;
+    job_pb.ParseFromString(job_val);
+    bool match = false;
+    if (!job_pb.compaction().empty()) {
+        for (auto c : job_pb.compaction()) {
+            if (c.id() == job_id) {
+                match = true;
+            }
+        }
+    }
+
+    if (job_pb.has_schema_change()) {
+        if (job_pb.schema_change().id() == job_id) {
+            match = true;
+        }
+    }
+
+    if (!match) {
+        std::stringstream ss;
+        ss << " stale perpare rowset request,"
+           << " instance_id=" << instance_id << " tablet_id=" << tablet_id << 
" job id=" << job_id
+           << " rowset_id=" << rowset_id;
+        msg = ss.str();
+        code = MetaServiceCode::STALE_PREPARE_ROWSET;
+        return false;
+    }
+
+    return true;
+}
+
 /**
 * Check if the transaction status is as expected.
 * If the transaction is not in the expected state, return false and set the 
error code and message.
@@ -1099,6 +1154,15 @@ void 
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
         return;
     }
 
+    // Reject the request if its compaction/schema change tablet job no longer exists or the job id does not match
+    if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
+        !request->tablet_job_id().empty()) {
+        if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, 
rowset_id,
+                               request->tablet_job_id())) {
+            return;
+        }
+    }
+
     // Check if the prepare rowset request is invalid.
     // If the transaction has been finished, it means this prepare rowset is a 
timeout retry request.
     // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
@@ -1236,6 +1300,15 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
         return;
     }
 
+    // Reject the request if its compaction/schema change tablet job no longer exists or the job id does not match
+    if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
+        !request->tablet_job_id().empty()) {
+        if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, 
rowset_id,
+                               request->tablet_job_id())) {
+            return;
+        }
+    }
+
     // Check if the commit rowset request is invalid.
     // If the transaction has been finished, it means this commit rowset is a 
timeout retry request.
     // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
@@ -3595,4 +3668,4 @@ void 
MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
     response->mutable_schema_dict()->Swap(&schema_dict);
 }
 
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 31dd25eab75..f03f434eded 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -320,6 +320,32 @@ static void add_tablet_metas(MetaServiceProxy* 
meta_service, std::string instanc
     ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 }
 
+static void start_compaction_job(MetaService* meta_service, int64_t tablet_id,
+                                 const std::string& job_id, const std::string& 
initiator,
+                                 int base_compaction_cnt, int 
cumu_compaction_cnt,
+                                 TabletCompactionJobPB::CompactionType type,
+                                 StartTabletJobResponse& res,
+                                 std::pair<int64_t, int64_t> input_version = 
{0, 0}) {
+    brpc::Controller cntl;
+    StartTabletJobRequest req;
+    req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+    auto compaction = req.mutable_job()->add_compaction();
+    compaction->set_id(job_id);
+    compaction->set_initiator(initiator);
+    compaction->set_base_compaction_cnt(base_compaction_cnt);
+    compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt);
+    compaction->set_type(type);
+    long now = time(nullptr);
+    compaction->set_expiration(now + 12);
+    compaction->set_lease(now + 3);
+    if (input_version.second > 0) {
+        compaction->add_input_versions(input_version.first);
+        compaction->add_input_versions(input_version.second);
+        compaction->set_check_input_versions_range(true);
+    }
+    meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+};
+
 TEST(MetaServiceTest, GetInstanceIdTest) {
     extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& 
rc_mgr,
                                        const std::string& cloud_unique_id);
@@ -9594,6 +9620,317 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
     SyncPoint::get_instance()->clear_all_call_backs();
 }
 
+TEST(MetaServiceTest, CheckJobExisted) {
+    auto meta_service = get_meta_service();
+
+    std::string instance_id = "check_job_existed_instance_id";
+    auto sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer(
+            (int*)0x01, [](int*) { 
SyncPoint::get_instance()->clear_all_call_backs(); });
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    // OK
+    {
+        constexpr auto table_id = 952701, index_id = 952702, partition_id = 
952703,
+                       tablet_id = 952704;
+        int64_t txn_id = 952705;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction1");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+
+    // job does not exist
+    {
+        constexpr auto table_id = 952801, index_id = 952802, partition_id = 
952803,
+                       tablet_id = 952804;
+        int64_t txn_id = 952805;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction1");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) 
<< res.status().msg();
+        res.Clear();
+    }
+
+    // compaction job exists, job id not match
+    {
+        constexpr auto table_id = 952901, index_id = 952902, partition_id = 
952903,
+                       tablet_id = 952904;
+        int64_t txn_id = 952905;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction2");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) 
<< res.status().msg();
+        res.Clear();
+    }
+
+    // do not set job id
+    {
+        constexpr auto table_id = 953501, index_id = 953502, partition_id = 
953503,
+                       tablet_id = 953504;
+        int64_t txn_id = 953505;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+
+    // job id is empty string
+    {
+        constexpr auto table_id = 953601, index_id = 953602, partition_id = 
953603,
+                       tablet_id = 953604;
+        int64_t txn_id = 953605;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->prepare_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+
+    // commit rowset OK
+    {
+        constexpr auto table_id = 953001, index_id = 953002, partition_id = 
953003,
+                       tablet_id = 953004;
+        int64_t txn_id = 953005;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction1");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+
+    // commit rowset, job does not exist
+    {
+        constexpr auto table_id = 953101, index_id = 953102, partition_id = 
953103,
+                       tablet_id = 953104;
+        int64_t txn_id = 952805;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction1");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) 
<< res.status().msg();
+        res.Clear();
+    }
+
+    // commit rowset, compaction job exists, job id not match
+    {
+        constexpr auto table_id = 953201, index_id = 953202, partition_id = 
953203,
+                       tablet_id = 953204;
+        int64_t txn_id = 952905;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("compaction2");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) 
<< res.status().msg();
+        res.Clear();
+    }
+
+    // do not set job id when commit rowset
+    {
+        constexpr auto table_id = 953301, index_id = 953302, partition_id = 
953303,
+                       tablet_id = 953304;
+        int64_t txn_id = 953305;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+
+    // job id is empty string when commit rowset
+    {
+        constexpr auto table_id = 953401, index_id = 953402, partition_id = 
953403,
+                       tablet_id = 953404;
+        int64_t txn_id = 953405;
+        std::string label = "update_rowset_meta_test_label1";
+        CreateRowsetResponse res;
+
+        ASSERT_NO_FATAL_FAILURE(
+                create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id));
+
+        auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+
+        {
+            StartTabletJobResponse res;
+            start_compaction_job(meta_service.get(), tablet_id, "compaction1", 
"ip:port", 0, 0,
+                                 TabletCompactionJobPB::BASE, res);
+        }
+
+        brpc::Controller cntl;
+        auto arena = res.GetArena();
+        auto req = 
google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena);
+        req->set_tablet_job_id("");
+        req->mutable_rowset_meta()->CopyFrom(rowset);
+        meta_service->commit_rowset(&cntl, req, &res, nullptr);
+        if (!arena) delete req;
+
+        ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
+        res.Clear();
+    }
+}
+
 TEST(MetaServiceTest, StalePrepareRowset) {
     auto meta_service = get_meta_service();
 
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 521ffafc517..d6761a395a3 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -976,6 +976,7 @@ message CreateRowsetRequest {
     optional doris.RowsetMetaCloudPB rowset_meta = 2;
     optional bool temporary = 3;
     optional int64 txn_id = 4;
+    optional string tablet_job_id = 5;
 }
 
 message CreateRowsetResponse {
@@ -1376,6 +1377,7 @@ enum MetaServiceCode {
     VERSION_NOT_FOUND = 2010;
     TABLET_NOT_FOUND = 2011;
     STALE_TABLET_CACHE = 2012;
+    STALE_PREPARE_ROWSET = 2013;
 
     CLUSTER_NOT_FOUND = 3001;
     ALREADY_EXISTED = 3002;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to