liaoxin01 commented on code in PR #38474: URL: https://github.com/apache/doris/pull/38474#discussion_r1703472480
########## cloud/src/meta-service/meta_service_txn.cpp: ########## @@ -644,6 +644,58 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle } } +void MetaServiceImpl::reset_progress(::google::protobuf::RpcController* controller, + const ResetProgressRequest* request, + ResetProgressResponse* response, + ::google::protobuf::Closure* done) { + RPC_PREPROCESS(reset_progress); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + RPC_RATE_LIMIT(reset_progress) + + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "filed to create txn, err=" << err; + msg = ss.str(); + return; + } + + if (!request->has_db_id() || !request->has_job_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty db_id or job_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + + int64_t db_id = request->db_id(); + int64_t job_id = request->job_id(); + std::string rl_progress_key; + std::string rl_progress_val; + RLJobProgressKeyInfo rl_progress_key_info {instance_id, db_id, job_id}; + rl_job_progress_key_info(rl_progress_key_info, &rl_progress_key); + txn->remove(rl_progress_key); + err = txn->commit(); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { Review Comment: Can we return success when the key is not found?
########## fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java: ########## @@ -713,22 +713,35 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties, customKafkaProperties = dataSourceProperties.getCustomKafkaProperties(); } - // modify partition offset first - if (!kafkaPartitionOffsets.isEmpty()) { - // we can only modify the partition that is being consumed - ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); - } - + // convertCustomProperties and check partitions before reset progress to make modify operation atomic if (!customKafkaProperties.isEmpty()) { this.customProperties.putAll(customKafkaProperties); convertCustomProperties(true); } - // modify broker list and topic - if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) { - this.brokerList = dataSourceProperties.getBrokerList(); + + if (!kafkaPartitionOffsets.isEmpty()) { + ((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets); } + + // It is necessary to reset the Kafka progress cache if topic change, + // and should reset cache before modifying partition offset. if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) { + if (Config.isCloudMode()) { + resetCloudProgress(); + } this.topic = dataSourceProperties.getTopic(); + this.progress = new KafkaProgress(); + } + + // modify partition offset + if (!kafkaPartitionOffsets.isEmpty()) { + // we can only modify the partition that is being consumed + ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); Review Comment: Do we need to modify the progress of meta service in cloud mode? 
########## gensrc/proto/cloud.proto: ########## @@ -1513,6 +1523,7 @@ service MetaService { // routine load progress rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse); + rpc reset_progress(ResetProgressRequest) returns (ResetProgressResponse); Review Comment: ```suggestion rpc reset_rl_progress(ResetRLProgressRequest) returns (ResetRLProgressResponse); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org