github-actions[bot] commented on code in PR #33414:
URL: https://github.com/apache/doris/pull/33414#discussion_r1558957128


##########
be/src/cloud/cloud_cumulative_compaction_policy.h:
##########
@@ -43,17 +62,23 @@ class CloudSizeBasedCumulativeCompactionPolicy {
             int64_t compaction_min_size = config::compaction_min_size_mbytes * 
1024 * 1024,
             int64_t promotion_version_count = 
config::compaction_promotion_version_count);
 
-    ~CloudSizeBasedCumulativeCompactionPolicy() {}
+    ~CloudSizeBasedCumulativeCompactionPolicy() override = default;
 
     int64_t new_cumulative_point(CloudTablet* tablet, const RowsetSharedPtr& 
output_rowset,
-                                 Version& last_delete_version, int64_t 
last_cumulative_point);
+                                 Version& last_delete_version,
+                                 int64_t last_cumulative_point) override;
+
+    int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& 
input_rowsets) override {

Review Comment:
   warning: method 'new_compaction_level' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static int64_t new_compaction_level(const std::vector<RowsetSharedPtr>& input_rowsets) override {
   ```
   



##########
be/src/cloud/cloud_cumulative_compaction_policy.cpp:
##########
@@ -213,4 +213,152 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
                    : last_cumulative_point;
 }
 
+int32_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
+        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& 
candidate_rowsets,
+        const int64_t max_compaction_score, const int64_t min_compaction_score,
+        std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
+        size_t* compaction_score, bool allow_delete) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
+        return 0;
+    }
+
+    int64_t compaction_goal_size_mbytes =
+            tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
+
+    int transient_size = 0;
+    *compaction_score = 0;
+    input_rowsets->clear();
+    int64_t total_size = 0;
+
+    for (const auto& rowset : candidate_rowsets) {
+        // check whether this rowset is delete version
+        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
+            *last_delete_version = rowset->version();
+            if (!input_rowsets->empty()) {
+                // we meet a delete version, and there were other versions 
before.
+                // we should compact those version before handling them over 
to base compaction
+                break;
+            } else {
+                // we meet a delete version, and no other versions before, 
skip it and continue
+                input_rowsets->clear();
+                *compaction_score = 0;
+                transient_size = 0;
+                total_size = 0;
+                continue;
+            }
+        }
+
+        *compaction_score += rowset->rowset_meta()->get_compaction_score();
+        total_size += rowset->rowset_meta()->total_disk_size();
+
+        transient_size += 1;
+        input_rowsets->push_back(rowset);
+
+        // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
+        if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) {
+            if (input_rowsets->size() == 1 &&
+                
!input_rowsets->front()->rowset_meta()->is_segments_overlapping()) {
+                // Only 1 non-overlapping rowset, skip it
+                input_rowsets->clear();
+                *compaction_score = 0;
+                total_size = 0;
+                continue;
+            }
+            return transient_size;
+        }
+    }
+
+    // if there is delete version, do compaction directly
+    if (last_delete_version->first != -1) {
+        // if there is only one rowset and not overlapping,
+        // we do not need to do cumulative compaction
+        if (input_rowsets->size() == 1 &&
+            !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) 
{
+            input_rowsets->clear();
+            *compaction_score = 0;
+        }
+        return transient_size;
+    }
+
+    // Condition 2: the number of input files reaches the threshold specified 
by parameter compaction_file_count_threshold
+    if (*compaction_score >= 
tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
+        return transient_size;
+    }
+
+    // Condition 3: level1 achieve compaction_goal_size
+    std::vector<RowsetSharedPtr> level1_rowsets;
+    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+        int64_t continuous_size = 0;
+        for (const auto& rowset : candidate_rowsets) {
+            const auto& rs_meta = rowset->rowset_meta();
+            if (rs_meta->compaction_level() == 0) {
+                break;
+            }
+            level1_rowsets.push_back(rowset);
+            continuous_size += rs_meta->total_disk_size();
+            if (level1_rowsets.size() >= 2) {
+                if (continuous_size >= compaction_goal_size_mbytes * 1024 * 
1024) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
+        }
+    }
+
+    int64_t now = UnixMillis();
+    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
+    if (last_cumu != 0) {
+        int64_t cumu_interval = now - last_cumu;
+
+        // Condition 4: the time interval between compactions exceeds the 
value specified by parameter compaction_time_threshold_second
+        if (cumu_interval >
+            
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 
1000)) {
+            if 
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
+                if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
+                    input_rowsets->swap(level1_rowsets);
+                    return input_rowsets->size();
+                }
+            }
+            return transient_size;
+        }
+    }
+
+    input_rowsets->clear();
+    *compaction_score = 0;
+
+    return 0;
+}
+
+int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(

Review Comment:
   warning: method 'new_compaction_level' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_compaction_level(
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to