This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2a51750abdb1d4dc2d1cc1b15b126e2be7adfea8
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jan 11 20:48:39 2024 +0800

    [fix](dynamic partition) fix dynamic partition storage medium not working (#29490)
---
 .../doris/catalog/DynamicPartitionProperty.java  |  11 +-
 .../doris/clone/DynamicPartitionScheduler.java   |  30 ++--
 .../test_storage_medium_has_disk.groovy          | 166 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
index 8f5e9d359f9..54c49c1ee8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
@@ -19,7 +19,6 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.DynamicPartitionUtil;
@@ -27,6 +26,8 @@ import org.apache.doris.common.util.DynamicPartitionUtil.StartOfDate;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.TimeUtils;
 
+import com.google.common.base.Strings;
+
 import java.util.Map;
 import java.util.TimeZone;
 
@@ -97,7 +98,7 @@ public class DynamicPartitionProperty {
             this.reservedHistoryPeriods = properties.getOrDefault(
                     RESERVED_HISTORY_PERIODS, NOT_SET_RESERVED_HISTORY_PERIODS);
             this.storagePolicy = properties.getOrDefault(STORAGE_POLICY, "");
-            this.storageMedium = properties.getOrDefault(STORAGE_MEDIUM, Config.default_storage_medium);
+            this.storageMedium = properties.getOrDefault(STORAGE_MEDIUM, "");
             createStartOfs(properties);
         } else {
             this.exist = false;
@@ -228,8 +229,10 @@
                 + ",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" + historyPartitionNum + "\""
                 + ",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum + "\""
                 + ",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" + reservedHistoryPeriods + "\""
-                + ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\""
-                + ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
+                + ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\"";
+        if (!Strings.isNullOrEmpty(storageMedium)) {
+            res += ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
+        }
         if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
             res += ",\n\"" + START_DAY_OF_WEEK + "\" = \"" + startOfWeek.dayOfWeek + "\"";
         } else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
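
The net effect of the DynamicPartitionProperty change above is that an unset
dynamic_partition.storage_medium now stays empty instead of silently picking up
Config.default_storage_medium, and the key is simply omitted from the serialized
properties string. A minimal standalone sketch of that rule (hypothetical class
and method names, plain JDK string checks standing in for Guava's
Strings.isNullOrEmpty; not the actual Doris source):

    // MediumSerializationSketch.java - illustrative only.
    public class MediumSerializationSketch {
        // Mirrors the patched serialization logic: append the medium key only
        // when a medium was explicitly configured, instead of always printing
        // a default value.
        static String propertiesString(String storagePolicy, String storageMedium) {
            String res = ",\n\"dynamic_partition.storage_policy\" = \"" + storagePolicy + "\"";
            if (storageMedium != null && !storageMedium.isEmpty()) {
                res += ",\n\"dynamic_partition.storage_medium\" = \"" + storageMedium + "\"";
            }
            return res;
        }

        public static void main(String[] args) {
            System.out.println(propertiesString("policy1", ""));    // medium key omitted
            System.out.println(propertiesString("policy1", "ssd")); // medium key printed
        }
    }

This is what lets the scheduler below tell "user explicitly chose a medium"
apart from "no preference".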
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index dc03ecf8233..565f4c066f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -52,6 +52,7 @@ import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TStorageMedium;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
@@ -351,19 +352,28 @@ public class DynamicPartitionScheduler extends MasterDaemon {
      */
    private void setStorageMediumProperty(HashMap<String, String> partitionProperties,
            DynamicPartitionProperty property, ZonedDateTime now, int hotPartitionNum, int offset) {
-        if ((hotPartitionNum <= 0 || offset + hotPartitionNum <= 0) && !property.getStorageMedium()
-                .equalsIgnoreCase("ssd")) {
-            return;
-        }
-        String cooldownTime;
-        if (property.getStorageMedium().equalsIgnoreCase("ssd")) {
-            cooldownTime = TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS);
+        // 1. no hot partition, then use dynamic_partition.storage_medium
+        // 2. has hot partition
+        //    1) dynamic_partition.storage_medium = 'ssd', then use ssd;
+        //    2) otherwise
+        //       a. cooldown partition, then use hdd
+        //       b. hot partition, then use ssd
+        if (hotPartitionNum <= 0 || property.getStorageMedium().equalsIgnoreCase("ssd")) {
+            if (!Strings.isNullOrEmpty(property.getStorageMedium())) {
+                partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, property.getStorageMedium());
+                partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
+                        TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
+            }
+        } else if (offset + hotPartitionNum <= 0) {
+            partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.HDD.name());
+            partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
+                    TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
         } else {
-            cooldownTime = DynamicPartitionUtil.getPartitionRangeString(
+            String cooldownTime = DynamicPartitionUtil.getPartitionRangeString(
                     property, now, offset + hotPartitionNum, DynamicPartitionUtil.DATETIME_FORMAT);
+            partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
+            partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
         }
-        partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
-        partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
     }
 
     private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
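
The rewritten setStorageMediumProperty encodes the three-way rule spelled out in
its comment block. A self-contained sketch of that decision logic (hypothetical
class, enum, and parameter names; DEFAULT stands for "no medium set, fall back to
the table/system default"; not the Doris API):

    public class MediumDecisionSketch {
        enum Medium { DEFAULT, HDD, SSD }

        // hotPartitionNum: dynamic_partition.hot_partition_num
        // offset: partition offset relative to today (negative = history)
        // configuredMedium: dynamic_partition.storage_medium ("" when unset)
        static Medium chooseMedium(int hotPartitionNum, int offset, String configuredMedium) {
            if (hotPartitionNum <= 0 || "ssd".equalsIgnoreCase(configuredMedium)) {
                // No hot partitions, or an explicit ssd: every partition gets
                // the configured medium; unset falls through to the default.
                if (configuredMedium == null || configuredMedium.isEmpty()) {
                    return Medium.DEFAULT;
                }
                return Medium.valueOf(configuredMedium.toUpperCase());
            }
            // Hot partitions configured: the cooled range gets HDD, the hot
            // range gets SSD (with a cooldown at the end of the hot window).
            return offset + hotPartitionNum <= 0 ? Medium.HDD : Medium.SSD;
        }

        public static void main(String[] args) {
            // hot_partition_num = 1, storage_medium = "hdd", start = -4, end = 3:
            // offsets -4..-1 resolve to HDD and 0..3 to SSD, the same
            // 4 HDD / 4 SSD split the regression test below asserts.
            for (int offset = -4; offset <= 3; offset++) {
                System.out.println(offset + " -> " + chooseMedium(1, offset, "hdd"));
            }
        }
    }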
diff --git a/regression-test/suites/storage_medium_p0/test_storage_medium_has_disk.groovy b/regression-test/suites/storage_medium_p0/test_storage_medium_has_disk.groovy
new file mode 100644
index 00000000000..9c90c89e57c
--- /dev/null
+++ b/regression-test/suites/storage_medium_p0/test_storage_medium_has_disk.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_storage_medium_has_disk') {
+    def checkPartitionMedium = { table, isHdd ->
+        def partitions = sql_return_maparray "SHOW PARTITIONS FROM ${table}"
+        assertTrue(partitions.size() > 0)
+        partitions.each { assertEquals(isHdd ? 'HDD' : 'SSD', it.StorageMedium) }
+    }
+
+    def checkTableStaticPartition = { isPartitionTable, isHdd ->
+        def table = "tbl_static_${isPartitionTable}_${isHdd}"
+        def sqlText = "CREATE TABLE ${table} (k INT)"
+        if (isPartitionTable) {
+            sqlText += " PARTITION BY RANGE(k) (PARTITION p1 VALUES LESS THAN ('100'), PARTITION p2 VALUES LESS THAN ('200'))"
+        }
+        sqlText += ' DISTRIBUTED BY HASH(k) BUCKETS 5'
+        if (isHdd) {
+            sqlText += " PROPERTIES ('storage_medium' = 'hdd')"
+        }
+        sql sqlText
+        checkPartitionMedium table, isHdd
+    }
+
+    def beReportTablet = { ->
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+        backendId_to_backendIP.each { beId, beIp ->
+            def port = backendId_to_backendHttpPort.get(beId) as int
+            be_report_tablet beIp, port
+        }
+        sleep 8000
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += ['default_storage_medium=SSD', 'dynamic_partition_check_interval_seconds=1']
+    options.beConfigs += ['report_disk_state_interval_seconds=1']
+    options.beDisks = ['HDD=1', 'SSD=1']
+    docker(options) {
+        // test static partitions
+        for (def isPartitionTable : [true, false]) {
+            for (def isHdd : [true, false]) {
+                checkTableStaticPartition isPartitionTable, isHdd
+            }
+        }
+
+        // test dynamic partition without hot partitions
+        def table = 'tbl_dynamic_1'
+        sql """CREATE TABLE ${table} (k DATE)
+            PARTITION BY RANGE(k)()
+            DISTRIBUTED BY HASH (k) BUCKETS 5
+            PROPERTIES
+            (
+                "storage_medium" = "hdd",
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.end" = "1",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "3",
+                "dynamic_partition.replication_num" = "1",
+                "dynamic_partition.create_history_partition" = "true",
+                "dynamic_partition.start" = "-3"
+            )
+            """
+        checkPartitionMedium table, false
+
+        table = 'tbl_dynamic_2'
+        sql """CREATE TABLE ${table} (k DATE)
+            PARTITION BY RANGE(k)()
+            DISTRIBUTED BY HASH (k) BUCKETS 5
+            PROPERTIES
+            (
+                "storage_medium" = "ssd",
+                "dynamic_partition.storage_medium" = "hdd",
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "3",
+                "dynamic_partition.replication_num" = "1",
+                "dynamic_partition.create_history_partition" = "true",
+                "dynamic_partition.start" = "-4"
+            )
+            """
+        checkPartitionMedium table, true
+
+        // test dynamic_partition with hot partitions,
+        // storage medium ssd will set all partitions' storage_medium to ssd
+        table = 'tbl_dynamic_hot_1'
+        sql """CREATE TABLE ${table} (k DATE)
+            PARTITION BY RANGE(k)()
+            DISTRIBUTED BY HASH (k) BUCKETS 5
+            PROPERTIES
+            (
+                "dynamic_partition.enable" = "true",
+                "dynamic_partition.time_unit" = "DAY",
+                "dynamic_partition.hot_partition_num" = "1",
+                "dynamic_partition.storage_medium" = "ssd",
+                "dynamic_partition.end" = "3",
+                "dynamic_partition.prefix" = "p",
+                "dynamic_partition.buckets" = "3",
+                "dynamic_partition.replication_num" = "1",
+                "dynamic_partition.create_history_partition" = "true",
+                "dynamic_partition.hot" = "true",
+                "dynamic_partition.start" = "-4"
+            )
+            """
+        beReportTablet()
+        checkPartitionMedium table, false
+
+        // test dynamic_partition with hot partitions
+        for (def i = 0; i < 2; i++) {
+            table = 'tbl_dynamic_hot_2'
+            sql "DROP TABLE IF EXISTS ${table} FORCE"
+            sql """CREATE TABLE ${table} (k DATE)
+                PARTITION BY RANGE(k)()
+                DISTRIBUTED BY HASH (k) BUCKETS 5
+                PROPERTIES
+                (
"dynamic_partition.storage_medium" = "hdd", + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.hot_partition_num" = "1", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "3", + "dynamic_partition.replication_num" = "1", + "dynamic_partition.create_history_partition"= "true", + "dynamic_partition.hot"= "true", + "dynamic_partition.start" = "-4" + ) + """ + + beReportTablet + def partitions = sql_return_maparray "SHOW PARTITIONS FROM ${table}" + if (i == 0 && (partitions.size() != 8 || partitions.get(3).StorageMedium != 'HDD' + || partitions.get(4).StorageMedium != 'SSD')) { + sleep 5000 + continue + } + + assertEquals(8, partitions.size()) + for (def j = 0; j < 8; j++) { + assertEquals(j < 4 ? 'HDD' : 'SSD', partitions.get(j).StorageMedium) + } + } + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org