This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ebdb3def4f6 branch-4.0: [enhance](streaming job) recycle streaming job 
kv in cloud mode #56651 (#56810)
ebdb3def4f6 is described below

commit ebdb3def4f6081fc70610d26110925cf93d1fd14
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 17:30:38 2025 +0800

    branch-4.0: [enhance](streaming job) recycle streaming job kv in cloud mode 
#56651 (#56810)
    
    Cherry-picked from #56651
    
    Co-authored-by: hui lai <[email protected]>
---
 cloud/src/common/bvars.cpp                         |  5 +++
 cloud/src/common/bvars.h                           |  3 ++
 cloud/src/meta-service/meta_service.h              | 12 ++++++
 cloud/src/meta-service/meta_service_txn.cpp        | 44 +++++++++++++++++++++
 cloud/test/meta_service_job_test.cpp               | 45 ++++++++++++++++++++++
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  5 +++
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  5 +++
 .../org/apache/doris/job/manager/JobManager.java   | 27 +++++++++++++
 gensrc/proto/cloud.proto                           | 12 ++++++
 9 files changed, 158 insertions(+)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 2739727c92c..4f8da0076cf 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -90,6 +90,7 @@ BvarLatencyRecorderWithTag 
g_bvar_ms_remove_delete_bitmap_update_lock("ms", "rem
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", 
"get_rl_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", 
"get_streaming_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job("ms", 
"delete_streaming_job");
 BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", 
"reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", 
"start_tablet_job");
@@ -383,6 +384,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_pu
 mBvarInt64Adder 
g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
 // get_streaming_task_commit_attach
 mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
+// delete_streaming_job
+mBvarInt64Adder 
g_bvar_rpc_kv_delete_streaming_job_del_counter("rpc_kv_delete_streaming_job_del_counter",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -573,6 +576,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_
 mBvarInt64Adder 
g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
 // get_streaming_task_commit_attach
 mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
+// delete_streaming_job
+mBvarInt64Adder 
g_bvar_rpc_kv_delete_streaming_job_del_bytes("rpc_kv_delete_streaming_job_del_bytes",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 6f69a4197e9..0153248c499 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -606,6 +606,7 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job;
 extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -836,6 +837,7 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
 extern mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -967,6 +969,7 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
 extern mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index ce47f6a10c0..f84fa396d86 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -335,6 +335,11 @@ public:
                                           
GetStreamingTaskCommitAttachResponse* response,
                                           ::google::protobuf::Closure* done) 
override;
 
+    // Handler for the delete_streaming_job RPC. The definition
+    // (MetaServiceImpl::delete_streaming_job) is in meta_service_txn.cpp.
+    void delete_streaming_job(::google::protobuf::RpcController* controller,
+                              const DeleteStreamingJobRequest* request,
+                              DeleteStreamingJobResponse* response,
+                              ::google::protobuf::Closure* done) override;
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, 
ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -849,6 +854,13 @@ public:
                   response, done);
     }
 
+    // Forwards the delete_streaming_job RPC to
+    // cloud::MetaService::delete_streaming_job through call_impl, passing the
+    // controller, request, response and done closure unchanged.
+    void delete_streaming_job(::google::protobuf::RpcController* controller,
+                              const DeleteStreamingJobRequest* request,
+                              DeleteStreamingJobResponse* response,
+                              ::google::protobuf::Closure* done) override {
+        call_impl(&cloud::MetaService::delete_streaming_job, controller, 
request, response, done);
+    }
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, 
ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 1cba2e14b96..d2bede30fc4 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -901,6 +901,50 @@ void 
MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
     }
 }
 
+// Removes the KV stored under streaming_job_key({instance_id, db_id, job_id}).
+//
+// The instance id is resolved from request->cloud_unique_id(); db_id and
+// job_id must both be set on the request. Failures are reported through
+// `code` / `msg` (presumably copied into response->status() by the
+// RPC_PREPROCESS machinery -- confirm against the macro):
+//   - INVALID_ARGUMENT when the instance id is empty or db_id/job_id is
+//     missing,
+//   - a CREATE-category code when the transaction cannot be created,
+//   - a COMMIT-category code when the commit fails.
+void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
+                                           const DeleteStreamingJobRequest* request,
+                                           DeleteStreamingJobResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(delete_streaming_job, del);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(delete_streaming_job)
+
+    // Validate the request before opening a transaction, so a malformed
+    // request never creates one.
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "missing db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    std::string key_to_delete = streaming_job_key({instance_id, db_id, job_id});
+
+    txn->remove(key_to_delete);
+    LOG(INFO) << "remove key=" << hex(key_to_delete);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        // A failed commit is a commit error, not a read error.
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit delete, err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
 void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t 
txn_id,
                    MetaServiceCode& code, std::string& msg, int64_t* db_id, 
KVStats* stats) {
     std::stringstream ss;
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index 52359374032..a44b5d8537e 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5214,4 +5214,49 @@ TEST(MetaServiceJobTest, 
GetStreamingTaskCommitAttachTest) {
     }
 }
 
+// Stores a value under streaming_job_key, calls delete_streaming_job for the
+// same (db_id, job_id), and expects an OK status and the key to be gone.
+TEST(MetaServiceJobTest, DeleteJobKeyRemovesStreamingMeta) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    int64_t db_id = 1001;
+    int64_t job_id = 2002;
+    std::string job_key = streaming_job_key({instance_id, db_id, job_id});
+
+    // Seed the KV store with a serialized StreamingTaskCommitAttachmentPB.
+    {
+        StreamingTaskCommitAttachmentPB attachment;
+        attachment.set_job_id(job_id);
+        attachment.set_scanned_rows(123);
+        attachment.set_load_bytes(456);
+        attachment.set_num_files(3);
+        attachment.set_file_bytes(789);
+        std::string serialized = attachment.SerializeAsString();
+
+        std::unique_ptr<Transaction> put_txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&put_txn), TxnErrorCode::TXN_OK);
+        put_txn->put(job_key, serialized);
+        ASSERT_EQ(put_txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Issue the delete RPC for the seeded job.
+    DeleteStreamingJobRequest request;
+    request.set_cloud_unique_id(cloud_unique_id);
+    request.set_db_id(db_id);
+    request.set_job_id(job_id);
+    DeleteStreamingJobResponse response;
+    brpc::Controller cntl;
+    meta_service->delete_streaming_job(&cntl, &request, &response, nullptr);
+    ASSERT_EQ(response.status().code(), MetaServiceCode::OK);
+
+    // A fresh transaction must no longer see the key.
+    std::unique_ptr<Transaction> read_txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&read_txn), TxnErrorCode::TXN_OK);
+    std::string value;
+    ASSERT_EQ(read_txn->get(job_key, &value), TxnErrorCode::TXN_KEY_NOT_FOUND);
+}
+
 } // namespace doris::cloud
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index aa9d34bcb5b..027eed539bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -499,6 +499,11 @@ public class MetaServiceClient {
                 .getStreamingTaskCommitAttach(request);
     }
 
+    /**
+     * Issues the deleteStreamingJob call on the blocking stub, with a deadline
+     * of {@code Config.meta_service_brpc_timeout_ms} milliseconds.
+     *
+     * @param request the delete request to send
+     * @return the response returned by the stub
+     */
+    public Cloud.DeleteStreamingJobResponse 
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request) {
+        return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
+                .deleteStreamingJob(request);
+    }
+
     public Cloud.AlterInstanceResponse 
alterInstance(Cloud.AlterInstanceRequest request) {
         return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
                 .alterInstance(request);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index a843b4790ce..232e544f502 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -438,6 +438,11 @@ public class MetaServiceProxy {
         return w.executeRequest((client) -> 
client.getStreamingTaskCommitAttach(request));
     }
 
+    /**
+     * Runs {@code client.deleteStreamingJob(request)} through
+     * {@code w.executeRequest}.
+     *
+     * @param request the delete request to send
+     * @return the response produced by the client call
+     * @throws RpcException propagated from {@code executeRequest}
+     */
+    public Cloud.DeleteStreamingJobResponse 
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
+            throws RpcException {
+        return w.executeRequest((client) -> 
client.deleteStreamingJob(request));
+    }
+
     public Cloud.AlterInstanceResponse 
alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
         return w.executeRequest((client) -> client.alterInstance(request));
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 8ce9fb7ea73..51132ad07ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -20,8 +20,11 @@ package org.apache.doris.job.manager;
 import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -222,6 +225,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         }
         writeLock();
         try {
+            deleteStremingJob(job);
             jobMap.remove(job.getJobId());
             if (isReplay) {
                 job.onReplayEnd(job);
@@ -235,6 +239,29 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
         }
     }
 
+    /**
+     * Asks the meta service to delete the KV of a streaming job.
+     *
+     * <p>Does nothing unless {@code Config.isCloudMode()} is true and the
+     * job's execute type is {@code JobExecuteType.STREAMING}.
+     *
+     * @param job the job whose streaming KV should be deleted
+     * @throws JobException if the database lookup or the RPC fails, or if the
+     *         meta service answers with a status code other than OK
+     */
+    private void deleteStremingJob(AbstractJob<?, C> job) throws JobException {
+        // Constant-first comparison: a null execute type is treated as
+        // "not streaming" instead of raising a NullPointerException.
+        boolean isStreamingJob = JobExecuteType.STREAMING.equals(job.getJobConfig().getExecuteType());
+        if (!Config.isCloudMode() || !isStreamingJob) {
+            return;
+        }
+        try {
+            long dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(job.getCurrentDbName()).getId();
+            Cloud.DeleteStreamingJobRequest req = Cloud.DeleteStreamingJobRequest.newBuilder()
+                    .setCloudUniqueId(Config.cloud_unique_id)
+                    .setDbId(dbId)
+                    .setJobId(job.getJobId())
+                    .build();
+            Cloud.DeleteStreamingJobResponse resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
+            if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                throw new JobException("deleteJobKey failed for jobId={}, dbId={}, status={}",
+                        job.getJobId(), dbId, resp.getStatus());
+            }
+        } catch (JobException e) {
+            // The non-OK-status exception above already carries the status;
+            // rethrow it unchanged so the generic handler below does not
+            // re-wrap it.
+            throw e;
+        } catch (Exception e) {
+            throw new JobException("deleteJobKey exception for jobId={}, dbName={}",
+                    job.getJobId(), job.getCurrentDbName(), e);
+        }
+    }
+
     public void alterJobStatus(Long jobId, JobStatus status) throws 
JobException {
         checkJobExist(jobId);
         jobMap.get(jobId).updateJobStatus(status);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 380c9119d35..1a73cb905c2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1912,6 +1912,17 @@ message GetStreamingTaskCommitAttachResponse {
     optional StreamingTaskCommitAttachmentPB commit_attach = 2;
 }
 
+// Request of the delete_streaming_job RPC.
+// The server-side handler resolves the instance from cloud_unique_id and
+// rejects the request with INVALID_ARGUMENT when db_id or job_id is unset.
+message DeleteStreamingJobRequest {
+    optional string cloud_unique_id = 1;
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+    optional string request_ip = 4;
+}
+
+// Response of the delete_streaming_job RPC; carries only the call status.
+message DeleteStreamingJobResponse {
+    optional MetaServiceResponseStatus status = 1;
+}
+
 message CheckKeyInfos {
     repeated int64 db_ids = 1;
     repeated int64 table_ids = 2;
@@ -2121,6 +2132,7 @@ service MetaService {
 
     // streaming job meta
     rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest) 
returns (GetStreamingTaskCommitAttachResponse);
+    rpc delete_streaming_job(DeleteStreamingJobRequest) returns 
(DeleteStreamingJobResponse);
 
     // check KV
     rpc check_kv(CheckKVRequest) returns (CheckKVResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to