github-actions[bot] commented on code in PR #31215: URL: https://github.com/apache/doris/pull/31215#discussion_r1507434763
########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -196,4 +246,454 @@ void CloudStorageEngine::_sync_tablets_thread_callback() { } } +void CloudStorageEngine::get_cumu_compaction( + int64_t tablet_id, std::vector<std::shared_ptr<CloudCumulativeCompaction>>& res) { + std::lock_guard lock(_compaction_mtx); + if (auto it = _submitted_cumu_compactions.find(tablet_id); + it != _submitted_cumu_compactions.end()) { + res = it->second; + } +} + +void CloudStorageEngine::_adjust_compaction_thread_num() { + int base_thread_num = get_base_thread_num(); + if (_base_compaction_thread_pool->max_threads() != base_thread_num) { + int old_max_threads = _base_compaction_thread_pool->max_threads(); + Status status = _base_compaction_thread_pool->set_max_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads + << " to " << base_thread_num; + } + } + if (_base_compaction_thread_pool->min_threads() != base_thread_num) { + int old_min_threads = _base_compaction_thread_pool->min_threads(); + Status status = _base_compaction_thread_pool->set_min_threads(base_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads + << " to " << base_thread_num; + } + } + + int cumu_thread_num = get_cumu_thread_num(); + if (_cumu_compaction_thread_pool->max_threads() != cumu_thread_num) { + int old_max_threads = _cumu_compaction_thread_pool->max_threads(); + Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads + << " to " << cumu_thread_num; + } + } + if (_cumu_compaction_thread_pool->min_threads() != cumu_thread_num) { + int old_min_threads = _cumu_compaction_thread_pool->min_threads(); + Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_thread_num); + if (status.ok()) { + VLOG_NOTICE << 
"update cumu compaction thread pool min_threads from " << old_min_threads + << " to " << cumu_thread_num; + } + } +} + +void CloudStorageEngine::_compaction_tasks_producer_callback() { + LOG(INFO) << "try to start compaction producer process!"; + + int round = 0; + CompactionType compaction_type; + + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. + int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + + int64_t interval = config::generate_compaction_tasks_interval_ms; + do { + if (!config::disable_auto_compaction) { + _adjust_compaction_thread_num(); + + bool check_score = false; + int64_t cur_time = UnixMillis(); + if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { + compaction_type = CompactionType::CUMULATIVE_COMPACTION; + round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } + } else { + compaction_type = CompactionType::BASE_COMPACTION; + round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } + } + std::unique_ptr<ThreadPool>& thread_pool = + (compaction_type == CompactionType::CUMULATIVE_COMPACTION) + ? _cumu_compaction_thread_pool + : _base_compaction_thread_pool; + VLOG_CRITICAL << "compaction thread pool. type: " + << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? 
"CUMU" + : "BASE") + << ", num_threads: " << thread_pool->num_threads() + << ", num_threads_pending_start: " + << thread_pool->num_threads_pending_start() + << ", num_active_threads: " << thread_pool->num_active_threads() + << ", max_threads: " << thread_pool->max_threads() + << ", min_threads: " << thread_pool->min_threads() + << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); + std::vector<CloudTabletSPtr> tablets_compaction = + _generate_cloud_compaction_tasks(compaction_type, check_score); + + /// Regardless of whether the tablet is submitted for compaction or not, + /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects + /// in the tablet, because these two objects store the tablet's own shared_ptr. + /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, + /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) + for (const auto& tablet : tablets_compaction) { + Status st = submit_compaction_task(tablet, compaction_type); + if (st.ok()) continue; + if ((!st.is<ErrorCode::BE_NO_SUITABLE_VERSION>() && + !st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) || + VLOG_DEBUG_IS_ON) { + LOG(WARNING) << "failed to submit compaction task for tablet: " + << tablet->tablet_id() << ", err: " << st; + } + } + interval = config::generate_compaction_tasks_interval_ms; + } else { + interval = config::check_auto_compaction_interval_seconds * 1000; + } + } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); +} + +std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( Review Comment: warning: function '_generate_cloud_compaction_tasks' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks( ^ ``` <details> <summary>Additional context</summary> 
**be/src/cloud/cloud_storage_engine.cpp:371:** 86 lines including whitespace and comments (threshold 80)
```cpp
std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
                                                 ^
```

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org