This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a837282ca72 [fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303)
a837282ca72 is described below

commit a837282ca729bc928fb94d8f300acf1c54e467b8
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Mar 15 22:32:10 2024 +0800

    [fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303)
---
 be/src/runtime/routine_load/data_consumer_group.h  | 8 +++++---
 be/src/runtime/routine_load/data_consumer_pool.cpp | 5 +++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e15ad7115f6..e4f45cff15d 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -45,8 +45,10 @@ class DataConsumerGroup {
 public:
     typedef std::function<void(const Status&)> ConsumeFinishCallback;
 
-    DataConsumerGroup()
-            : _grp_id(UniqueId::gen_uid()), _thread_pool(3, 10, "data_consumer"), _counter(0) {}
+    DataConsumerGroup(size_t consumer_num)
+            : _grp_id(UniqueId::gen_uid()),
+              _thread_pool(consumer_num, consumer_num, "data_consumer"),
+              _counter(0) {}
 
     virtual ~DataConsumerGroup() { _consumers.clear(); }
 
@@ -82,7 +84,7 @@ protected:
 // for kafka
 class KafkaDataConsumerGroup : public DataConsumerGroup {
 public:
-    KafkaDataConsumerGroup() : DataConsumerGroup(), _queue(500) {}
+    KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {}
 
     virtual ~KafkaDataConsumerGroup();
 
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 48d16e9c219..a361008c651 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -87,12 +87,13 @@ Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx
         return Status::InternalError("PAUSE: The size of begin_offset of task should not be 0.");
     }
 
-    std::shared_ptr<KafkaDataConsumerGroup> grp = std::make_shared<KafkaDataConsumerGroup>();
-
     // one data consumer group contains at least one data consumers.
     int max_consumer_num = config::max_consumer_num_per_group;
     size_t consumer_num = std::min((size_t)max_consumer_num, ctx->kafka_info->begin_offset.size());
 
+    std::shared_ptr<KafkaDataConsumerGroup> grp =
+            std::make_shared<KafkaDataConsumerGroup>(consumer_num);
+
     for (int i = 0; i < consumer_num; ++i) {
         std::shared_ptr<DataConsumer> consumer;
         RETURN_IF_ERROR(get_consumer(ctx, &consumer));

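For readers following along: the core of this fix is that the consumer group's thread pool is now sized to the actual number of consumers (one per assigned Kafka partition, capped by config::max_consumer_num_per_group) instead of the previous hard-coded (3, 10) bounds, so consumers no longer queue behind a too-small pool. Below is a minimal standalone sketch of that sizing logic; the ThreadPool and group types here are illustrative stand-ins, not the real Doris classes, and only assume the ThreadPool(min_threads, max_threads, name) shape visible in the diff above.

    #include <algorithm>
    #include <cstddef>
    #include <string>

    // Illustrative stand-in for the thread pool used above:
    // ThreadPool(min_threads, max_threads, name).
    struct ThreadPool {
        ThreadPool(size_t /*min_threads*/, size_t /*max_threads*/,
                   const std::string& /*name*/) {}
    };

    // One consumer per assigned partition, capped by max_consumer_num_per_group
    // (mirrors the std::min(...) computation in data_consumer_pool.cpp).
    size_t pick_consumer_num(size_t max_consumer_num_per_group, size_t partition_count) {
        return std::min(max_consumer_num_per_group, partition_count);
    }

    // The group now sizes its pool to exactly consumer_num threads, so every
    // consumer can poll its partitions concurrently.
    struct DataConsumerGroupSketch {
        explicit DataConsumerGroupSketch(size_t consumer_num)
                : _thread_pool(consumer_num, consumer_num, "data_consumer") {}
        ThreadPool _thread_pool;
    };
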

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
