This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new a837282ca72 [fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303) a837282ca72 is described below commit a837282ca729bc928fb94d8f300acf1c54e467b8 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Fri Mar 15 22:32:10 2024 +0800 [fix](routineload) fix consume data too slow in partial partitions (#32126) (#32303) --- be/src/runtime/routine_load/data_consumer_group.h | 8 +++++--- be/src/runtime/routine_load/data_consumer_pool.cpp | 5 +++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index e15ad7115f6..e4f45cff15d 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -45,8 +45,10 @@ class DataConsumerGroup { public: typedef std::function<void(const Status&)> ConsumeFinishCallback; - DataConsumerGroup() - : _grp_id(UniqueId::gen_uid()), _thread_pool(3, 10, "data_consumer"), _counter(0) {} + DataConsumerGroup(size_t consumer_num) + : _grp_id(UniqueId::gen_uid()), + _thread_pool(consumer_num, consumer_num, "data_consumer"), + _counter(0) {} virtual ~DataConsumerGroup() { _consumers.clear(); } @@ -82,7 +84,7 @@ protected: // for kafka class KafkaDataConsumerGroup : public DataConsumerGroup { public: - KafkaDataConsumerGroup() : DataConsumerGroup(), _queue(500) {} + KafkaDataConsumerGroup(size_t consumer_num) : DataConsumerGroup(consumer_num), _queue(500) {} virtual ~KafkaDataConsumerGroup(); diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index 48d16e9c219..a361008c651 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -87,12 +87,13 @@ Status 
DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx return Status::InternalError("PAUSE: The size of begin_offset of task should not be 0."); } - std::shared_ptr<KafkaDataConsumerGroup> grp = std::make_shared<KafkaDataConsumerGroup>(); - // one data consumer group contains at least one data consumer. int max_consumer_num = config::max_consumer_num_per_group; size_t consumer_num = std::min((size_t)max_consumer_num, ctx->kafka_info->begin_offset.size()); + std::shared_ptr<KafkaDataConsumerGroup> grp = + std::make_shared<KafkaDataConsumerGroup>(consumer_num); + for (int i = 0; i < consumer_num; ++i) { std::shared_ptr<DataConsumer> consumer; RETURN_IF_ERROR(get_consumer(ctx, &consumer)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org