This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b31648a6462 [fix](cloud) compaction and schema change potential data race when retrying prepare rowset (#51048) b31648a6462 is described below commit b31648a6462663c835fa8e0181d01e76fa5aa5fc Author: Luwei <lu...@selectdb.com> AuthorDate: Thu Jun 12 22:08:41 2025 +0800 [fix](cloud) compaction and schema change potential data race when retrying prepare rowset (#51048) related PR #51129 --- be/src/cloud/cloud_delete_task.cpp | 2 +- be/src/cloud/cloud_delta_writer.cpp | 2 +- be/src/cloud/cloud_meta_mgr.cpp | 6 +- be/src/cloud/cloud_meta_mgr.h | 4 +- be/src/cloud/cloud_rowset_builder.cpp | 2 +- be/src/cloud/cloud_schema_change_job.cpp | 4 +- be/src/olap/compaction.cpp | 5 +- cloud/src/common/config.h | 2 + cloud/src/meta-service/meta_service.cpp | 75 ++++++- cloud/test/meta_service_test.cpp | 337 +++++++++++++++++++++++++++++++ gensrc/proto/cloud.proto | 2 + 11 files changed, 429 insertions(+), 12 deletions(-) diff --git a/be/src/cloud/cloud_delete_task.cpp b/be/src/cloud/cloud_delete_task.cpp index 5698fb632cd..cf7a6a371bc 100644 --- a/be/src/cloud/cloud_delete_task.cpp +++ b/be/src/cloud/cloud_delete_task.cpp @@ -94,7 +94,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ RETURN_IF_ERROR(rowset_writer->build(rowset)); rowset->rowset_meta()->set_delete_predicate(std::move(del_pred)); - auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta()); + auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), ""); // Update tablet stats tablet->fetch_add_approximate_num_rowsets(1); diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index f186bc90f12..6fac6a13873 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -113,7 +113,7 @@ Status CloudDeltaWriter::commit_rowset() { RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_rowset_builder->build_rowset()); } - return _engine.meta_mgr().commit_rowset(*rowset_meta()); + return _engine.meta_mgr().commit_rowset(*rowset_meta(), ""); } Status CloudDeltaWriter::set_txn_related_delete_bitmap() { diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 73c9d0d72a8..245eca2d542 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -908,7 +908,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ return Status::OK(); } -Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, +Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); @@ -920,6 +920,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); req.set_txn_id(rs_meta.txn_id()); + req.set_tablet_job_id(job_id); RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); @@ -937,7 +938,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, return st; } -Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, +Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); @@ -950,6 +951,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); req.set_txn_id(rs_meta.txn_id()); + req.set_tablet_job_id(job_id); RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index 421e0b28bf3..3467262c1d4 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -69,10 +69,10 @@ public: CloudTablet* tablet, std::unique_lock<bthread::Mutex>& lock /* _sync_meta_lock */, const SyncOptions& options = {}, SyncRowsetStats* sync_stats = nullptr); - Status prepare_rowset(const RowsetMeta& rs_meta, + Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr); - Status commit_rowset(const RowsetMeta& rs_meta, + Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id, std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr); Status update_tmp_rowset(const RowsetMeta& rs_meta); diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 389b6c7c682..d135292a781 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -81,7 +81,7 @@ Status CloudRowsetBuilder::init() { _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta())); + RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); _is_init = true; return Status::OK(); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 0fa47d76cbf..66daee8fa79 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -313,7 +313,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam RowsetMetaSharedPtr existed_rs_meta; auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), - &existed_rs_meta); + _job_id, &existed_rs_meta); if (!st.ok()) { if (st.is<ALREADY_EXIST>()) { LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet " @@ -348,7 +348,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam st.to_string()); } - st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), + st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id, &existed_rs_meta); if (!st.ok()) { if (st.is<ALREADY_EXIST>()) { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 89782771168..026daa38c97 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1434,7 +1434,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { // Currently, updates are only made in the time_series. update_compaction_level(); - RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); + RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(), _uuid)); // 4. modify rowsets in memory RETURN_IF_ERROR(modify_rowsets()); @@ -1511,7 +1511,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& compaction_type() == ReaderType::READER_BASE_COMPACTION); ctx.file_cache_ttl_sec = _tablet->ttl_seconds(); _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get())); + RETURN_IF_ERROR( + _engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get(), _uuid)); return Status::OK(); } diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 6bc2d73f700..f021e2c20c2 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -227,6 +227,8 @@ CONF_mBool(enable_distinguish_hdfs_path, "true"); // If enabled, the txn status will be checked when preapre/commit rowset CONF_mBool(enable_load_txn_status_check, "true"); +CONF_mBool(enable_tablet_job_check, "true"); + // Declare a selection strategy for those servers have many ips. // Note that there should at most one ip match this list. // this is a list in semicolon-delimited format, in CIDR notation, diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 3dcf166c0b2..73522541a08 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -38,6 +38,7 @@ #include <ios> #include <limits> #include <memory> +#include <ostream> #include <sstream> #include <string> #include <tuple> @@ -990,6 +991,60 @@ static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg, existed_rowset_meta->CopyFrom(metas.Get(0)); } +bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& msg, + const std::string& instance_id, int64_t tablet_id, + const std::string& rowset_id, const std::string& job_id) { + TabletIndexPB tablet_idx; + get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx); + if (code != MetaServiceCode::OK) { + return false; + } + + std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_id}); + std::string job_val; + auto err = txn->get(job_key, &job_val); + if (err != TxnErrorCode::TXN_OK) { + std::stringstream ss; + ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,") + << " instance_id=" << instance_id << " tablet_id=" << tablet_id + << " rowset_id=" << rowset_id << " err=" << err; + msg = ss.str(); + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::STALE_PREPARE_ROWSET + : cast_as<ErrCategory::READ>(err); + return false; + } + + TabletJobInfoPB job_pb; + job_pb.ParseFromString(job_val); + bool match = false; + if (!job_pb.compaction().empty()) { + for (auto c : job_pb.compaction()) { + if (c.id() == job_id) { + match = true; + } + } + } + + if (job_pb.has_schema_change()) { + if (job_pb.schema_change().id() == job_id) { + match = true; + } + } + + if (!match) { + std::stringstream ss; + ss << " stale perpare rowset request," + << " instance_id=" << instance_id << " tablet_id=" << tablet_id << " job id=" << job_id + << " rowset_id=" << rowset_id; + msg = ss.str(); + code = MetaServiceCode::STALE_PREPARE_ROWSET; + return false; + } + + return true; +} + /** * Check if the transaction status is as expected. * If the transaction is not in the expected state, return false and set the error code and message. @@ -1099,6 +1154,15 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll return; } + // Check if the compaction/sc tablet job has finished + if (config::enable_tablet_job_check && request->has_tablet_job_id() && + !request->tablet_job_id().empty()) { + if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, + request->tablet_job_id())) { + return; + } + } + // Check if the prepare rowset request is invalid. // If the transaction has been finished, it means this prepare rowset is a timeout retry request. // In this case, do not write the recycle key again, otherwise it may cause data loss. @@ -1236,6 +1300,15 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle return; } + // Check if the compaction/sc tablet job has finished + if (config::enable_tablet_job_check && request->has_tablet_job_id() && + !request->tablet_job_id().empty()) { + if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, + request->tablet_job_id())) { + return; + } + } + // Check if the commit rowset request is invalid. // If the transaction has been finished, it means this commit rowset is a timeout retry request. // In this case, do not write the recycle key again, otherwise it may cause data loss. @@ -3595,4 +3668,4 @@ void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control response->mutable_schema_dict()->Swap(&schema_dict); } -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 31dd25eab75..f03f434eded 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -320,6 +320,32 @@ static void add_tablet_metas(MetaServiceProxy* meta_service, std::string instanc ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } +static void start_compaction_job(MetaService* meta_service, int64_t tablet_id, + const std::string& job_id, const std::string& initiator, + int base_compaction_cnt, int cumu_compaction_cnt, + TabletCompactionJobPB::CompactionType type, + StartTabletJobResponse& res, + std::pair<int64_t, int64_t> input_version = {0, 0}) { + brpc::Controller cntl; + StartTabletJobRequest req; + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + auto compaction = req.mutable_job()->add_compaction(); + compaction->set_id(job_id); + compaction->set_initiator(initiator); + compaction->set_base_compaction_cnt(base_compaction_cnt); + compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt); + compaction->set_type(type); + long now = time(nullptr); + compaction->set_expiration(now + 12); + compaction->set_lease(now + 3); + if (input_version.second > 0) { + compaction->add_input_versions(input_version.first); + compaction->add_input_versions(input_version.second); + compaction->set_check_input_versions_range(true); + } + meta_service->start_tablet_job(&cntl, &req, &res, nullptr); +}; + TEST(MetaServiceTest, GetInstanceIdTest) { extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr, const std::string& cloud_unique_id); @@ -9594,6 +9620,317 @@ TEST(MetaServiceTest, AddObjInfoWithRole) { SyncPoint::get_instance()->clear_all_call_backs(); } +TEST(MetaServiceTest, CheckJobExisted) { + auto meta_service = get_meta_service(); + + std::string instance_id = "check_job_existed_instance_id"; + auto sp = SyncPoint::get_instance(); + std::unique_ptr<int, std::function<void(int*)>> defer( + (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret<std::string>(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + // OK + { + constexpr auto table_id = 952701, index_id = 952702, partition_id = 952703, + tablet_id = 952704; + int64_t txn_id = 952705; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job does not exist, + { + constexpr auto table_id = 952801, index_id = 952802, partition_id = 952803, + tablet_id = 952804; + int64_t txn_id = 952805; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // compaction job exists, job id not match + { + constexpr auto table_id = 952901, index_id = 952902, partition_id = 952903, + tablet_id = 952904; + int64_t txn_id = 952905; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction2"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // do not set job id + { + constexpr auto table_id = 953501, index_id = 953502, partition_id = 953503, + tablet_id = 953504; + int64_t txn_id = 953505; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job id is empty string + { + constexpr auto table_id = 953601, index_id = 953602, partition_id = 953603, + tablet_id = 953604; + int64_t txn_id = 953605; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id(""); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // commit rowset OK + { + constexpr auto table_id = 953001, index_id = 953002, partition_id = 953003, + tablet_id = 953004; + int64_t txn_id = 953005; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // commit rowset, job does not exist, + { + constexpr auto table_id = 953101, index_id = 953102, partition_id = 953103, + tablet_id = 953104; + int64_t txn_id = 952805; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // commit rowset, compaction job exists, job id not match + { + constexpr auto table_id = 953201, index_id = 953202, partition_id = 953203, + tablet_id = 953204; + int64_t txn_id = 952905; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id("compaction2"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // do not set job id when commit rowset + { + constexpr auto table_id = 953301, index_id = 953302, partition_id = 953303, + tablet_id = 953304; + int64_t txn_id = 953305; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job id is empty string when commit rowset + { + constexpr auto table_id = 953401, index_id = 953402, partition_id = 953403, + tablet_id = 953404; + int64_t txn_id = 953405; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); + req->set_tablet_job_id(""); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } +} + TEST(MetaServiceTest, StalePrepareRowset) { auto meta_service = get_meta_service(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 521ffafc517..d6761a395a3 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -976,6 +976,7 @@ message CreateRowsetRequest { optional doris.RowsetMetaCloudPB rowset_meta = 2; optional bool temporary = 3; optional int64 txn_id = 4; + optional string tablet_job_id = 5; } message CreateRowsetResponse { @@ -1376,6 +1377,7 @@ enum MetaServiceCode { VERSION_NOT_FOUND = 2010; TABLET_NOT_FOUND = 2011; STALE_TABLET_CACHE = 2012; + STALE_PREPARE_ROWSET = 2013; CLUSTER_NOT_FOUND = 3001; ALREADY_EXISTED = 3002; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org