xiaokang commented on code in PR #22159:
URL: https://github.com/apache/doris/pull/22159#discussion_r1278267673


##########
gensrc/proto/olap_file.proto:
##########
@@ -236,6 +236,10 @@ message TabletSchemaPB {
     repeated string partial_update_input_columns = 21;
     optional bool enable_single_replica_compaction = 22 [default=false];
     optional bool skip_write_index_on_load = 23 [default=false];
+    optional string compaction_policy = 24;

Review Comment:
   Please specify a default value for this new field, like the `[default=...]` on the fields above.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java:
##########
@@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException {
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
             // do nothing, will be alter in 
SchemaChangeHandler.updateBinlogConfig
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+            if (!Strings.isNullOrEmpty(compactionPolicy)
+                    && !compactionPolicy.equals("time_series") && 
!compactionPolicy.equals("size_based")) {
+                throw new AnalysisException(
+                        "Table compaction policy only support for time_series 
or size_based");
+            }
+            this.needTableStable = false;
+            setCompactionPolicy(compactionPolicy);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+            long goalSizeMbytes;
+            String goalSizeMbytesStr = properties
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            try {
+                goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid 
time_series_compaction_goal_size_mbytes format: "
+                        + goalSizeMbytesStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
 {
+            long fileCountThreshold;
+            String fileCountThresholdStr = properties
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+            try {
+                fileCountThreshold = Long.parseLong(fileCountThresholdStr);
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid 
time_series_compaction_file_count_threshold format: "
+                                                                               
 + fileCountThresholdStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionFileCountThreshold(fileCountThreshold);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS))
 {
+            long timeThresholdSeconds;
+            try {
+                timeThresholdSeconds = Long.parseLong(properties

Review Comment:
   Please also check that the parsed value is > 0.



##########
be/src/olap/compaction.cpp:
##########
@@ -148,8 +148,10 @@ int64_t Compaction::get_avg_segment_rows() {
     // input_rowsets_size is total disk_size of input_rowset, this size is the
     // final size after codec and compress, so expect dest segment file size
     // in disk is config::vertical_compaction_max_segment_size
-    if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
-        return (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 
* 2) /
+    if (_tablet->tablet_meta()->tablet_schema()->compaction_policy() ==

Review Comment:
   use a variable for tablet->tablet_meta()->tablet_schema() to simplify code



##########
be/src/olap/cumulative_compaction_policy.cpp:
##########
@@ -368,11 +368,13 @@ int64_t 
SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
 }
 
 std::shared_ptr<CumulativeCompactionPolicy>
-CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {

Review Comment:
   Add a `policy` argument to this function.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java:
##########
@@ -86,6 +86,14 @@ public class TableProperty implements Writable {
 
     private boolean skipWriteIndexOnLoad = false;
 
+    private String compactionPolicy = "";

Review Comment:
   default value



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String 
dbName, TableIf table, Lis
                 sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
             }
 
+            // compaction policy
+            if (olapTable.getCompactionPolicy() != null && 
!olapTable.getCompactionPolicy().equals("")) {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\"
 = \"");
+                sb.append(olapTable.getCompactionPolicy()).append("\"");
+            }
+
+            // time series compaction goal size
+            if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null
+                                                && 
olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) {

Review Comment:
   Use a named constant instead of the literal `512`.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java:
##########
@@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException {
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
             // do nothing, will be alter in 
SchemaChangeHandler.updateBinlogConfig
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+            if (!Strings.isNullOrEmpty(compactionPolicy)
+                    && !compactionPolicy.equals("time_series") && 
!compactionPolicy.equals("size_based")) {

Review Comment:
   Use named constants instead of the string literals `"time_series"` and `"size_based"`.



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String 
dbName, TableIf table, Lis
                 sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
             }
 
+            // compaction policy
+            if (olapTable.getCompactionPolicy() != null && 
!olapTable.getCompactionPolicy().equals("")) {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\"
 = \"");
+                sb.append(olapTable.getCompactionPolicy()).append("\"");
+            }
+
+            // time series compaction goal size
+            if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null
+                                                && 
olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\"");
+            }
+
+            // time series compaction file count threshold
+            if (olapTable.getTimeSeriesCompactionFileCountThreshold() != null
+                                                && 
olapTable.getTimeSeriesCompactionFileCountThreshold() != 2000) {

Review Comment:
   Use a named constant instead of the literal `2000`.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java:
##########
@@ -595,6 +605,82 @@ public static Boolean 
analyzeSkipWriteIndexOnLoad(Map<String, String> properties
                 + " must be `true` or `false`");
     }
 
+    public static String analyzeCompactionPolicy(Map<String, String> 
properties) throws AnalysisException {
+        if (properties == null || properties.isEmpty()) {
+            return "";
+        }
+        String compactionPolicy = "";

Review Comment:
   default value



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String 
dbName, TableIf table, Lis
                 sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
             }
 
+            // compaction policy
+            if (olapTable.getCompactionPolicy() != null && 
!olapTable.getCompactionPolicy().equals("")) {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\"
 = \"");
+                sb.append(olapTable.getCompactionPolicy()).append("\"");
+            }
+
+            // time series compaction goal size
+            if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null
+                                                && 
olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\"");
+            }
+
+            // time series compaction file count threshold
+            if (olapTable.getTimeSeriesCompactionFileCountThreshold() != null
+                                                && 
olapTable.getTimeSeriesCompactionFileCountThreshold() != 2000) {
+                sb.append(",\n\"").append(PropertyAnalyzer
+                                    
.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \"");
+                
sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\"");
+            }
+
+            // time series compaction time threshold
+            if (olapTable.getTimeSeriesCompactionTimeThresholdSeconds() != null
+                                                && 
olapTable.getTimeSeriesCompactionTimeThresholdSeconds() != 3600) {

Review Comment:
   Use a named constant instead of the literal `3600`.



##########
fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java:
##########
@@ -2137,13 +2137,40 @@ public void updateTableProperties(Database db, String 
tableName, Map<String, Str
         }
         long storagePolicyId = storagePolicyNameToId(storagePolicy);
 
-        if (isInMemory < 0 && storagePolicyId < 0) {
-            LOG.info("Properties already up-to-date");
-            return;
+        // if (isInMemory < 0 && storagePolicyId < 0) {

Review Comment:
   Why was this code removed (it is now commented out)?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java:
##########
@@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException {
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
             // do nothing, will be alter in 
SchemaChangeHandler.updateBinlogConfig
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+            if (!Strings.isNullOrEmpty(compactionPolicy)
+                    && !compactionPolicy.equals("time_series") && 
!compactionPolicy.equals("size_based")) {
+                throw new AnalysisException(
+                        "Table compaction policy only support for time_series 
or size_based");
+            }
+            this.needTableStable = false;
+            setCompactionPolicy(compactionPolicy);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+            long goalSizeMbytes;
+            String goalSizeMbytesStr = properties
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            try {
+                goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);

Review Comment:
   Please also check that the parsed size is > 0.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java:
##########
@@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws 
AnalysisException {
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)
                 || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
             // do nothing, will be alter in 
SchemaChangeHandler.updateBinlogConfig
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) {
+            String compactionPolicy = 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, "");
+            if (!Strings.isNullOrEmpty(compactionPolicy)
+                    && !compactionPolicy.equals("time_series") && 
!compactionPolicy.equals("size_based")) {
+                throw new AnalysisException(
+                        "Table compaction policy only support for time_series 
or size_based");
+            }
+            this.needTableStable = false;
+            setCompactionPolicy(compactionPolicy);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES))
 {
+            long goalSizeMbytes;
+            String goalSizeMbytesStr = properties
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
+            try {
+                goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("Invalid 
time_series_compaction_goal_size_mbytes format: "
+                        + goalSizeMbytesStr);
+            }
+            this.needTableStable = false;
+            setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes);
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD))
 {
+            long fileCountThreshold;
+            String fileCountThresholdStr = properties
+                                        
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+            try {
+                fileCountThreshold = Long.parseLong(fileCountThresholdStr);

Review Comment:
   Please also check that the parsed value is > 0.



##########
be/src/olap/cumulative_compaction_time_series_policy.cpp:
##########
@@ -63,13 +63,16 @@ uint32_t 
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
         return 0;
     }
 
-    // Condition 1: the size of input files for compaction meets the 
requirement of parameter _compaction_goal_size
-    if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 
* 1024)) {
+    // Condition 1: the size of input files for compaction meets the 
requirement of parameter compaction_goal_size
+    if (total_size >=
+        
(tablet->tablet_meta()->tablet_schema()->time_series_compaction_goal_size_mbytes()
 * 1024 *

Review Comment:
   use a local variable to store tablet->tablet_meta()->tablet_schema() to 
simplify code



##########
be/src/olap/cumulative_compaction_policy.cpp:
##########
@@ -368,11 +368,13 @@ int64_t 
SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
 }
 
 std::shared_ptr<CumulativeCompactionPolicy>
-CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {

Review Comment:
   Just keep a single create_cumulative_compaction_policy() instead of two
   create functions.



##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -433,6 +433,63 @@ void 
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                 
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
                 need_to_save = true;
             }
+            if (tablet_meta_info.__isset.compaction_policy) {
+                
tablet->tablet_meta()->mutable_tablet_schema()->set_compaction_policy(
+                        tablet_meta_info.compaction_policy);
+                std::shared_lock rlock(tablet->get_header_lock());

Review Comment:
   Locking problems:
   1. Why is this not a write lock?
   2. Why is the lock not taken before mutable_tablet_schema()->set_compaction_policy?



##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -433,6 +433,63 @@ void 
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
                 
tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
                 need_to_save = true;
             }
+            if (tablet_meta_info.__isset.compaction_policy) {
+                
tablet->tablet_meta()->mutable_tablet_schema()->set_compaction_policy(
+                        tablet_meta_info.compaction_policy);
+                std::shared_lock rlock(tablet->get_header_lock());
+                for (auto& rowset_meta : 
tablet->tablet_meta()->all_mutable_rs_metas()) {
+                    rowset_meta->tablet_schema()->set_compaction_policy(
+                            tablet_meta_info.compaction_policy);
+                }
+                tablet->tablet_schema_unlocked()->set_compaction_policy(

Review Comment:
   What's the difference between this and mutable_tablet_schema()->set_compaction_policy?



##########
be/src/olap/olap_server.cpp:
##########
@@ -876,9 +877,12 @@ std::vector<TabletSharedPtr> 
StorageEngine::_generate_compaction_tasks(
 }
 
 void StorageEngine::_update_cumulative_compaction_policy() {
-    if (_cumulative_compaction_policy == nullptr) {
-        _cumulative_compaction_policy =
-                
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy();
+    if (_all_cumulative_compaction_policy.size() < 2) {

Review Comment:
   Checking `size() < 2` is tricky; check
   `_all_cumulative_compaction_policy.count(policyxx)` instead.
   
   BTW, `_cumulative_compaction_policies` would be a better name.



##########
be/src/olap/olap_server.cpp:
##########
@@ -989,7 +993,14 @@ Status 
StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT
                                              bool force) {
     _update_cumulative_compaction_policy();
     if (tablet->get_cumulative_compaction_policy() == nullptr) {
-        
tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy);

Review Comment:
   Why change this to more complex logic?



##########
be/src/olap/storage_engine.h:
##########
@@ -462,7 +462,9 @@ class StorageEngine {
 
     std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
 
-    std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
+    // we use unordered_map to store all cumulative compaction policy sharded 
ptr

Review Comment:
   why?



##########
be/src/http/action/compaction_action.cpp:
##########
@@ -198,7 +199,12 @@ Status 
CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
     timer.start();
 
     std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =

Review Comment:
   Encapsulate the if/else logic in the create_cumulative_compaction_policy()
   function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to