liaoxin01 commented on code in PR #38474:
URL: https://github.com/apache/doris/pull/38474#discussion_r1703472480


##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -644,6 +644,58 @@ void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
     }
 }
 
+void MetaServiceImpl::reset_progress(::google::protobuf::RpcController* 
controller,
+                                     const ResetProgressRequest* request,
+                                     ResetProgressResponse* response,
+                                     ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(reset_progress);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(reset_progress)
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "filed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string rl_progress_key;
+    std::string rl_progress_val;
+    RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id};
+    rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key);
+    txn->remove(rl_progress_key);
+    err = txn->commit();
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {

Review Comment:
   Can we return success when the key is not found?



##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java:
##########
@@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> 
jobProperties,
                 customKafkaProperties = 
dataSourceProperties.getCustomKafkaProperties();
             }
 
-            // modify partition offset first
-            if (!kafkaPartitionOffsets.isEmpty()) {
-                // we can only modify the partition that is being consumed
-                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
-            }
-
+            // convertCustomProperties and check partitions before reset 
progress to make modify operation atomic
             if (!customKafkaProperties.isEmpty()) {
                 this.customProperties.putAll(customKafkaProperties);
                 convertCustomProperties(true);
             }
-            // modify broker list and topic
-            if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
-                this.brokerList = dataSourceProperties.getBrokerList();
+
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                ((KafkaProgress) 
progress).checkPartitions(kafkaPartitionOffsets);
             }
+
+            // It is necessary to reset the Kafka progress cache if topic 
change,
+            // and should reset cache before modifying partition offset.
             if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
+                if (Config.isCloudMode()) {
+                    resetCloudProgress();
+                }
                 this.topic = dataSourceProperties.getTopic();
+                this.progress = new KafkaProgress();
+            }
+
+            // modify partition offset
+            if (!kafkaPartitionOffsets.isEmpty()) {
+                // we can only modify the partition that is being consumed
+                ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);

Review Comment:
   Do we need to modify the progress of the meta service in cloud mode?



##########
gensrc/proto/cloud.proto:
##########
@@ -1513,6 +1523,7 @@ service MetaService {
 
     // routine load progress
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns 
(GetRLTaskCommitAttachResponse);
+    rpc reset_progress(ResetProgressRequest) returns (ResetProgressResponse);

Review Comment:
   ```suggestion
       rpc reset_rl_progress(ResetRLProgressRequest) returns 
(ResetRLProgressResponse);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to