This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ebdb3def4f6 branch-4.0: [enhance](streaming job) recycle streaming job
kv in cloud mode #56651 (#56810)
ebdb3def4f6 is described below
commit ebdb3def4f6081fc70610d26110925cf93d1fd14
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 17:30:38 2025 +0800
branch-4.0: [enhance](streaming job) recycle streaming job kv in cloud mode
#56651 (#56810)
Cherry-picked from #56651
Co-authored-by: hui lai <[email protected]>
---
cloud/src/common/bvars.cpp | 5 +++
cloud/src/common/bvars.h | 3 ++
cloud/src/meta-service/meta_service.h | 12 ++++++
cloud/src/meta-service/meta_service_txn.cpp | 44 +++++++++++++++++++++
cloud/test/meta_service_job_test.cpp | 45 ++++++++++++++++++++++
.../apache/doris/cloud/rpc/MetaServiceClient.java | 5 +++
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 +++
.../org/apache/doris/job/manager/JobManager.java | 27 +++++++++++++
gensrc/proto/cloud.proto | 12 ++++++
9 files changed, 158 insertions(+)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 2739727c92c..4f8da0076cf 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -90,6 +90,7 @@ BvarLatencyRecorderWithTag
g_bvar_ms_remove_delete_bitmap_update_lock("ms", "rem
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms",
"get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms",
"get_streaming_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job("ms",
"delete_streaming_job");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms",
"reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms",
"start_tablet_job");
@@ -383,6 +384,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_pu
mBvarInt64Adder
g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
+// delete_streaming_job
+mBvarInt64Adder
g_bvar_rpc_kv_delete_streaming_job_del_counter("rpc_kv_delete_streaming_job_del_counter",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -573,6 +576,8 @@ mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_
mBvarInt64Adder
g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
// get_streaming_task_commit_attach
mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
+// delete_streaming_job
+mBvarInt64Adder
g_bvar_rpc_kv_delete_streaming_job_del_bytes("rpc_kv_delete_streaming_job_del_bytes",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 6f69a4197e9..0153248c499 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -606,6 +606,7 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_delete_streaming_job;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -836,6 +837,7 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
extern mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -967,6 +969,7 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
extern mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_delete_streaming_job_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index ce47f6a10c0..f84fa396d86 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -335,6 +335,11 @@ public:
GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done)
override;
+ // Declaration of the delete_streaming_job RPC handler; overrides the
+ // base-class method. Takes a DeleteStreamingJobRequest and fills a
+ // DeleteStreamingJobResponse.
+ void delete_streaming_job(::google::protobuf::RpcController* controller,
+ const DeleteStreamingJobRequest* request,
+ DeleteStreamingJobResponse* response,
+ ::google::protobuf::Closure* done) override;
+
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override;
@@ -849,6 +854,13 @@ public:
response, done);
}
+ // Forwards the delete_streaming_job RPC, with all four arguments unchanged,
+ // to cloud::MetaService::delete_streaming_job through call_impl.
+ void delete_streaming_job(::google::protobuf::RpcController* controller,
+ const DeleteStreamingJobRequest* request,
+ DeleteStreamingJobResponse* response,
+ ::google::protobuf::Closure* done) override {
+ call_impl(&cloud::MetaService::delete_streaming_job, controller,
request, response, done);
+ }
+
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 1cba2e14b96..d2bede30fc4 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -901,6 +901,50 @@ void
MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr
}
}
+/// Handles the delete_streaming_job RPC: removes the KV entry stored under
+/// streaming_job_key({instance_id, db_id, job_id}) and commits the removal.
+///
+/// Failure paths set `code` and `msg` and return early:
+///  - INVALID_ARGUMENT when no instance id resolves from the request's
+///    cloud_unique_id, or when db_id / job_id is absent from the request;
+///  - the cast_as<> mapping of the TxnErrorCode when creating or committing
+///    the transaction fails.
+///
+/// NOTE(review): `code`, `msg`, `ss`, `instance_id` and `txn` are not declared
+/// in this function; presumably RPC_PREPROCESS introduces them and reports the
+/// outcome through `response` / `done` -- confirm against the macro.
+void MetaServiceImpl::delete_streaming_job(::google::protobuf::RpcController* controller,
+                                           const DeleteStreamingJobRequest* request,
+                                           DeleteStreamingJobResponse* response,
+                                           ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(delete_streaming_job, del);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(delete_streaming_job)
+
+    // Reject malformed requests before touching the KV store, so that no
+    // transaction is created for a request that cannot succeed.
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "missing db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    const int64_t db_id = request->db_id();
+    const int64_t job_id = request->job_id();
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    const std::string key_to_delete = streaming_job_key({instance_id, db_id, job_id});
+    txn->remove(key_to_delete);
+    LOG(INFO) << "remove key=" << hex(key_to_delete);
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        // A commit failure belongs to the commit category, not the read one.
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit delete, err=" << err;
+        msg = ss.str();
+        return;
+    }
+}
+
+
void get_txn_db_id(TxnKv* txn_kv, const std::string& instance_id, int64_t
txn_id,
MetaServiceCode& code, std::string& msg, int64_t* db_id,
KVStats* stats) {
std::stringstream ss;
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index 52359374032..a44b5d8537e 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5214,4 +5214,49 @@ TEST(MetaServiceJobTest,
GetStreamingTaskCommitAttachTest) {
}
}
+// Seeds a value under streaming_job_key, calls delete_streaming_job for the
+// same (db_id, job_id), and checks that the RPC reports OK and the key can no
+// longer be read.
+TEST(MetaServiceJobTest, DeleteJobKeyRemovesStreamingMeta) {
+    auto meta_service = get_meta_service(false);
+    std::string instance_id = "test_cloud_instance_id";
+    std::string cloud_unique_id = "1:test_cloud_unique_id:1";
+    MOCK_GET_INSTANCE_ID(instance_id);
+    create_and_refresh_instance(meta_service.get(), instance_id);
+
+    const int64_t db_id = 1001;
+    const int64_t job_id = 2002;
+    const std::string key = streaming_job_key({instance_id, db_id, job_id});
+
+    // Arrange: store a serialized StreamingTaskCommitAttachmentPB under the key.
+    {
+        StreamingTaskCommitAttachmentPB attachment;
+        attachment.set_job_id(job_id);
+        attachment.set_scanned_rows(123);
+        attachment.set_load_bytes(456);
+        attachment.set_num_files(3);
+        attachment.set_file_bytes(789);
+        const std::string serialized = attachment.SerializeAsString();
+
+        std::unique_ptr<Transaction> put_txn;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&put_txn), TxnErrorCode::TXN_OK);
+        put_txn->put(key, serialized);
+        ASSERT_EQ(put_txn->commit(), TxnErrorCode::TXN_OK);
+    }
+
+    // Act: issue the delete_streaming_job RPC for that job.
+    brpc::Controller cntl;
+    DeleteStreamingJobRequest request;
+    request.set_cloud_unique_id(cloud_unique_id);
+    request.set_db_id(db_id);
+    request.set_job_id(job_id);
+    DeleteStreamingJobResponse response;
+    meta_service->delete_streaming_job(&cntl, &request, &response, nullptr);
+
+    // Assert: the RPC succeeded ...
+    ASSERT_EQ(response.status().code(), MetaServiceCode::OK);
+
+    // ... and a fresh transaction no longer finds the key.
+    std::unique_ptr<Transaction> verify_txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&verify_txn), TxnErrorCode::TXN_OK);
+    std::string fetched;
+    ASSERT_EQ(verify_txn->get(key, &fetched), TxnErrorCode::TXN_KEY_NOT_FOUND);
+}
+
} // namespace doris::cloud
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index aa9d34bcb5b..027eed539bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -499,6 +499,11 @@ public class MetaServiceClient {
.getStreamingTaskCommitAttach(request);
}
+ /**
+ * Calls deleteStreamingJob on the blocking stub with a deadline of
+ * Config.meta_service_brpc_timeout_ms milliseconds.
+ *
+ * @param request the delete request to send
+ * @return the response returned by the stub
+ */
+ public Cloud.DeleteStreamingJobResponse
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request) {
+ return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
+ .deleteStreamingJob(request);
+ }
+
public Cloud.AlterInstanceResponse
alterInstance(Cloud.AlterInstanceRequest request) {
return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
.alterInstance(request);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index a843b4790ce..232e544f502 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -438,6 +438,11 @@ public class MetaServiceProxy {
return w.executeRequest((client) ->
client.getStreamingTaskCommitAttach(request));
}
+ /**
+ * Runs client.deleteStreamingJob(request) through w.executeRequest.
+ *
+ * @param request the delete request to send
+ * @return the response produced by the client call
+ * @throws RpcException declared by this method; presumably raised by
+ * executeRequest when the call fails (confirm against its definition)
+ */
+ public Cloud.DeleteStreamingJobResponse
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
+ throws RpcException {
+ return w.executeRequest((client) ->
client.deleteStreamingJob(request));
+ }
+
public Cloud.AlterInstanceResponse
alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
return w.executeRequest((client) -> client.alterInstance(request));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 8ce9fb7ea73..51132ad07ba 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -20,8 +20,11 @@ package org.apache.doris.job.manager;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -222,6 +225,7 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
writeLock();
try {
+ deleteStremingJob(job);
jobMap.remove(job.getJobId());
if (isReplay) {
job.onReplayEnd(job);
@@ -235,6 +239,29 @@ public class JobManager<T extends AbstractJob<?, C>, C>
implements Writable {
}
}
+    /**
+     * Asks the meta service to delete the KV entry of a streaming job.
+     *
+     * <p>Does nothing unless the FE runs in cloud mode and the job's execute type is
+     * {@code JobExecuteType.STREAMING}. Otherwise it resolves the database id from the job's
+     * current database name, sends a {@code DeleteStreamingJobRequest} and checks the status.
+     *
+     * @param job the job being removed
+     * @throws JobException if the database lookup or the RPC throws, or if the meta service
+     *         answers with a status code other than {@code OK}
+     */
+    private void deleteStremingJob(AbstractJob<?, C> job) throws JobException {
+        boolean isStreamingJob = job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING);
+        if (!Config.isCloudMode() || !isStreamingJob) {
+            return;
+        }
+        long dbId;
+        Cloud.DeleteStreamingJobResponse resp;
+        try {
+            dbId = Env.getCurrentInternalCatalog().getDbOrDdlException(job.getCurrentDbName()).getId();
+            Cloud.DeleteStreamingJobRequest req = Cloud.DeleteStreamingJobRequest.newBuilder()
+                    .setCloudUniqueId(Config.cloud_unique_id)
+                    .setDbId(dbId)
+                    .setJobId(job.getJobId())
+                    .build();
+            resp = MetaServiceProxy.getInstance().deleteStreamingJob(req);
+        } catch (Exception e) {
+            // Build the message explicitly so the ids and the underlying error are always
+            // visible, regardless of how JobException formats its arguments.
+            throw new JobException("deleteJobKey exception for jobId=" + job.getJobId()
+                    + ", dbName=" + job.getCurrentDbName() + ", error=" + e.getMessage(), e);
+        }
+        // Checked outside the try block so this exception is not caught and re-wrapped
+        // by the generic handler above.
+        if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            throw new JobException("deleteJobKey failed for jobId=" + job.getJobId()
+                    + ", dbId=" + dbId + ", status=" + resp.getStatus());
+        }
+    }
+
+
public void alterJobStatus(Long jobId, JobStatus status) throws
JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 380c9119d35..1a73cb905c2 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1912,6 +1912,17 @@ message GetStreamingTaskCommitAttachResponse {
optional StreamingTaskCommitAttachmentPB commit_attach = 2;
}
+// Request of the delete_streaming_job RPC.
+message DeleteStreamingJobRequest {
+ // The meta service resolves the instance id from this value.
+ optional string cloud_unique_id = 1;
+ // db_id and job_id are both required by the handler; together with the
+ // instance id they form the streaming job key that is removed.
+ optional int64 db_id = 2;
+ optional int64 job_id = 3;
+ // NOTE(review): presumably the caller's IP address; not read by the
+ // handler code in this change -- confirm its use.
+ optional string request_ip = 4;
+}
+
+// Response of the delete_streaming_job RPC; carries only the call status.
+message DeleteStreamingJobResponse {
+ optional MetaServiceResponseStatus status = 1;
+}
+
message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
@@ -2121,6 +2132,7 @@ service MetaService {
// streaming job meta
rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest)
returns (GetStreamingTaskCommitAttachResponse);
+ rpc delete_streaming_job(DeleteStreamingJobRequest) returns
(DeleteStreamingJobResponse);
// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]