This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2c4d8f3e9cae8e9f447d0b96e6e646c1fb65e429 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Fri Jul 26 10:17:50 2024 +0800 [fix](cloud-schema-change) Write schema change jobs to a deserialized pb buffer rather than a new one (#38210) ## Proposed changes Currently, when starting a schema change job, the job info is persisted in a newly created protobuf struct rather than in the originally stored one, which causes existing compaction jobs to be lost and results in `failed to lease compaction job`. Therefore, write the newly created schema change job info into the deserialized protobuf of the original job info so that the existing job info is not lost. --- cloud/src/meta-service/meta_service_job.cpp | 5 ++ cloud/test/meta_service_job_test.cpp | 71 +++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index f8f4faf1edf..1886d4bdf53 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -297,6 +297,11 @@ void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::strin code = cast_as<ErrCategory::READ>(err); return; } + if (!job_pb.ParseFromString(job_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "pb deserialization failed"; + return; + } job_pb.mutable_idx()->CopyFrom(request->job().idx()); // FE can ensure that a tablet does not have more than one schema_change job at the same time, // so we can directly preempt previous schema_change job. 
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index fabfdb13a9d..250cf43ea98 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -23,13 +23,18 @@ #include <cstdint> #include <cstdlib> +#include <ctime> +#include <functional> #include <limits> +#include <memory> #include <random> +#include <string> #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/keys.h" #include "meta-service/meta_service.h" +#include "meta-service/txn_kv.h" #include "meta-service/txn_kv_error.h" namespace doris::cloud { @@ -2223,4 +2228,70 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); } +TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) { + auto meta_service = get_meta_service(); + + auto* sp = SyncPoint::get_instance(); + std::unique_ptr<int, std::function<void(int*)>> defer( + (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret<std::string>(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + brpc::Controller cntl; + + int64_t table_id = 1; + int64_t index_id = 2; + int64_t partition_id = 3; + int64_t tablet_id = 4; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false)); + + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 0, + TabletCompactionJobPB::CUMULATIVE, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + res.Clear(); + + std::unique_ptr<Transaction> txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string job_key = + job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string job_val; + TabletJobInfoPB job_pb; + ASSERT_EQ(txn->get(job_key, &job_val), 
TxnErrorCode::TXN_OK); + ASSERT_TRUE(job_pb.ParseFromString(job_val)); + ASSERT_EQ(job_pb.compaction_size(), 1); + ASSERT_EQ(job_pb.compaction(0).id(), "job1"); + ASSERT_EQ(job_pb.compaction(0).initiator(), "BE1"); + + int64_t new_tablet_id = 11; + ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id, + new_tablet_id, false, true)); + ASSERT_NO_FATAL_FAILURE(start_schema_change_job(meta_service.get(), table_id, index_id, + partition_id, tablet_id, new_tablet_id, "job2", + "BE1")); + + long now = time(nullptr); + FinishTabletJobRequest req; + FinishTabletJobResponse finish_res_2; + req.set_action(FinishTabletJobRequest::LEASE); + auto* compaction = req.mutable_job()->add_compaction(); + compaction->set_id("job1"); + compaction->set_initiator("BE1"); + compaction->set_lease(now + 10); + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + meta_service->finish_tablet_job(&cntl, &req, &finish_res_2, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + FinishTabletJobResponse finish_res; + finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job1", "BE1", {}, + finish_res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); +} + } // namespace doris::cloud --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org