This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eccec4a7d39 [fix](routine-load) fix get kafka offset timeout may be 
too long (#33502)
eccec4a7d39 is described below

commit eccec4a7d39b182c6d07aca1e133eb92f8d3d7b3
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Apr 15 19:35:26 2024 +0800

    [fix](routine-load) fix get kafka offset timeout may be too long (#33502)
---
 be/src/runtime/routine_load/data_consumer.cpp        | 20 ++++++++++++++------
 be/src/runtime/routine_load/data_consumer.h          |  4 ++--
 .../routine_load/routine_load_task_executor.cpp      | 10 ++++++----
 .../routine_load/routine_load_task_executor.h        |  6 ++++--
 be/src/service/internal_service.cpp                  |  7 +++++--
 .../org/apache/doris/datasource/kafka/KafkaUtil.java |  9 +++++----
 gensrc/proto/internal_service.proto                  |  1 +
 7 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index 11c15c494fa..ccf5fb4cb25 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -349,7 +349,7 @@ Status 
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
 // corresponding partition.
 // See librdkafka/rdkafkacpp.h##offsetsForTimes()
 Status KafkaDataConsumer::get_offsets_for_times(const 
std::vector<PIntegerPair>& times,
-                                                std::vector<PIntegerPair>* 
offsets) {
+                                                std::vector<PIntegerPair>* 
offsets, int timeout) {
     // create topic partition
     std::vector<RdKafka::TopicPartition*> topic_partitions;
     for (const auto& entry : times) {
@@ -364,8 +364,8 @@ Status KafkaDataConsumer::get_offsets_for_times(const 
std::vector<PIntegerPair>&
     }};
 
     // get offsets for times
-    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 
5000);
-    if (err != RdKafka::ERR_NO_ERROR) {
+    RdKafka::ErrorCode err = _k_consumer->offsetsForTimes(topic_partitions, 
timeout);
+    if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
         std::stringstream ss;
         ss << "failed to get offsets for times: " << RdKafka::err2str(err);
         LOG(WARNING) << ss.str();
@@ -384,13 +384,21 @@ Status KafkaDataConsumer::get_offsets_for_times(const 
std::vector<PIntegerPair>&
 
 // get latest offsets for given partitions
 Status KafkaDataConsumer::get_latest_offsets_for_partitions(
-        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* 
offsets) {
+        const std::vector<int32_t>& partition_ids, std::vector<PIntegerPair>* 
offsets,
+        int timeout) {
+    MonotonicStopWatch watch;
+    watch.start();
     for (int32_t partition_id : partition_ids) {
         int64_t low = 0;
         int64_t high = 0;
+        auto timeout_ms = timeout - static_cast<int>(watch.elapsed_time() / 
1000 / 1000);
+        if (UNLIKELY(timeout_ms <= 0)) {
+            return Status::InternalError("get kafka latest offsets for 
partitions timeout");
+        }
+
         RdKafka::ErrorCode err =
-                _k_consumer->query_watermark_offsets(_topic, partition_id, 
&low, &high, 5000);
-        if (err != RdKafka::ERR_NO_ERROR) {
+                _k_consumer->query_watermark_offsets(_topic, partition_id, 
&low, &high, timeout_ms);
+        if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) {
             std::stringstream ss;
             ss << "failed to get latest offset for partition: " << partition_id
                << ", err: " << RdKafka::err2str(err);
diff --git a/be/src/runtime/routine_load/data_consumer.h 
b/be/src/runtime/routine_load/data_consumer.h
index 0e1377681f4..f6c10467786 100644
--- a/be/src/runtime/routine_load/data_consumer.h
+++ b/be/src/runtime/routine_load/data_consumer.h
@@ -151,10 +151,10 @@ public:
     Status get_partition_meta(std::vector<int32_t>* partition_ids);
     // get offsets for times
     Status get_offsets_for_times(const std::vector<PIntegerPair>& times,
-                                 std::vector<PIntegerPair>* offsets);
+                                 std::vector<PIntegerPair>* offsets, int 
timeout);
     // get latest offsets for partitions
     Status get_latest_offsets_for_partitions(const std::vector<int32_t>& 
partition_ids,
-                                             std::vector<PIntegerPair>* 
offsets);
+                                             std::vector<PIntegerPair>* 
offsets, int timeout);
 
 private:
     std::string _brokers;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index dcf6ecbdb6a..3e8d1dddefc 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -136,7 +136,8 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
 }
 
 Status RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* 
partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* 
partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
@@ -148,7 +149,7 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 
     Status st = 
std::static_pointer_cast<KafkaDataConsumer>(consumer)->get_offsets_for_times(
             std::vector<PIntegerPair>(request.offset_times().begin(), 
request.offset_times().end()),
-            partition_offsets);
+            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
@@ -156,7 +157,8 @@ Status 
RoutineLoadTaskExecutor::get_kafka_partition_offsets_for_times(
 }
 
 Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
-        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* 
partition_offsets) {
+        const PKafkaMetaProxyRequest& request, std::vector<PIntegerPair>* 
partition_offsets,
+        int timeout) {
     CHECK(request.has_kafka_info());
 
     // This context is meaningless, just for unifing the interface
@@ -171,7 +173,7 @@ Status 
RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions(
                     ->get_latest_offsets_for_partitions(
                             
std::vector<int32_t>(request.partition_id_for_latest_offsets().begin(),
                                                  
request.partition_id_for_latest_offsets().end()),
-                            partition_offsets);
+                            partition_offsets, timeout);
     if (st.ok()) {
         _data_consumer_pool.return_consumer(consumer);
     }
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index b2a61612fe2..7ae29d60380 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -62,10 +62,12 @@ public:
                                     std::vector<int32_t>* partition_ids);
 
     Status get_kafka_partition_offsets_for_times(const PKafkaMetaProxyRequest& 
request,
-                                                 std::vector<PIntegerPair>* 
partition_offsets);
+                                                 std::vector<PIntegerPair>* 
partition_offsets,
+                                                 int timeout);
 
     Status get_kafka_latest_offsets_for_partitions(const 
PKafkaMetaProxyRequest& request,
-                                                   std::vector<PIntegerPair>* 
partition_offsets);
+                                                   std::vector<PIntegerPair>* 
partition_offsets,
+                                                   int timeout);
 
 private:
     // execute the task
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 9918980514c..aa9c207d689 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1158,6 +1158,7 @@ void 
PInternalService::get_info(google::protobuf::RpcController* controller,
         // Currently it supports 2 kinds of requests:
         // 1. get all kafka partition ids for given topic
         // 2. get all kafka partition offsets for given topic and timestamp.
+        int timeout_ms = request->has_timeout_secs() ? request->timeout_secs() 
* 1000 : 5 * 1000;
         if (request->has_kafka_meta_request()) {
             const PKafkaMetaProxyRequest& kafka_request = 
request->kafka_meta_request();
             if (!kafka_request.partition_id_for_latest_offsets().empty()) {
@@ -1165,7 +1166,8 @@ void 
PInternalService::get_info(google::protobuf::RpcController* controller,
                 std::vector<PIntegerPair> partition_offsets;
                 Status st = _exec_env->routine_load_task_executor()
                                     ->get_kafka_latest_offsets_for_partitions(
-                                            request->kafka_meta_request(), 
&partition_offsets);
+                                            request->kafka_meta_request(), 
&partition_offsets,
+                                            timeout_ms);
                 if (st.ok()) {
                     PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
                     for (const auto& entry : partition_offsets) {
@@ -1181,7 +1183,8 @@ void 
PInternalService::get_info(google::protobuf::RpcController* controller,
                 std::vector<PIntegerPair> partition_offsets;
                 Status st = _exec_env->routine_load_task_executor()
                                     ->get_kafka_partition_offsets_for_times(
-                                            request->kafka_meta_request(), 
&partition_offsets);
+                                            request->kafka_meta_request(), 
&partition_offsets,
+                                            timeout_ms);
                 if (st.ok()) {
                     PKafkaPartitionOffsets* part_offsets = 
response->mutable_partition_offsets();
                     for (const auto& entry : partition_offsets) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 12a32da5a9c..d2184229bb4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -42,6 +42,7 @@ import java.util.stream.Collectors;
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, 
String topic,
             Map<String, String> convertedCustomProperties) throws 
UserException {
@@ -128,11 +129,11 @@ public class KafkaUtil {
             }
 
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
 
             // get info
             Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, 
TimeUnit.SECONDS);
+            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get offsets for times: " + 
result.getStatus().getErrorMsgsList());
@@ -190,11 +191,11 @@ public class KafkaUtil {
                 metaRequestBuilder.addPartitionIdForLatestOffsets(partitionId);
             }
             InternalService.PProxyRequest request = 
InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
-                    metaRequestBuilder).build();
+                    
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
 
             // get info
             Future<InternalService.PProxyResult> future = 
BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(5, 
TimeUnit.SECONDS);
+            InternalService.PProxyResult result = 
future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
             TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
                 throw new UserException("failed to get latest offsets: " + 
result.getStatus().getErrorMsgsList());
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 59abc9adfb7..3d97c622843 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -412,6 +412,7 @@ message PKafkaMetaProxyRequest {
 
 message PProxyRequest {
     optional PKafkaMetaProxyRequest kafka_meta_request = 1;
+    optional int64 timeout_secs = 2;
 };
 
 message PKafkaMetaProxyResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to