kaijchen commented on code in PR #33325:
URL: https://github.com/apache/doris/pull/33325#discussion_r1580379559


##########
be/src/common/config.cpp:
##########
@@ -662,6 +662,12 @@ DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
 DEFINE_Int32(flush_thread_num_per_store, "6");
 // number of thread for flushing memtable per store, for high priority load task
 DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
+// number of threads = min(flush_thread_num_per_store * num_store,
+//                         max_flush_thread_num_per_cpu * num_cpu)
+DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
+// number of threads = min(high_priority_flush_thread_num_per_store * num_store,
+//                         max_high_priority_flush_thread_num_per_cpu * num_cpu)
+DEFINE_Int32(max_high_priority_flush_thread_num_per_cpu, "4");

Review Comment:
   ```suggestion
   ```



##########
be/src/common/config.h:
##########
@@ -715,6 +715,12 @@ DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
 DECLARE_Int32(flush_thread_num_per_store);
 // number of thread for flushing memtable per store, for high priority load task
 DECLARE_Int32(high_priority_flush_thread_num_per_store);
+// number of threads = min(flush_thread_num_per_store * num_store,
+//                         max_flush_thread_num_per_cpu * num_cpu)
+DECLARE_Int32(max_flush_thread_num_per_cpu);
+// number of threads = min(high_priority_flush_thread_num_per_store * num_store,
+//                         max_high_priority_flush_thread_num_per_cpu * num_cpu)
+DECLARE_Int32(max_high_priority_flush_thread_num_per_cpu);

Review Comment:
   ```suggestion
   ```



##########
be/src/olap/memtable_flush_executor.cpp:
##########
@@ -202,15 +202,19 @@ void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t
 
 void MemTableFlushExecutor::init(int num_disk) {
     num_disk = std::max(1, num_disk);
-    size_t min_threads = std::max(1, config::flush_thread_num_per_store);
-    size_t max_threads = num_disk * min_threads;
+    int num_cpus = std::thread::hardware_concurrency();
+    int min_threads = std::max(1, config::flush_thread_num_per_store);
+    int max_threads = num_cpus == 0 ? num_disk * min_threads
+                                    : std::min(num_disk * min_threads,
+                                               num_cpus * config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
                               .build(&_flush_pool));
 
     min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
-    max_threads = num_disk * min_threads;
+    max_threads = std::min(num_disk * min_threads,
+                           num_cpus * config::max_high_priority_flush_thread_num_per_cpu);

Review Comment:
   ```suggestion
       max_threads = num_cpus == 0 ? num_disk * min_threads
                                   : std::min(num_disk * min_threads,
                                              num_cpus * config::max_flush_thread_num_per_cpu);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to