This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c6c90ff63e12942314e37943252cb282689e597b Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Fri May 24 20:01:10 2024 +0800 [chore](routine-load) make routine_load_consumer_pool_size can update using HTTP API (#35315) --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- be/src/runtime/routine_load/data_consumer_pool.cpp | 5 +++-- be/src/runtime/routine_load/data_consumer_pool.h | 4 +--- be/src/runtime/routine_load/routine_load_task_executor.cpp | 3 +-- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 546d69a57b4..828dcd70811 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -815,7 +815,7 @@ DEFINE_String(kafka_debug, "disable"); // The number of pool siz of routine load consumer. // If you meet the error describe in https://github.com/edenhill/librdkafka/issues/3608 // Change this size to 0 to fix it temporarily. -DEFINE_Int32(routine_load_consumer_pool_size, "10"); +DEFINE_mInt32(routine_load_consumer_pool_size, "1024"); // Used in single-stream-multi-table load. When receive a batch of messages from kafka, // if the size of batch is more than this threshold, we will request plans for all related tables. diff --git a/be/src/common/config.h b/be/src/common/config.h index 54f8d3459dd..b57e4c5f639 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -871,7 +871,7 @@ DECLARE_mString(kafka_debug); // The number of pool siz of routine load consumer. // If you meet the error describe in https://github.com/edenhill/librdkafka/issues/3608 // Change this size to 0 to fix it temporarily. -DECLARE_Int32(routine_load_consumer_pool_size); +DECLARE_mInt32(routine_load_consumer_pool_size); // Used in single-stream-multi-table load. When receive a batch of messages from kafka, // if the size of batch is more than this threshold, we will request plans for all related tables. diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp index 95af63e4de4..c0c5e43b96b 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.cpp +++ b/be/src/runtime/routine_load/data_consumer_pool.cpp @@ -108,8 +108,9 @@ Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) { std::unique_lock<std::mutex> l(_lock); - if (_pool.size() == _max_pool_size) { - VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-" << _max_pool_size + if (_pool.size() == config::routine_load_consumer_pool_size) { + VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-" + << config::routine_load_consumer_pool_size << ", discard the returned consumer: " << consumer->id(); return; } diff --git a/be/src/runtime/routine_load/data_consumer_pool.h b/be/src/runtime/routine_load/data_consumer_pool.h index 25dbc57fb7f..afac2277386 100644 --- a/be/src/runtime/routine_load/data_consumer_pool.h +++ b/be/src/runtime/routine_load/data_consumer_pool.h @@ -38,8 +38,7 @@ class StreamLoadContext; // to be reused class DataConsumerPool { public: - DataConsumerPool(int64_t max_pool_size) - : _max_pool_size(max_pool_size), _stop_background_threads_latch(1) {} + DataConsumerPool() : _stop_background_threads_latch(1) {} ~DataConsumerPool() = default; @@ -71,7 +70,6 @@ private: private: std::mutex _lock; std::list<std::shared_ptr<DataConsumer>> _pool; - int64_t _max_pool_size; CountDownLatch _stop_background_threads_latch; scoped_refptr<Thread> _clean_idle_consumer_thread; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 42f5db72fdb..9399ccdf773 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -61,8 +61,7 @@ using namespace ErrorCode; DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT); -RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) - : _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) { +RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) : _exec_env(exec_env) { REGISTER_HOOK_METRIC(routine_load_task_count, [this]() { // std::lock_guard<std::mutex> l(_lock); return _task_map.size(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org