xiaokang commented on code in PR #22159: URL: https://github.com/apache/doris/pull/22159#discussion_r1278267673
########## gensrc/proto/olap_file.proto: ########## @@ -236,6 +236,10 @@ message TabletSchemaPB { repeated string partial_update_input_columns = 21; optional bool enable_single_replica_compaction = 22 [default=false]; optional bool skip_write_index_on_load = 23 [default=false]; + optional string compaction_policy = 24; Review Comment: default value ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java: ########## @@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws AnalysisException { || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) { // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, ""); + if (!Strings.isNullOrEmpty(compactionPolicy) + && !compactionPolicy.equals("time_series") && !compactionPolicy.equals("size_based")) { + throw new AnalysisException( + "Table compaction policy only support for time_series or size_based"); + } + this.needTableStable = false; + setCompactionPolicy(compactionPolicy); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + long goalSizeMbytes; + String goalSizeMbytesStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " + + goalSizeMbytesStr); + } + this.needTableStable = false; + setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + long fileCountThreshold; + String 
fileCountThresholdStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD); + try { + fileCountThreshold = Long.parseLong(fileCountThresholdStr); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: " + + fileCountThresholdStr); + } + this.needTableStable = false; + setTimeSeriesCompactionFileCountThreshold(fileCountThreshold); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) { + long timeThresholdSeconds; + try { + timeThresholdSeconds = Long.parseLong(properties Review Comment: check > 0 ########## be/src/olap/compaction.cpp: ########## @@ -148,8 +148,10 @@ int64_t Compaction::get_avg_segment_rows() { // input_rowsets_size is total disk_size of input_rowset, this size is the // final size after codec and compress, so expect dest segment file size // in disk is config::vertical_compaction_max_segment_size - if (config::compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { - return (config::time_series_compaction_goal_size_mbytes * 1024 * 1024 * 2) / + if (_tablet->tablet_meta()->tablet_schema()->compaction_policy() == Review Comment: use a variable for tablet->tablet_meta()->tablet_schema() to simplify code ########## be/src/olap/cumulative_compaction_policy.cpp: ########## @@ -368,11 +368,13 @@ int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { } std::shared_ptr<CumulativeCompactionPolicy> -CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() { Review Comment: add an argument policy ########## fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java: ########## @@ -86,6 +86,14 @@ public class TableProperty implements Writable { private boolean skipWriteIndexOnLoad = false; + private String compactionPolicy = ""; Review Comment: default value ########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ 
-3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis sb.append(olapTable.skipWriteIndexOnLoad()).append("\""); } + // compaction policy + if (olapTable.getCompactionPolicy() != null && !olapTable.getCompactionPolicy().equals("")) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\" = \""); + sb.append(olapTable.getCompactionPolicy()).append("\""); + } + + // time series compaction goal size + if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null + && olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) { Review Comment: use const variable instead of literal ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java: ########## @@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws AnalysisException { || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) { // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, ""); + if (!Strings.isNullOrEmpty(compactionPolicy) + && !compactionPolicy.equals("time_series") && !compactionPolicy.equals("size_based")) { Review Comment: use const variable instead of literal ########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ -3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis sb.append(olapTable.skipWriteIndexOnLoad()).append("\""); } + // compaction policy + if (olapTable.getCompactionPolicy() != null && !olapTable.getCompactionPolicy().equals("")) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\" = \""); + sb.append(olapTable.getCompactionPolicy()).append("\""); + } + + // time 
series compaction goal size + if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null + && olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\""); + } + + // time series compaction file count threshold + if (olapTable.getTimeSeriesCompactionFileCountThreshold() != null + && olapTable.getTimeSeriesCompactionFileCountThreshold() != 2000) { Review Comment: use const variable instead of literal ########## fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java: ########## @@ -595,6 +605,82 @@ public static Boolean analyzeSkipWriteIndexOnLoad(Map<String, String> properties + " must be `true` or `false`"); } + public static String analyzeCompactionPolicy(Map<String, String> properties) throws AnalysisException { + if (properties == null || properties.isEmpty()) { + return ""; + } + String compactionPolicy = ""; Review Comment: default value ########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ -3120,6 +3120,36 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis sb.append(olapTable.skipWriteIndexOnLoad()).append("\""); } + // compaction policy + if (olapTable.getCompactionPolicy() != null && !olapTable.getCompactionPolicy().equals("")) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\" = \""); + sb.append(olapTable.getCompactionPolicy()).append("\""); + } + + // time series compaction goal size + if (olapTable.getTimeSeriesCompactionGoalSizeMbytes() != null + && olapTable.getTimeSeriesCompactionGoalSizeMbytes() != 512) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\""); + } + + // time series compaction file 
count threshold + if (olapTable.getTimeSeriesCompactionFileCountThreshold() != null + && olapTable.getTimeSeriesCompactionFileCountThreshold() != 2000) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \""); + sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\""); + } + + // time series compaction time threshold + if (olapTable.getTimeSeriesCompactionTimeThresholdSeconds() != null + && olapTable.getTimeSeriesCompactionTimeThresholdSeconds() != 3600) { Review Comment: use const variable instead of literal ########## fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java: ########## @@ -2137,13 +2137,40 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str } long storagePolicyId = storagePolicyNameToId(storagePolicy); - if (isInMemory < 0 && storagePolicyId < 0) { - LOG.info("Properties already up-to-date"); - return; + // if (isInMemory < 0 && storagePolicyId < 0) { Review Comment: why delete the code ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java: ########## @@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws AnalysisException { || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) { // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, ""); + if (!Strings.isNullOrEmpty(compactionPolicy) + && !compactionPolicy.equals("time_series") && !compactionPolicy.equals("size_based")) { + throw new AnalysisException( + "Table compaction policy only support for time_series or size_based"); + } + this.needTableStable = false; + setCompactionPolicy(compactionPolicy); + } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + long goalSizeMbytes; + String goalSizeMbytesStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); Review Comment: check size > 0 ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java: ########## @@ -140,6 +180,49 @@ public void analyze(Analyzer analyzer) throws AnalysisException { || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) { // do nothing, will be alter in SchemaChangeHandler.updateBinlogConfig + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY)) { + String compactionPolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY, ""); + if (!Strings.isNullOrEmpty(compactionPolicy) + && !compactionPolicy.equals("time_series") && !compactionPolicy.equals("size_based")) { + throw new AnalysisException( + "Table compaction policy only support for time_series or size_based"); + } + this.needTableStable = false; + setCompactionPolicy(compactionPolicy); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) { + long goalSizeMbytes; + String goalSizeMbytesStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES); + try { + goalSizeMbytes = Long.parseLong(goalSizeMbytesStr); + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: " + + goalSizeMbytesStr); + } + this.needTableStable = false; + setTimeSeriesCompactionGoalSizeMbytes(goalSizeMbytes); + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) { + long fileCountThreshold; + String fileCountThresholdStr = properties + 
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD); + try { + fileCountThreshold = Long.parseLong(fileCountThresholdStr); Review Comment: check > 0 ########## be/src/olap/cumulative_compaction_time_series_policy.cpp: ########## @@ -63,13 +63,16 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( return 0; } - // Condition 1: the size of input files for compaction meets the requirement of parameter _compaction_goal_size - if (total_size >= (config::time_series_compaction_goal_size_mbytes * 1024 * 1024)) { + // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size + if (total_size >= + (tablet->tablet_meta()->tablet_schema()->time_series_compaction_goal_size_mbytes() * 1024 * Review Comment: use a local variable to store tablet->tablet_meta()->tablet_schema() to simplify code ########## be/src/olap/cumulative_compaction_policy.cpp: ########## @@ -368,11 +368,13 @@ int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { } std::shared_ptr<CumulativeCompactionPolicy> -CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() { Review Comment: just keep single create_cumulative_compaction_policy() instead of two create functions ########## be/src/agent/task_worker_pool.cpp: ########## @@ -433,6 +433,63 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); need_to_save = true; } + if (tablet_meta_info.__isset.compaction_policy) { + tablet->tablet_meta()->mutable_tablet_schema()->set_compaction_policy( + tablet_meta_info.compaction_policy); + std::shared_lock rlock(tablet->get_header_lock()); Review Comment: lock problem: 1. why not write lock 2. 
why not lock before mutable_tablet_schema()->set_compaction_policy ########## be/src/agent/task_worker_pool.cpp: ########## @@ -433,6 +433,63 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() { tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); need_to_save = true; } + if (tablet_meta_info.__isset.compaction_policy) { + tablet->tablet_meta()->mutable_tablet_schema()->set_compaction_policy( + tablet_meta_info.compaction_policy); + std::shared_lock rlock(tablet->get_header_lock()); + for (auto& rowset_meta : tablet->tablet_meta()->all_mutable_rs_metas()) { + rowset_meta->tablet_schema()->set_compaction_policy( + tablet_meta_info.compaction_policy); + } + tablet->tablet_schema_unlocked()->set_compaction_policy( Review Comment: what's difference to mutable_tablet_schema()->set_compaction_policy? ########## be/src/olap/olap_server.cpp: ########## @@ -876,9 +877,12 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( } void StorageEngine::_update_cumulative_compaction_policy() { - if (_cumulative_compaction_policy == nullptr) { - _cumulative_compaction_policy = - CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(); + if (_all_cumulative_compaction_policy.size() < 2) { Review Comment: check size 2 is tricky. check _all_cumulative_compaction_policy.count(policyxx). 
Also, `_cumulative_compaction_policies` would be a better name. ########## be/src/olap/olap_server.cpp: ########## @@ -989,7 +993,14 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT bool force) { _update_cumulative_compaction_policy(); if (tablet->get_cumulative_compaction_policy() == nullptr) { - tablet->set_cumulative_compaction_policy(_cumulative_compaction_policy); Review Comment: Why change this to more complex logic? ########## be/src/olap/storage_engine.h: ########## @@ -462,7 +462,9 @@ class StorageEngine { std::shared_ptr<StreamLoadRecorder> _stream_load_recorder; - std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy; + // we use unordered_map to store all cumulative compaction policy sharded ptr Review Comment: Why is a map of policies needed here? ########## be/src/http/action/compaction_action.cpp: ########## @@ -198,7 +199,12 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, timer.start(); std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = Review Comment: Encapsulate this if/else logic in the create_cumulative_compaction_policy() function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org