This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c6c90ff63e12942314e37943252cb282689e597b
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri May 24 20:01:10 2024 +0800

    [chore](routine-load) make routine_load_consumer_pool_size can update using 
HTTP API (#35315)
---
 be/src/common/config.cpp                                   | 2 +-
 be/src/common/config.h                                     | 2 +-
 be/src/runtime/routine_load/data_consumer_pool.cpp         | 5 +++--
 be/src/runtime/routine_load/data_consumer_pool.h           | 4 +---
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 3 +--
 5 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 546d69a57b4..828dcd70811 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -815,7 +815,7 @@ DEFINE_String(kafka_debug, "disable");
 // The number of pool siz of routine load consumer.
 // If you meet the error describe in 
https://github.com/edenhill/librdkafka/issues/3608
 // Change this size to 0 to fix it temporarily.
-DEFINE_Int32(routine_load_consumer_pool_size, "10");
+DEFINE_mInt32(routine_load_consumer_pool_size, "1024");
 
 // Used in single-stream-multi-table load. When receive a batch of messages 
from kafka,
 // if the size of batch is more than this threshold, we will request plans for 
all related tables.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 54f8d3459dd..b57e4c5f639 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -871,7 +871,7 @@ DECLARE_mString(kafka_debug);
 // The number of pool siz of routine load consumer.
 // If you meet the error describe in 
https://github.com/edenhill/librdkafka/issues/3608
 // Change this size to 0 to fix it temporarily.
-DECLARE_Int32(routine_load_consumer_pool_size);
+DECLARE_mInt32(routine_load_consumer_pool_size);
 
 // Used in single-stream-multi-table load. When receive a batch of messages 
from kafka,
 // if the size of batch is more than this threshold, we will request plans for 
all related tables.
diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp 
b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 95af63e4de4..c0c5e43b96b 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -108,8 +108,9 @@ Status 
DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext> ctx
 void DataConsumerPool::return_consumer(std::shared_ptr<DataConsumer> consumer) 
{
     std::unique_lock<std::mutex> l(_lock);
 
-    if (_pool.size() == _max_pool_size) {
-        VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-" 
<< _max_pool_size
+    if (_pool.size() == config::routine_load_consumer_pool_size) {
+        VLOG_NOTICE << "data consumer pool is full: " << _pool.size() << "-"
+                    << config::routine_load_consumer_pool_size
                     << ", discard the returned consumer: " << consumer->id();
         return;
     }
diff --git a/be/src/runtime/routine_load/data_consumer_pool.h 
b/be/src/runtime/routine_load/data_consumer_pool.h
index 25dbc57fb7f..afac2277386 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.h
+++ b/be/src/runtime/routine_load/data_consumer_pool.h
@@ -38,8 +38,7 @@ class StreamLoadContext;
 // to be reused
 class DataConsumerPool {
 public:
-    DataConsumerPool(int64_t max_pool_size)
-            : _max_pool_size(max_pool_size), _stop_background_threads_latch(1) 
{}
+    DataConsumerPool() : _stop_background_threads_latch(1) {}
 
     ~DataConsumerPool() = default;
 
@@ -71,7 +70,6 @@ private:
 private:
     std::mutex _lock;
     std::list<std::shared_ptr<DataConsumer>> _pool;
-    int64_t _max_pool_size;
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _clean_idle_consumer_thread;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 42f5db72fdb..9399ccdf773 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -61,8 +61,7 @@ using namespace ErrorCode;
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, 
MetricUnit::NOUNIT);
 
-RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
-        : _exec_env(exec_env), 
_data_consumer_pool(config::routine_load_consumer_pool_size) {
+RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env) : 
_exec_env(exec_env) {
     REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
         // std::lock_guard<std::mutex> l(_lock);
         return _task_map.size();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to