This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7d4160f1b [feature](merge-cloud) Get routine load progress info from meta service (#32532)
9e7d4160f1b is described below

commit 9e7d4160f1b0d4bf955df9acc93fc196a50c43bd
Author: walter <w41te...@gmail.com>
AuthorDate: Mon Mar 25 23:02:43 2024 +0800

    [feature](merge-cloud) Get routine load progress info from meta service (#32532)
    
    Co-authored-by: Luwei <814383...@qq.com>
    Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
---
 cloud/src/common/bvars.cpp                         |   1 +
 cloud/src/common/bvars.h                           |   1 +
 cloud/src/meta-service/meta_service.h              |  12 ++
 cloud/src/meta-service/meta_service_txn.cpp        | 238 ++++++++++++++-------
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |  11 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  15 ++
 .../load/routineload/KafkaRoutineLoadJob.java      |  33 +++
 .../doris/load/routineload/RoutineLoadJob.java     |   2 +
 .../load/routineload/RoutineLoadScheduler.java     |   6 +
 gensrc/proto/cloud.proto                           |  24 +++
 10 files changed, 263 insertions(+), 80 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 702dea86502..ab0b5934b50 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -69,6 +69,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms", 
"get_delete_bitmap"
 BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
                                                                    
"get_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
+BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", 
"get_rl_task_commit_attach");
 
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", 
"start_tablet_job");
 BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms", 
"finish_tablet_job");
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 1c4c4f749b6..dbdbfa834e9 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -170,6 +170,7 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_get_delete_bitmap_update_lock;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
 
 // txn_kv's bvars
 extern bvar::LatencyRecorder g_bvar_txn_kv_get;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 08a6b0884e0..a8cd0f52148 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -251,6 +251,11 @@ public:
                             GetClusterStatusResponse* response,
                             ::google::protobuf::Closure* done) override;
 
+    void get_rl_task_commit_attach(::google::protobuf::RpcController* 
controller,
+                                   const GetRLTaskCommitAttachRequest* request,
+                                   GetRLTaskCommitAttachResponse* response,
+                                   ::google::protobuf::Closure* done) override;
+
     // ATTN: If you add a new method, please also add the corresponding 
implementation in `MetaServiceProxy`.
 
     std::pair<MetaServiceCode, std::string> get_instance_info(const 
std::string& instance_id,
@@ -574,6 +579,13 @@ public:
         call_impl(&cloud::MetaService::get_cluster_status, controller, 
request, response, done);
     }
 
+    void get_rl_task_commit_attach(::google::protobuf::RpcController* 
controller,
+                                   const GetRLTaskCommitAttachRequest* request,
+                                   GetRLTaskCommitAttachResponse* response,
+                                   ::google::protobuf::Closure* done) override 
{
+        call_impl(&cloud::MetaService::get_rl_task_commit_attach, controller, 
request, response, done);
+    }
+
 private:
     template <typename Request, typename Response>
     using MetaServiceMethod = void 
(cloud::MetaService::*)(::google::protobuf::RpcController*,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 788663c2cea..03251ee0f07 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -478,6 +478,163 @@ void 
MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle
     }
 }
 
+void put_routine_load_progress(MetaServiceCode& code, std::string& msg,
+                               const std::string& instance_id,
+                               const CommitTxnRequest* request,
+                               Transaction* txn, int64_t db_id) {
+    std::stringstream ss;
+    int64_t txn_id = request->txn_id();
+    if (!request->has_commit_attachment()) {
+        ss << "failed to get commit attachment from req, db_id=" << db_id
+           << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
+    RLTaskTxnCommitAttachmentPB commit_attachment =
+            txn_commit_attachment.rl_task_txn_commit_attachment();
+    int64_t job_id = commit_attachment.job_id();
+
+    std::string rl_progress_key;
+    std::string rl_progress_val;
+    bool prev_progress_existed = true;
+    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
+    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
+    TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+            prev_progress_existed = false;
+        } else {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get routine load progress, db_id=" << db_id << " 
txn_id=" << txn_id
+               << " err=" << err;
+            msg = ss.str();
+            return;
+        }
+    }
+
+    RoutineLoadProgressPB prev_progress_info;
+    if (prev_progress_existed) {
+        if (!prev_progress_info.ParseFromString(rl_progress_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse routine load progress, db_id=" << db_id
+               << " txn_id=" << txn_id;
+            msg = ss.str();
+            return;
+        }
+    }
+
+    std::string new_progress_val;
+    RoutineLoadProgressPB new_progress_info;
+    new_progress_info.CopyFrom(commit_attachment.progress());
+    for (auto const& elem : prev_progress_info.partition_to_offset()) {
+        auto it = new_progress_info.partition_to_offset().find(elem.first);
+        if (it == new_progress_info.partition_to_offset().end()) {
+            new_progress_info.mutable_partition_to_offset()->insert(elem);
+        }
+    }
+
+    std::string new_statistic_val;
+    RoutineLoadJobStatisticPB* new_statistic_info = 
new_progress_info.mutable_stat();
+    if (prev_progress_info.has_stat()) {
+        const RoutineLoadJobStatisticPB& prev_statistic_info = 
prev_progress_info.stat();
+
+        
new_statistic_info->set_filtered_rows(prev_statistic_info.filtered_rows() + 
commit_attachment.filtered_rows());
+        new_statistic_info->set_loaded_rows(prev_statistic_info.loaded_rows() 
+ commit_attachment.loaded_rows());
+        
new_statistic_info->set_unselected_rows(prev_statistic_info.unselected_rows() + 
commit_attachment.unselected_rows());
+        
new_statistic_info->set_received_bytes(prev_statistic_info.received_bytes() + 
commit_attachment.received_bytes());
+        
new_statistic_info->set_task_execution_time_ms(prev_statistic_info.task_execution_time_ms()
 + commit_attachment.task_execution_time_ms());
+    } else {
+        
new_statistic_info->set_filtered_rows(commit_attachment.filtered_rows());
+        new_statistic_info->set_loaded_rows(commit_attachment.loaded_rows());
+        
new_statistic_info->set_unselected_rows(commit_attachment.unselected_rows());
+        
new_statistic_info->set_received_bytes(commit_attachment.received_bytes());
+        
new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms());
+    }
+
+    LOG(INFO) << "routine load new progress: " << 
new_progress_info.ShortDebugString();
+
+    if (!new_progress_info.SerializeToString(&new_progress_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize new progress val, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->put(rl_progress_key, new_progress_val);
+}
+
+void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* 
controller,
+                                                const 
GetRLTaskCommitAttachRequest* request,
+                                                GetRLTaskCommitAttachResponse* 
response,
+                                                ::google::protobuf::Closure* 
done) {
+    RPC_PREPROCESS(get_rl_task_commit_attach);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(get_rl_task_commit_attach)
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string rl_progress_key;
+    std::string rl_progress_val;
+    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
+    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
+    err = txn->get(rl_progress_key, &rl_progress_val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        code = MetaServiceCode::ROUTINE_LOAD_PROGRESS_NOT_FOUND;
+        ss << "pregress info not found, db_id=" << db_id
+           << " job_id=" << job_id << " err=" << err;
+        msg = ss.str();
+        return;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get pregress info, db_id=" << db_id
+           << " job_id=" << job_id << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    RLTaskTxnCommitAttachmentPB* commit_attach = 
response->mutable_commit_attach();
+    RoutineLoadProgressPB* progress_info = commit_attach->mutable_progress();
+    if (!progress_info->ParseFromString(rl_progress_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse progress info, db_id=" << db_id << " job_id=" 
<< job_id;
+        msg = ss.str();
+        return;
+    }
+
+    if (progress_info->has_stat()) {
+        const RoutineLoadJobStatisticPB& statistic_info = 
progress_info->stat();
+        commit_attach->set_filtered_rows(statistic_info.filtered_rows());
+        commit_attach->set_loaded_rows(statistic_info.loaded_rows());
+        commit_attach->set_unselected_rows(statistic_info.unselected_rows());
+        commit_attach->set_received_bytes(statistic_info.received_bytes());
+        
commit_attach->set_task_execution_time_ms(statistic_info.task_execution_time_ms());
+    }
+}
+
 /**
  * 0. Extract txn_id from request
  * 1. Get db id from TxnKv with txn_id
@@ -977,86 +1134,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
 
     if (txn_info.load_job_source_type() ==
         LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK) {
-        if (!request->has_commit_attachment()) {
-            ss << "failed to get commit attachment from req, db_id=" << db_id
-               << " txn_id=" << txn_id;
-            msg = ss.str();
-            return;
-        }
-
-        TxnCommitAttachmentPB txn_commit_attachment = 
request->commit_attachment();
-        RLTaskTxnCommitAttachmentPB commit_attachment =
-                txn_commit_attachment.rl_task_txn_commit_attachment();
-        int64_t job_id = commit_attachment.job_id();
-
-        std::string rl_progress_key;
-        std::string rl_progress_val;
-        bool prev_progress_existed = true;
-        RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
-        rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
-        TxnErrorCode err = txn->get(rl_progress_key, &rl_progress_val);
-        if (err != TxnErrorCode::TXN_OK) {
-            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                prev_progress_existed = false;
-            } else {
-                code = cast_as<ErrCategory::READ>(err);
-                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" 
<< txn_id
-                   << " err=" << err;
-                msg = ss.str();
-                return;
-            }
-        }
-
-        RoutineLoadProgressPB prev_progress_info;
-        if (prev_progress_existed) {
-            if (!prev_progress_info.ParseFromString(rl_progress_val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "failed to parse txn_info, db_id=" << db_id << " 
txn_id=" << txn_id;
-                msg = ss.str();
-                return;
-            }
-
-            int cal_row_num = 0;
-            for (auto const& elem : 
commit_attachment.progress().partition_to_offset()) {
-                if (elem.second >= 0) {
-                    auto it = 
prev_progress_info.partition_to_offset().find(elem.first);
-                    if (it != prev_progress_info.partition_to_offset().end() 
&& it->second >= 0) {
-                        cal_row_num += elem.second - it->second;
-                    } else {
-                        cal_row_num += elem.second + 1;
-                    }
-                }
-            }
-
-            LOG(INFO) << " calculated row num " << cal_row_num << " actual row 
num "
-                      << commit_attachment.loaded_rows() << " prev progress "
-                      << prev_progress_info.DebugString();
-
-            if (cal_row_num == 0) {
-                LOG(WARNING) << " repeated to load task in routine load, 
db_id=" << db_id
-                             << " txn_id=" << txn_id << " calculated row num " 
<< cal_row_num
-                             << " actual row num " << 
commit_attachment.loaded_rows();
-                return;
-            }
-        }
-
-        std::string new_progress_val;
-        RoutineLoadProgressPB new_progress_info;
-        new_progress_info.CopyFrom(commit_attachment.progress());
-        for (auto const& elem : prev_progress_info.partition_to_offset()) {
-            auto it = new_progress_info.partition_to_offset().find(elem.first);
-            if (it == new_progress_info.partition_to_offset().end()) {
-                new_progress_info.mutable_partition_to_offset()->insert(elem);
-            }
-        }
-
-        if (!new_progress_info.SerializeToString(&new_progress_val)) {
-            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
-            ss << "failed to serialize new progress val, txn_id=" << 
txn_info.txn_id();
-            msg = ss.str();
-            return;
-        }
-        txn->put(rl_progress_key, new_progress_val);
+        put_routine_load_progress(code, msg, instance_id, request, txn.get(), 
db_id);
     }
 
     LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << 
" txn_id=" << txn_id;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 49cb3c20590..19949ac73d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -314,4 +314,15 @@ public class MetaServiceClient {
         }
         return blockingStub.getInstance(request);
     }
+
+    public Cloud.GetRLTaskCommitAttachResponse
+            getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request) {
+        if (!request.hasCloudUniqueId()) {
+            Cloud.GetRLTaskCommitAttachRequest.Builder builder =
+                    Cloud.GetRLTaskCommitAttachRequest.newBuilder();
+            builder.mergeFrom(request);
+            return 
blockingStub.getRlTaskCommitAttach(builder.setCloudUniqueId(Config.cloud_unique_id).build());
+        }
+        return blockingStub.getRlTaskCommitAttach(request);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 9715d831e8f..680189d4d27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -442,4 +442,19 @@ public class MetaServiceProxy {
             throw new RpcException("", e.getMessage(), e);
         }
     }
+
+    public Cloud.GetRLTaskCommitAttachResponse
+            getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
+            throws RpcException {
+        if (metaServiceHostPort == null) {
+            throw new RpcException("", "cloud mode, please configure 
cloud_unique_id and meta_service_endpoint");
+        }
+        TNetworkAddress metaAddress = new 
TNetworkAddress(metaServiceHostPort.first, metaServiceHostPort.second);
+        try {
+            final MetaServiceClient client = getProxy(metaAddress);
+            return client.getRLTaskCommitAttach(request);
+        } catch (Exception e) {
+            throw new RpcException(metaAddress.hostname, e.getMessage(), e);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bdcfb9e4a27..1067a759e5f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -23,6 +23,8 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.InternalErrorCode;
@@ -40,6 +42,7 @@ import org.apache.doris.datasource.kafka.KafkaUtil;
 import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
@@ -69,6 +72,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
 
+
 /**
  * KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka.
  * The progress which is super class property is seems like "{"partition1": 
offset1, "partition2": offset2}"
@@ -247,6 +251,35 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
+    @Override
+    public void updateCloudProgress() throws UserException {
+        Cloud.GetRLTaskCommitAttachRequest.Builder builder =
+                Cloud.GetRLTaskCommitAttachRequest.newBuilder();
+        builder.setCloudUniqueId(Config.cloud_unique_id);
+        builder.setDbId(dbId);
+        builder.setJobId(id);
+
+        Cloud.GetRLTaskCommitAttachResponse response;
+        try {
+            response = 
MetaServiceProxy.getInstance().getRLTaskCommitAttach(builder.build());
+            if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+                LOG.warn("failed to get routine load commit attach, response: 
{}", response);
+                if (response.getStatus().getCode() == 
Cloud.MetaServiceCode.ROUTINE_LOAD_PROGRESS_NOT_FOUND) {
+                    LOG.warn("not found routine load progress, response: {}", 
response);
+                    return;
+                } else {
+                    throw new UserException(response.getStatus().getMsg());
+                }
+            }
+        } catch (RpcException e) {
+            LOG.info("failed to get routine load commit attach {}", e);
+            throw new UserException(e.getMessage());
+        }
+
+        RLTaskTxnCommitAttachment commitAttach = new 
RLTaskTxnCommitAttachment(response.getCommitAttach());
+        updateProgress(commitAttach);
+    }
+
     @Override
     public int calculateCurrentConcurrentTaskNum() {
         int partitionNum = currentKafkaPartitions.size();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 77663be058c..20c1b999d03 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -779,6 +779,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         }
     }
 
+    abstract void updateCloudProgress() throws UserException;
+
     abstract void divideRoutineLoadJob(int currentConcurrentTaskNum) throws 
UserException;
 
     public int calculateCurrentConcurrentTaskNum() throws 
MetaNotFoundException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 84f9548de13..51029c3d18b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -73,10 +74,15 @@ public class RoutineLoadScheduler extends MasterDaemon {
         if (!routineLoadJobList.isEmpty()) {
             LOG.info("there are {} job need schedule", 
routineLoadJobList.size());
         }
+
         for (RoutineLoadJob routineLoadJob : routineLoadJobList) {
             RoutineLoadJob.JobState errorJobState = null;
             UserException userException = null;
             try {
+                if (Config.isCloudMode()) {
+                    routineLoadJob.updateCloudProgress();
+                }
+
                 routineLoadJob.prepare();
                 // judge nums of tasks more than max concurrent tasks of 
cluster
                 int desiredConcurrentTaskNum = 
routineLoadJob.calculateCurrentConcurrentTaskNum();
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index cad9eac22ba..2db7dd1a5ef 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -258,6 +258,7 @@ message TxnCoordinatorPB {
 
 message RoutineLoadProgressPB {
     map<int32, int64> partition_to_offset = 1;
+    optional RoutineLoadJobStatisticPB stat = 2;
 }
 
 message RLTaskTxnCommitAttachmentPB {
@@ -272,6 +273,14 @@ message RLTaskTxnCommitAttachmentPB {
     optional string error_log_url = 9;
 }
 
+message RoutineLoadJobStatisticPB {
+    optional int64 filtered_rows = 1;
+    optional int64 loaded_rows = 2;
+    optional int64 unselected_rows = 3;
+    optional int64 received_bytes = 4;
+    optional int64 task_execution_time_ms = 5;
+}
+
 message TxnCommitAttachmentPB {
     enum Type {
         LODD_JOB_FINAL_OPERATION = 0;
@@ -1205,6 +1214,7 @@ enum MetaServiceCode {
     JOB_TABLET_BUSY = 5001;
     JOB_ALREADY_SUCCESS = 5002;
     ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
+    ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
 
     // Rate limit
     MAX_QPS_LIMIT = 6001;
@@ -1288,6 +1298,17 @@ message GetDeleteBitmapUpdateLockResponse {
     optional MetaServiceResponseStatus status = 1;
 }
 
+message GetRLTaskCommitAttachRequest {
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+}
+
+message GetRLTaskCommitAttachResponse {
+    optional MetaServiceResponseStatus status = 1;
+    optional RLTaskTxnCommitAttachmentPB commit_attach = 2;
+}
+
 service MetaService {
     rpc begin_txn(BeginTxnRequest) returns (BeginTxnResponse);
     rpc precommit_txn(PrecommitTxnRequest) returns (PrecommitTxnResponse);
@@ -1350,6 +1371,9 @@ service MetaService {
     rpc update_delete_bitmap(UpdateDeleteBitmapRequest) 
returns(UpdateDeleteBitmapResponse);
     rpc get_delete_bitmap(GetDeleteBitmapRequest) 
returns(GetDeleteBitmapResponse);
     rpc get_delete_bitmap_update_lock(GetDeleteBitmapUpdateLockRequest) 
returns(GetDeleteBitmapUpdateLockResponse);
+
+    // routine load progress
+    rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns 
(GetRLTaskCommitAttachResponse);
 };
 
 service RecyclerService {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to