This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new eccec4a7d39 [fix](routine-load) fix get kafka offset timeout may too long (#33502)

eccec4a7d39 is described below

commit eccec4a7d39b182c6d07aca1e133eb92f8d3d7b3
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Apr 15 19:35:26 2024 +0800

    [fix](routine-load) fix get kafka offset timeout may too long (#33502)
---
 be/src/runtime/routine_load/data_consumer.cpp        | 20 ++++++++++++++------
 be/src/runtime/routine_load/data_consumer.h          |  4 ++--
 .../routine_load/routine_load_task_executor.cpp      | 10 ++++++----
 .../routine_load/routine_load_task_executor.h        |  6 ++++--
 be/src/service/internal_service.cpp                  |  7 +++++--
 .../org/apache/doris/datasource/kafka/KafkaUtil.java |  9 +++++----
 gensrc/proto/internal_service.proto                  |  1 +
 7 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 11c15c494fa..ccf5fb4cb25 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -349,7 +349,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
 // corresponding partition.
 // See librdkafka/rdkafkacpp.h##offsetsForTimes()
 Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                                std::vector<PIntegerPair>* offsets) {
+                                                std::vector<PIntegerPair>* offsets, int timeout) {
     // create topic partition
     std::vector<RdKafka::TopicPartition*> topic_partitions;
     for (const auto& entry : times) {
@@ -364,8 +364,8 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
     }};
 
     // get offsets for times
-    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 5000);
-    if (err != RdKafka::ERR_NO_ERROR) {
+    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, timeout);
+    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
         std::stringstream ss;
         ss << "failed to get offsets for times: " << RdKafka::err2str(err);
         LOG(WARNING) << ss.str();
@@ -384,13 +384,21 @@ Status KafkaDataConsumer::get_offsets_for_times(const std::vector<PIntegerPair>&
 
 // get latest offsets for given partitions
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
-        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets) {
+        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* offsets,
+        int timeout) {
+    MonotonicStopWatch watch;
+    watch.start();
     for (int32_t partition_id : partition_ids) {
         int64_t low = 0;
         int64_t high = 0;
+        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 1000 / 1000);
+        if (UNLIKELY(timeout_ms <= 0)) {
+            return Status::InternalError("get kafka latest offsets for partitions timeout");
+        }
+
         RdKafka::ErrorCode err =
-                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, 5000);
-        if (err != RdKafka::ERR_NO_ERROR) {
+                _k_consumer->query_watermark_offsets(_topic, partition_id, &low, &high, timeout_ms);
+        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
             std::stringstream ss;
             ss << "failed to get latest offset for partition: " << partition_id
                << ", err: " << RdKafka::err2str(err);
diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h
index 0e1377681f4..f6c10467786 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -151,10 +151,10 @@ public:
     Status get_partition_meta(std::vector<int32_t>* partition_ids);
     // get offsets for times
     Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                 std::vector<PIntegerPair>* offsets);
+                                 std::vector<PIntegerPair>* offsets, int timeout);
     // get latest offsets for partitions
     Status get_latest_offsets_for_partitions(const std::vector<int32_t>& partition_ids,
-                                             std::vector<PIntegerPair>* offsets);
+                                             std::vector<PIntegerPair>* offsets, int timeout);
 
 private:
     std::string _brokers;
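The core of the fix is visible in get_latest_offsets_for_partitions() above: instead of giving every query_watermark_offsets() call a fresh hard-coded 5000 ms, the caller's total timeout is treated as a single budget that shrinks as partitions are processed, so the worst case is bounded by one timeout rather than by partitions x 5 s. A minimal standalone sketch of that pattern, using std::chrono in place of Doris's MonotonicStopWatch (all names here are illustrative, not from the patch):

    #include <chrono>
    #include <cstdio>

    // One deadline for the whole batch; each blocking call gets only what
    // is left of the budget, mirroring the loop in the patch above.
    class TimeoutBudget {
    public:
        explicit TimeoutBudget(int total_ms)
                : _deadline(std::chrono::steady_clock::now() +
                            std::chrono::milliseconds(total_ms)) {}

        // Remaining milliseconds; <= 0 means the budget is exhausted.
        int remaining_ms() const {
            return static_cast<int>(
                    std::chrono::duration_cast<std::chrono::milliseconds>(
                            _deadline - std::chrono::steady_clock::now())
                            .count());
        }

    private:
        std::chrono::steady_clock::time_point _deadline;
    };

    int main() {
        TimeoutBudget budget(5000); // one 5s budget for all partitions
        for (int partition_id = 0; partition_id < 3; ++partition_id) {
            int timeout_ms = budget.remaining_ms();
            if (timeout_ms <= 0) {
                // the patch returns Status::InternalError at this point
                std::printf("budget exhausted before partition %d\n", partition_id);
                return 1;
            }
            // stand-in for query_watermark_offsets(..., timeout_ms)
            std::printf("partition %d: query with timeout %d ms\n",
                        partition_id, timeout_ms);
        }
        return 0;
    }

Before this change, each per-partition query could block for up to 5 s on its own while the FE gave up after 5 s total, which is how the overall wait could grow far past the intended timeout.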
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index dcf6ecbdb6a..3e8d1dddefc 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -136,7 +136,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
 }
 
 Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
@@ -148,7 +149,7 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 
     Status st = std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
             std::vector<PIntegerPair>(request.offset_times().begin(), request.offset_times().end()),
-            partition_offsets);
+            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
@@ -156,7 +157,8 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 }
 
 Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
@@ -171,7 +173,7 @@ Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
                     ->get_latest_offsets_for_partitions(
                             std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(),
                                                  request.partition_id_for_latest_offsets().end()),
-                            partition_offsets);
+                            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index b2a61612fe2..7ae29d60380 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -62,10 +62,12 @@ public:
                                      std::vector<int32_t>* partition_ids);
 
     Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& request,
-                                                 std::vector<PIntegerPair>* partition_offsets);
+                                                 std::vector<PIntegerPair>* partition_offsets,
+                                                 int timeout);
 
     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
-                                                   std::vector<PIntegerPair>* partition_offsets);
+                                                   std::vector<PIntegerPair>* partition_offsets,
+                                                   int timeout);
 
 private:
     // execute the task
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 9918980514c..aa9c207d689 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1158,6 +1158,7 @@ void PInternalService::get_info(google::protobuf::RpcController* controller,
     // Currently it supports 2 kinds of requests:
     // 1. get all kafka partition ids for given topic
     // 2. get all kafka partition offsets for given topic and timestamp.
+    int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() * 1000 : 5 * 1000;
     if (request->has_kafka_meta_request()) {
         const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
         if (!kafka_request.partition_id_for_latest_offsets().empty()) {
@@ -1165,7 +1166,8 @@ void PInternalService::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_latest_offsets_for_partitions(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
@@ -1181,7 +1183,8 @@ void PInternalService::get_info(google::protobuf::RpcController* controller,
             std::vector<PIntegerPair> partition_offsets;
             Status st = _exec_env->routine_load_task_executor()
                                 ->get_kafka_partition_offsets_for_times(
-                                        request->kafka_meta_request(), &partition_offsets);
+                                        request->kafka_meta_request(), &partition_offsets,
+                                        timeout_ms);
             if (st.ok()) {
                 PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
                 for (const auto& entry : partition_offsets) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 12a32da5a9c..d2184229bb4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
@@ -128,11 +129,11 @@ public class KafkaUtil {
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
 
             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
@@ -190,11 +191,11 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
 
             // get info
             Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS);
+            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 59abc9adfb7..3d97c622843 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -412,6 +412,7 @@ message PKafkaMetaProxyRequest {
 
 message PProxyRequest {
     optional PKafkaMetaProxyRequest kafka_meta_request = 1;
+    optional int64 timeout_secs = 2;
 };
 
 message PKafkaMetaProxyResult {
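One detail worth noting: timeout_secs is an optional proto field, so the BE keeps working with FEs that predate this patch. The new line in internal_service.cpp falls back to the old 5 s default when the field is absent. A hedged sketch of that fallback, with a stand-in struct instead of the protobuf-generated PProxyRequest:

    #include <cstdint>
    #include <optional>

    // Stand-in for the generated PProxyRequest; illustrative only.
    struct ProxyRequest {
        std::optional<int64_t> timeout_secs; // optional int64 timeout_secs = 2;
    };

    // Mirrors the fallback added in PInternalService::get_info(): honor the
    // FE-provided timeout when present, otherwise keep the pre-patch 5s
    // default so requests from not-yet-upgraded FEs behave as before.
    int resolve_timeout_ms(const ProxyRequest& request) {
        return request.timeout_secs.has_value()
                       ? static_cast<int>(*request.timeout_secs) * 1000
                       : 5 * 1000;
    }

The FE passes the same constant (MAX_GET_OFFSET_TIMEOUT_SECOND) both into the request and into future.get(), so the client-side wait and the BE-side budget stay aligned.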
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org