This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2c4d8f3e9cae8e9f447d0b96e6e646c1fb65e429
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Fri Jul 26 10:17:50 2024 +0800

    [fix](cloud-schema-change) Write schema change jobs to a deserialized pb 
buffer rather than a new one (#38210)
    
    ## Proposed changes
    
    Currently, when starting a schema change job, the job info is persisted
    in a newly created protobuf struct rather than the originally stored one,
    which loses the recorded compaction job and results in `failed to lease
    compaction job`.
    
    Therefore, write the newly created schema change job info into the
    deserialized protobuf of the original job info to fix the job info loss.
---
 cloud/src/meta-service/meta_service_job.cpp |  5 ++
 cloud/test/meta_service_job_test.cpp        | 71 +++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index f8f4faf1edf..1886d4bdf53 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -297,6 +297,11 @@ void start_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::strin
         code = cast_as<ErrCategory::READ>(err);
         return;
     }
+    if (!job_pb.ParseFromString(job_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = "pb deserialization failed";
+        return;
+    }
     job_pb.mutable_idx()->CopyFrom(request->job().idx());
     // FE can ensure that a tablet does not have more than one schema_change 
job at the same time,
     // so we can directly preempt previous schema_change job.
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index fabfdb13a9d..250cf43ea98 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -23,13 +23,18 @@
 
 #include <cstdint>
 #include <cstdlib>
+#include <ctime>
+#include <functional>
 #include <limits>
+#include <memory>
 #include <random>
+#include <string>
 
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/keys.h"
 #include "meta-service/meta_service.h"
+#include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
 
 namespace doris::cloud {
@@ -2223,4 +2228,70 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
 }
 
+TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) {
+    auto meta_service = get_meta_service();
+    // Regression test: starting a schema change job must not drop the recorded compaction job.
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer( // clears callbacks at scope exit
+            (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    brpc::Controller cntl;
+
+    int64_t table_id = 1;
+    int64_t index_id = 2;
+    int64_t partition_id = 3;
+    int64_t tablet_id = 4;
+
+    ASSERT_NO_FATAL_FAILURE(
+            create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false));
+    // Record a cumulative compaction job ("job1") for the tablet.
+    StartTabletJobResponse res;
+    start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+    res.Clear();
+    // Read the persisted job info back and confirm the compaction job is stored.
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string job_key =
+            job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
+    std::string job_val;
+    TabletJobInfoPB job_pb;
+    ASSERT_EQ(txn->get(job_key, &job_val), TxnErrorCode::TXN_OK);
+    ASSERT_TRUE(job_pb.ParseFromString(job_val));
+    ASSERT_EQ(job_pb.compaction_size(), 1);
+    ASSERT_EQ(job_pb.compaction(0).id(), "job1");
+    ASSERT_EQ(job_pb.compaction(0).initiator(), "BE1");
+    // Start a schema change job ("job2") on the same tablet.
+    int64_t new_tablet_id = 11;
+    ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
+                                          new_tablet_id, false, true));
+    ASSERT_NO_FATAL_FAILURE(start_schema_change_job(meta_service.get(), table_id, index_id,
+                                                    partition_id, tablet_id, new_tablet_id, "job2",
+                                                    "BE1"));
+    // Leasing "job1" only succeeds if it survived the schema change start.
+    long now = time(nullptr);
+    FinishTabletJobRequest req;
+    FinishTabletJobResponse finish_res_2;
+    req.set_action(FinishTabletJobRequest::LEASE);
+    auto* compaction = req.mutable_job()->add_compaction();
+    compaction->set_id("job1");
+    compaction->set_initiator("BE1");
+    compaction->set_lease(now + 10);
+    req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+    meta_service->finish_tablet_job(&cntl, &req, &finish_res_2, nullptr);
+    ASSERT_EQ(finish_res_2.status().code(), MetaServiceCode::OK); // the lease response itself
+    // Finish the schema change under the id it was started with, and check its own response.
+    FinishTabletJobResponse finish_res;
+    finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job2", "BE1", {},
+                             finish_res);
+    ASSERT_EQ(finish_res.status().code(), MetaServiceCode::OK);
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to