This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit d07811258500093691f020d0c92966cdc26e4e27 Author: Drogon <jack.xsuper...@gmail.com> AuthorDate: Wed Jan 18 19:50:18 2023 +0800 (improvement)[bucket] Add auto bucket implement (#15250) --- fe/fe-core/src/main/cup/sql_parser.cup | 20 +- .../org/apache/doris/analysis/CreateTableStmt.java | 34 ++- .../apache/doris/analysis/DistributionDesc.java | 50 ++-- .../doris/analysis/HashDistributionDesc.java | 49 +--- .../doris/analysis/RandomDistributionDesc.java | 33 +-- .../org/apache/doris/catalog/DistributionInfo.java | 20 ++ .../main/java/org/apache/doris/catalog/Env.java | 6 + .../apache/doris/catalog/HashDistributionInfo.java | 26 +- .../java/org/apache/doris/catalog/OlapTable.java | 47 +++- .../doris/catalog/RandomDistributionInfo.java | 20 +- .../org/apache/doris/catalog/TableProperty.java | 8 + .../doris/clone/DynamicPartitionScheduler.java | 86 +++++- .../java/org/apache/doris/common/FeConstants.java | 4 + .../apache/doris/common/util/AutoBucketUtils.java | 98 +++++++ .../apache/doris/common/util/PropertyAnalyzer.java | 25 +- .../apache/doris/datasource/InternalCatalog.java | 36 ++- fe/fe-core/src/main/jflex/sql_scanner.flex | 1 + .../doris/common/util/AutoBucketUtilsTest.java | 294 +++++++++++++++++++++ .../org/apache/doris/utframe/UtFrameUtils.java | 83 +++++- .../suites/autobucket/test_autobucket.groovy | 41 +++ 20 files changed, 835 insertions(+), 146 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 83d105c734..12b4643552 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -45,6 +45,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.View; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.Version; import org.apache.doris.mysql.MysqlPassword; import org.apache.doris.load.loadv2.LoadTask; @@ -602,7 +603,8 @@ terminal String KW_WORK, KW_WRITE, KW_YEAR, - KW_MTMV; + KW_MTMV, + KW_AUTO; terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT; terminal BITAND, BITOR, BITXOR, BITNOT; @@ -2884,12 +2886,16 @@ opt_distribution ::= /* Hash distributed */ | KW_DISTRIBUTED KW_BY KW_HASH LPAREN ident_list:columns RPAREN opt_distribution_number:numDistribution {: - RESULT = new HashDistributionDesc(numDistribution, columns); + int bucketNum = (numDistribution == null ? -1 : numDistribution); + boolean is_auto_bucket = (numDistribution == null); + RESULT = new HashDistributionDesc(bucketNum, is_auto_bucket, columns); :} /* Random distributed */ | KW_DISTRIBUTED KW_BY KW_RANDOM opt_distribution_number:numDistribution {: - RESULT = new RandomDistributionDesc(numDistribution); + int bucketNum = (numDistribution == null ? -1 : numDistribution); + boolean is_auto_bucket = (numDistribution == null); + RESULT = new RandomDistributionDesc(bucketNum, is_auto_bucket); :} ; @@ -2907,13 +2913,17 @@ opt_rollup ::= opt_distribution_number ::= /* Empty */ {: - /* If distribution number is null, default distribution number is 10. */ - RESULT = 10; + /* If distribution number is null, default distribution number is FeConstants.default_bucket_num. */ + RESULT = FeConstants.default_bucket_num; :} | KW_BUCKETS INTEGER_LITERAL:numDistribution {: RESULT = numDistribution.intValue(); :} + | KW_BUCKETS KW_AUTO + {: + RESULT = null; + :} ; opt_keys ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 826aee4663..867cb6aa38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -31,6 +31,8 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.AutoBucketUtils; +import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; @@ -94,6 +96,32 @@ public class CreateTableStmt extends DdlStmt { engineNames.add("jdbc"); } + // if auto bucket auto bucket enable, rewrite distribution bucket num && + // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true" + private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc, + Map<String, String> properties) throws AnalysisException { + if (distributionDesc == null || !distributionDesc.isAutoBucket()) { + return properties; + } + + // auto bucket is enable + Map<String, String> newProperties = properties; + if (newProperties == null) { + newProperties = new HashMap<String, String>(); + } + newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true"); + + if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) { + distributionDesc.setBuckets(FeConstants.default_bucket_num); + } else { + long partitionSize = ParseUtil + .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); + distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize)); + } + + return newProperties; + } + public CreateTableStmt() { // for persist tableName = new TableName(); @@ -260,7 +288,11 @@ public class CreateTableStmt extends DdlStmt { } @Override - public void analyze(Analyzer analyzer) throws UserException { + public void analyze(Analyzer analyzer) throws UserException, AnalysisException { + if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase("olap")) { + this.properties = maybeRewriteByAutoBucket(distributionDesc, properties); + } + super.analyze(analyzer); tableName.analyze(analyzer); FeNameFormat.checkTableName(tableName.getTbl()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java index 3e755c750c..02005a3985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java @@ -22,57 +22,47 @@ import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; import org.apache.commons.lang.NotImplementedException; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.List; import java.util.Set; -public class DistributionDesc implements Writable { +public class DistributionDesc { protected DistributionInfoType type; + protected int numBucket; + protected boolean autoBucket; - public DistributionDesc() { + public DistributionDesc(int numBucket) { + this(numBucket, false); + } + public DistributionDesc(int numBucket, boolean autoBucket) { + this.numBucket = numBucket; + this.autoBucket = autoBucket; } - public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException { - throw new NotImplementedException(); + public int getBuckets() { + return numBucket; } - public String toSql() { - throw new NotImplementedException(); + public int setBuckets(int numBucket) { + return this.numBucket = numBucket; } - public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException { - throw new NotImplementedException(); + public boolean isAutoBucket() { + return autoBucket; } - public static DistributionDesc read(DataInput in) throws IOException { - DistributionInfoType type = DistributionInfoType.valueOf(Text.readString(in)); - if (type == DistributionInfoType.HASH) { - DistributionDesc desc = new HashDistributionDesc(); - desc.readFields(in); - return desc; - } else if (type == DistributionInfoType.RANDOM) { - DistributionDesc desc = new RandomDistributionDesc(); - desc.readFields(in); - return desc; - } else { - throw new IOException("Unknown distribution type: " + type); - } + public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException { + throw new NotImplementedException(); } - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, type.name()); + public String toSql() { + throw new NotImplementedException(); } - public void readFields(DataInput in) throws IOException { + public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException { throw new NotImplementedException(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java index 6d14fb3844..8b312a6ba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java @@ -25,29 +25,25 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.io.Text; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.List; import java.util.Set; public class HashDistributionDesc extends DistributionDesc { - private int numBucket; private List<String> distributionColumnNames; - public HashDistributionDesc() { + public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) { + super(numBucket); type = DistributionInfoType.HASH; - distributionColumnNames = Lists.newArrayList(); + this.distributionColumnNames = distributionColumnNames; } - public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) { + public HashDistributionDesc(int numBucket, boolean autoBucket, List<String> distributionColumnNames) { + super(numBucket, autoBucket); type = DistributionInfoType.HASH; - this.numBucket = numBucket; this.distributionColumnNames = distributionColumnNames; } @@ -55,14 +51,10 @@ public class HashDistributionDesc extends DistributionDesc { return distributionColumnNames; } - public int getBuckets() { - return numBucket; - } - @Override public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException { if (numBucket <= 0) { - throw new AnalysisException("Number of hash distribution should be larger than zero."); + throw new AnalysisException("Number of hash distribution should be greater than zero."); } if (distributionColumnNames == null || distributionColumnNames.size() == 0) { throw new AnalysisException("Number of hash column should be larger than zero."); @@ -100,7 +92,11 @@ public class HashDistributionDesc extends DistributionDesc { i++; } stringBuilder.append(")\n"); - stringBuilder.append("BUCKETS ").append(numBucket); + if (autoBucket) { + stringBuilder.append("BUCKETS AUTO"); + } else { + stringBuilder.append("BUCKETS ").append(numBucket); + } return stringBuilder.toString(); } @@ -139,27 +135,8 @@ public class HashDistributionDesc extends DistributionDesc { } } - HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(numBucket, distributionColumns); + HashDistributionInfo hashDistributionInfo = + new HashDistributionInfo(numBucket, autoBucket, distributionColumns); return hashDistributionInfo; } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - out.writeInt(numBucket); - int count = distributionColumnNames.size(); - out.writeInt(count); - for (String colName : distributionColumnNames) { - Text.writeString(out, colName); - } - } - - public void readFields(DataInput in) throws IOException { - numBucket = in.readInt(); - int count = in.readInt(); - for (int i = 0; i < count; i++) { - distributionColumnNames.add(Text.readString(in)); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java index e445aa5bdf..3b89dfaff7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java @@ -23,28 +23,24 @@ import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.common.AnalysisException; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import java.util.List; import java.util.Set; public class RandomDistributionDesc extends DistributionDesc { - int numBucket; - - public RandomDistributionDesc() { + public RandomDistributionDesc(int numBucket) { + super(numBucket); type = DistributionInfoType.RANDOM; } - public RandomDistributionDesc(int numBucket) { + public RandomDistributionDesc(int numBucket, boolean autoBucket) { + super(numBucket, autoBucket); type = DistributionInfoType.RANDOM; - this.numBucket = numBucket; } @Override public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException { if (numBucket <= 0) { - throw new AnalysisException("Number of random distribution should be larger than zero."); + throw new AnalysisException("Number of random distribution should be greater than zero."); } } @@ -52,23 +48,18 @@ public class RandomDistributionDesc extends DistributionDesc { public String toSql() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("DISTRIBUTED BY RANDOM\n") - .append("BUCKETS ").append(numBucket); + .append("BUCKETS "); + if (autoBucket) { + stringBuilder.append("AUTO"); + } else { + stringBuilder.append(numBucket); + } return stringBuilder.toString(); } @Override public DistributionInfo toDistributionInfo(List<Column> columns) { - RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket); + RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket, autoBucket); return randomDistributionInfo; } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeInt(numBucket); - } - - public void readFields(DataInput in) throws IOException { - numBucket = in.readInt(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java index e7f66c42f1..227900905e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java @@ -40,12 +40,28 @@ public abstract class DistributionInfo implements Writable { @SerializedName(value = "type") protected DistributionInfoType type; + @SerializedName(value = "bucketNum") + protected int bucketNum; + + @SerializedName(value = "autoBucket") + protected boolean autoBucket; + public DistributionInfo() { // for persist } public DistributionInfo(DistributionInfoType type) { + this(type, 0, false); + } + + public DistributionInfo(DistributionInfoType type, int bucketNum) { + this(type, bucketNum, false); + } + + public DistributionInfo(DistributionInfoType type, int bucketNum, boolean autoBucket) { this.type = type; + this.bucketNum = bucketNum; + this.autoBucket = autoBucket; } public DistributionInfoType getType() { @@ -62,6 +78,10 @@ public abstract class DistributionInfo implements Writable { throw new NotImplementedException("not implemented"); } + public void markAutoBucket() { + autoBucket = true; + } + public DistributionDesc toDistributionDesc() { throw new NotImplementedException(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 2cfd39f70a..42368f40da 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2981,6 +2981,12 @@ public class Env { sb.append(olapTable.getCompressionType()).append("\""); } + // estimate_partition_size + if (!olapTable.getEstimatePartitionSize().equals("")) { + sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \""); + sb.append(olapTable.getEstimatePartitionSize()).append("\""); + } + // unique key table with merge on write if (olapTable.getEnableUniqueKeyMergeOnWrite()) { sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE).append("\" = \""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index a651b8c0ab..d746bd355f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -37,8 +37,6 @@ import java.util.Objects; public class HashDistributionInfo extends DistributionInfo { @SerializedName(value = "distributionColumns") private List<Column> distributionColumns; - @SerializedName(value = "bucketNum") - private int bucketNum; public HashDistributionInfo() { super(); @@ -46,9 +44,13 @@ public class HashDistributionInfo extends DistributionInfo { } public HashDistributionInfo(int bucketNum, List<Column> distributionColumns) { - super(DistributionInfoType.HASH); + super(DistributionInfoType.HASH, bucketNum); + this.distributionColumns = distributionColumns; + } + + public HashDistributionInfo(int bucketNum, boolean autoBucket, List<Column> distributionColumns) { + super(DistributionInfoType.HASH, bucketNum, autoBucket); this.distributionColumns = distributionColumns; - this.bucketNum = bucketNum; } public List<Column> getDistributionColumns() { @@ -65,6 +67,7 @@ public class HashDistributionInfo extends DistributionInfo { this.bucketNum = bucketNum; } + @Override public void write(DataOutput out) throws IOException { super.write(out); int columnCount = distributionColumns.size(); @@ -75,6 +78,7 @@ public class HashDistributionInfo extends DistributionInfo { out.writeInt(bucketNum); } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); int columnCount = in.readInt(); @@ -117,7 +121,7 @@ public class HashDistributionInfo extends DistributionInfo { for (Column col : distributionColumns) { distriColNames.add(col.getName()); } - DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, distriColNames); + DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, autoBucket, distriColNames); return distributionDesc; } @@ -133,7 +137,11 @@ public class HashDistributionInfo extends DistributionInfo { String colList = Joiner.on(", ").join(colNames); builder.append(colList); - builder.append(") BUCKETS ").append(bucketNum); + if (autoBucket) { + builder.append(") BUCKETS AUTO"); + } else { + builder.append(") BUCKETS ").append(bucketNum); + } return builder.toString(); } @@ -148,7 +156,11 @@ public class HashDistributionInfo extends DistributionInfo { } builder.append("]; "); - builder.append("bucket num: ").append(bucketNum).append("; "); + if (autoBucket) { + builder.append("bucket num: auto;"); + } else { + builder.append("bucket num: ").append(bucketNum).append(";"); + } return builder.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 87ba47bb27..c83acb1af4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -591,8 +591,8 @@ public class OlapTable extends Table { if (full) { return indexIdToMeta.get(indexId).getSchema(); } else { - return indexIdToMeta.get(indexId).getSchema().stream().filter(column -> - column.isVisible()).collect(Collectors.toList()); + return indexIdToMeta.get(indexId).getSchema().stream().filter(column -> column.isVisible()) + .collect(Collectors.toList()); } } @@ -1124,7 +1124,6 @@ public class OlapTable extends Table { return false; } - @Override public void write(DataOutput out) throws IOException { super.write(out); @@ -1274,6 +1273,9 @@ public class OlapTable extends Table { if (in.readBoolean()) { tableProperty = TableProperty.read(in); } + if (isAutoBucket()) { + defaultDistributionInfo.markAutoBucket(); + } // temp partitions tempPartitions = TempPartitions.read(in); @@ -1617,6 +1619,36 @@ public class OlapTable extends Table { tableProperty.buildInMemory(); } + public Boolean isAutoBucket() { + if (tableProperty != null) { + return tableProperty.isAutoBucket(); + } + return false; + } + + public void setIsAutoBucket(boolean isAutoBucket) { + if (tableProperty == null) { + tableProperty = new TableProperty(new HashMap<>()); + } + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, + Boolean.valueOf(isAutoBucket).toString()); + } + + public void setEstimatePartitionSize(String estimatePartitionSize) { + if (tableProperty == null) { + tableProperty = new TableProperty(new HashMap<>()); + } + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE, + estimatePartitionSize); + } + + public String getEstimatePartitionSize() { + if (tableProperty != null) { + return tableProperty.getEstimatePartitionSize(); + } + return ""; + } + public boolean getEnableLightSchemaChange() { if (tableProperty != null) { return tableProperty.getUseSchemaLightChange(); @@ -1870,11 +1902,11 @@ public class OlapTable extends Table { return false; } List<Expr> partitionExps = aggregateInfo.getPartitionExprs() != null - ? aggregateInfo.getPartitionExprs() : groupingExps; + ? aggregateInfo.getPartitionExprs() + : groupingExps; DistributionInfo distribution = getDefaultDistributionInfo(); if (distribution instanceof HashDistributionInfo) { - List<Column> distributeColumns = - ((HashDistributionInfo) distribution).getDistributionColumns(); + List<Column> distributeColumns = ((HashDistributionInfo) distribution).getDistributionColumns(); PartitionInfo partitionInfo = getPartitionInfo(); if (partitionInfo instanceof RangePartitionInfo) { List<Column> rangeColumns = partitionInfo.getPartitionColumns(); @@ -1882,8 +1914,7 @@ public class OlapTable extends Table { return false; } } - List<SlotRef> partitionSlots = - partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList()); + List<SlotRef> partitionSlots = partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList()); if (partitionSlots.contains(null)) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java index 2e11b5cfd4..31371e7799 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java @@ -29,21 +29,21 @@ import java.util.Objects; * Random partition. */ public class RandomDistributionInfo extends DistributionInfo { - - private int bucketNum; - public RandomDistributionInfo() { super(); } public RandomDistributionInfo(int bucketNum) { - super(DistributionInfoType.RANDOM); - this.bucketNum = bucketNum; + super(DistributionInfoType.RANDOM, bucketNum); + } + + public RandomDistributionInfo(int bucketNum, boolean autoBucket) { + super(DistributionInfoType.RANDOM, bucketNum, autoBucket); } @Override public DistributionDesc toDistributionDesc() { - DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum); + DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum, autoBucket); return distributionDesc; } @@ -55,15 +55,21 @@ public class RandomDistributionInfo extends DistributionInfo { @Override public String toSql() { StringBuilder builder = new StringBuilder(); - builder.append("DISTRIBUTED BY RANDOM BUCKETS ").append(bucketNum); + if (autoBucket) { + builder.append("DISTRIBUTED BY RANDOM() BUCKETS AUTO"); + } else { + builder.append("DISTRIBUTED BY RANDOM() BUCKETS ").append(bucketNum); + } return builder.toString(); } + @Override public void write(DataOutput out) throws IOException { super.write(out); out.writeInt(bucketNum); } + @Override public void readFields(DataInput in) throws IOException { super.readFields(in); bucketNum = in.readInt(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 860d404cd0..04db48067e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -244,6 +244,14 @@ public class TableProperty implements Writable { return isInMemory; } + public boolean isAutoBucket() { + return Boolean.parseBoolean(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "false")); + } + + public String getEstimatePartitionSize() { + return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE, ""); + } + public TStorageFormat getStorageFormat() { // Force convert all V1 table to V2 table if (TStorageFormat.V1 == storageFormat) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 754126caaa..0640a82cca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.RangePartitionInfo; @@ -42,6 +43,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.AutoBucketUtils; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.PropertyAnalyzer; @@ -140,6 +142,75 @@ public class DynamicPartitionScheduler extends MasterDaemon { return defaultRuntimeInfo; } + // exponential moving average + private static long ema(ArrayList<Long> history, int period) { + double alpha = 2.0 / (period + 1); + double ema = history.get(0); + for (int i = 1; i < history.size(); i++) { + ema = alpha * history.get(i) + (1 - alpha) * ema; + } + return (long) ema; + } + + private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) { + if (historyPartitionsSize.size() < 2) { + return historyPartitionsSize.get(0); + } + + int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size(); + + boolean isAscending = true; + for (int i = 1; i < size; i++) { + if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) { + isAscending = false; + break; + } + } + + if (isAscending) { + ArrayList<Long> historyDeltaSize = Lists.newArrayList(); + for (int i = 1; i < size; i++) { + historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1)); + } + return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7); + } else { + return ema(historyPartitionsSize, 7); + } + } + + private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) { + if (!table.isAutoBucket()) { + return property.getBuckets(); + } + + List<Partition> partitions = Lists.newArrayList(); + RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo()); + List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet()); + idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint())); + for (Map.Entry<Long, PartitionItem> idToItem : idToItems) { + Partition partition = table.getPartition(idToItem.getKey()); + if (partition != null) { + partitions.add(partition); + } + } + + // auto bucket + if (partitions.size() == 0) { + return property.getBuckets(); + } + + ArrayList<Long> partitionSizeArray = Lists.newArrayList(); + for (Partition partition : partitions) { + if (partition.getVisibleVersion() >= 2) { + partitionSizeArray.add(partition.getDataSize()); + } + } + + // * 5 for uncompressed data + long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5; + return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize); + } + private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable, Column partitionColumn, String partitionFormat) { ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>(); @@ -231,21 +302,22 @@ public class DynamicPartitionScheduler extends MasterDaemon { String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(), - prevBorder, dynamicPartitionProperty.getTimeUnit()); + prevBorder, dynamicPartitionProperty.getTimeUnit()); SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName, partitionKeyDesc, partitionProperties); DistributionDesc distributionDesc = null; DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable); if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List<String> distColumnNames = new ArrayList<>(); for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) { distColumnNames.add(distributionColumn.getName()); } - distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames); + distributionDesc = new HashDistributionDesc(bucketsNum, distColumnNames); } else { - distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets()); + distributionDesc = new RandomDistributionDesc(bucketsNum); } // add partition according to partition desc and distribution desc addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false)); @@ -265,8 +337,8 @@ public class DynamicPartitionScheduler extends MasterDaemon { } private void setStoragePolicyProperty(HashMap<String, String> partitionProperties, - DynamicPartitionProperty property, ZonedDateTime now, int offset, - String storagePolicyName) { + DynamicPartitionProperty property, ZonedDateTime now, int offset, + String storagePolicyName) { partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicyName); String baseTime = DynamicPartitionUtil.getPartitionRangeString( property, now, offset, DynamicPartitionUtil.DATETIME_FORMAT); @@ -341,8 +413,8 @@ public class DynamicPartitionScheduler extends MasterDaemon { dynamicPartitionProperty, range.lowerEndpoint().toString(), partitionFormat); String upperBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString( dynamicPartitionProperty, range.upperEndpoint().toString(), partitionFormat); - Range<PartitionKey> reservedHistoryPartitionKeyRange - = getClosedRange(db, olapTable, partitionColumn, partitionFormat, + Range<PartitionKey> reservedHistoryPartitionKeyRange = getClosedRange(db, olapTable, + partitionColumn, partitionFormat, lowerBorderOfReservedHistory, upperBorderOfReservedHistory); reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange); } catch (IllegalArgumentException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index dfde23e0a0..1b05795220 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -22,6 +22,10 @@ import org.apache.doris.persist.meta.FeMetaFormat; public class FeConstants { // Database and table's default configurations, we will never change them public static short default_replication_num = 3; + + // The default value of bucket setting && auto bucket without estimate_partition_size + public static int default_bucket_num = 10; + /* * Those two fields is responsible for determining the default key columns in duplicate table. * If user does not specify key of duplicate table in create table stmt, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java new file mode 100644 index 0000000000..49ec8c6bf7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.DiskInfo.DiskState; +import org.apache.doris.catalog.Env; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.collect.ImmutableMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class AutoBucketUtils { + private static Logger logger = LogManager.getLogger(AutoBucketUtils.class); + + static final long SIZE_100MB = 100 * 1024 * 1024L; + static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L; + static final long SIZE_1TB = 1024 * SIZE_1GB; + + private static int getBENum() { + SystemInfoService infoService = Env.getCurrentSystemInfo(); + ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null); + + int activeBENum = 0; + for (Backend backend : backends.values()) { + if (backend.isAlive()) { + ++activeBENum; + } + } + return activeBENum; + } + + private static int getBucketsNumByBEDisks() { + SystemInfoService infoService = Env.getCurrentSystemInfo(); + ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null); + + int buckets = 0; + for (Backend backend : backends.values()) { + if (!backend.isLoadAvailable()) { + continue; + } + + ImmutableMap<String, DiskInfo> disks = backend.getDisks(); + for (DiskInfo diskInfo : disks.values()) { + if (diskInfo.getState() == DiskState.ONLINE && diskInfo.hasPathHash()) { + buckets += (diskInfo.getAvailableCapacityB() - 1) / (50 * SIZE_1GB) + 1; + } + } + } + return buckets; + } + + private static int convertParitionSizeToBucketsNum(long partitionSize) { + partitionSize /= 5; // for compression 5:1 + + // <= 100MB, 1 bucket + // <= 1GB, 2 buckets + // > 1GB, round to (size / 1G) + if (partitionSize <= SIZE_100MB) { + return 1; + } else if (partitionSize <= SIZE_1GB) { + return 2; + } else { + return (int) ((partitionSize - 1) / SIZE_1GB + 1); + } + } + + public static int getBucketsNum(long partitionSize) { + int bucketsNumByPartitionSize = convertParitionSizeToBucketsNum(partitionSize); + int bucketsNumByBE = getBucketsNumByBEDisks(); + int bucketsNum = Math.min(128, Math.min(bucketsNumByPartitionSize, bucketsNumByBE)); + int beNum = getBENum(); + logger.debug("AutoBucketsUtil: bucketsNumByPartitionSize {}, bucketsNumByBE {}, bucketsNum {}, beNum {}", + bucketsNumByPartitionSize, bucketsNumByBE, bucketsNum, beNum); + if (bucketsNum < bucketsNumByPartitionSize && bucketsNum < beNum) { + bucketsNum = beNum; + } + logger.debug("AutoBucketsUtil: final bucketsNum {}", bucketsNum); + return bucketsNum; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index ff59546543..c4fdf9b9d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -92,6 +92,10 @@ public class PropertyAnalyzer { public static final String PROPERTIES_INMEMORY = "in_memory"; + // _auto_bucket can only set in create table stmt rewrite bucket and can not be changed + public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket"; + public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = "estimate_partition_size"; + public static final String PROPERTIES_TABLET_TYPE = "tablet_type"; public static final String PROPERTIES_STRICT_RANGE = "strict_range"; @@ -131,7 +135,7 @@ public class PropertyAnalyzer { /** * check and replace members of DataProperty by properties. * - * @param properties key->value for members to change. + * @param properties key->value for members to change. * @param oldDataProperty old DataProperty * @return new DataProperty * @throws AnalysisException property has invalid key->value @@ -246,7 +250,8 @@ public class PropertyAnalyzer { throws AnalysisException { Short replicationNum = oldReplicationNum; String propKey = Strings.isNullOrEmpty(prefix) - ? PROPERTIES_REPLICATION_NUM : prefix + "." + PROPERTIES_REPLICATION_NUM; + ? PROPERTIES_REPLICATION_NUM + : prefix + "." + PROPERTIES_REPLICATION_NUM; if (properties != null && properties.containsKey(propKey)) { try { replicationNum = Short.valueOf(properties.get(propKey)); @@ -348,7 +353,7 @@ public class PropertyAnalyzer { } public static Set<String> analyzeBloomFilterColumns(Map<String, String> properties, List<Column> columns, - KeysType keysType) throws AnalysisException { + KeysType keysType) throws AnalysisException { Set<String> bfColumns = null; if (properties != null && properties.containsKey(PROPERTIES_BF_COLUMNS)) { bfColumns = Sets.newHashSet(); @@ -484,7 +489,7 @@ public class PropertyAnalyzer { } // analyzeCompressionType will parse the compression type from properties - public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException { + public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException { String compressionType = ""; if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) { compressionType = properties.get(PROPERTIES_COMPRESSION); @@ -546,6 +551,15 @@ public class PropertyAnalyzer { return defaultVal; } + public static String analyzeEstimatePartitionSize(Map<String, String> properties) { + String estimatePartitionSize = ""; + if (properties != null && properties.containsKey(PROPERTIES_ESTIMATE_PARTITION_SIZE)) { + estimatePartitionSize = properties.get(PROPERTIES_ESTIMATE_PARTITION_SIZE); + properties.remove(PROPERTIES_ESTIMATE_PARTITION_SIZE); + } + return estimatePartitionSize; + } + public static String analyzeStoragePolicy(Map<String, String> properties) throws AnalysisException { String storagePolicy = ""; if (properties != null && properties.containsKey(PROPERTIES_STORAGE_POLICY)) { @@ -761,7 +775,6 @@ public class PropertyAnalyzer { throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`"); } - /** * Check the type property of the catalog props. */ @@ -777,5 +790,3 @@ public class PropertyAnalyzer { } } } - - diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 415d306f31..0466ac4af0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -241,7 +241,6 @@ public class InternalCatalog implements CatalogIf<Database> { return INTERNAL_CATALOG_NAME; } - @Override public List<String> getDbNames() { return Lists.newArrayList(fullNameToDb.keySet()); @@ -734,12 +733,12 @@ public class InternalCatalog implements CatalogIf<Database> { if (Strings.isNullOrEmpty(newPartitionName)) { if (olapTable.getPartition(partitionName) != null) { throw new DdlException("partition[" + partitionName + "] " - + "already exist in table[" + tableName + "]"); + + "already exist in table[" + tableName + "]"); } } else { if (olapTable.getPartition(newPartitionName) != null) { throw new DdlException("partition[" + newPartitionName + "] " - + "already exist in table[" + tableName + "]"); + + "already exist in table[" + tableName + "]"); } } @@ -932,7 +931,7 @@ public class InternalCatalog implements CatalogIf<Database> { } public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay, - long recycleTime) { + long recycleTime) { if (table.getType() == TableType.ELASTICSEARCH) { esRepository.deRegisterTable(table.getId()); } else if (table.getType() == TableType.OLAP) { @@ -964,7 +963,7 @@ public class InternalCatalog implements CatalogIf<Database> { } public void replayDropTable(Database db, long tableId, boolean isForceDrop, - Long recycleTime) throws MetaNotFoundException { + Long recycleTime) throws MetaNotFoundException { Table table = db.getTableOrMetaException(tableId); db.writeLock(); table.writeLock(); @@ -1002,10 +1001,10 @@ public class InternalCatalog implements CatalogIf<Database> { schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId()); } - Replica replica = - new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(), - info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(), - info.getLastSuccessVersion()); + Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, + info.getDataSize(), + info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(), + info.getLastSuccessVersion()); tablet.addReplica(replica); } @@ -1368,8 +1367,8 @@ public class InternalCatalog implements CatalogIf<Database> { if (distributionInfo.getType() == DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns(); - List<Column> defaultDistriCols - = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns(); + List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo) + .getDistributionColumns(); if (!newDistriCols.equals(defaultDistriCols)) { throw new DdlException( "Cannot assign hash distribution with different distribution cols. " + "default is: " @@ -1629,7 +1628,7 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.dropTempPartition(info.getPartitionName(), true); } else { Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), - info.isForceDrop()); + info.isForceDrop()); if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) { Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime()); } @@ -1660,7 +1659,7 @@ public class InternalCatalog implements CatalogIf<Database> { DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc, Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes, boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType, - DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, + DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction) throws DdlException { // create base index first. Preconditions.checkArgument(baseIndexId != -1); @@ -1920,6 +1919,17 @@ public class InternalCatalog implements CatalogIf<Database> { olapTable.setReplicationAllocation(replicaAlloc); + // set auto bucket + boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, + false); + olapTable.setIsAutoBucket(isAutoBucket); + + // set estimate partition size + if (isAutoBucket) { + String estimatePartitionSize = PropertyAnalyzer.analyzeEstimatePartitionSize(properties); + olapTable.setEstimatePartitionSize(estimatePartitionSize); + } + // set in memory boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false); diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 1f50df0105..7cbc2c896b 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -465,6 +465,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE)); keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR)); keywordMap.put("mtmv", new Integer(SqlParserSymbols.KW_MTMV)); + keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO)); } // map from token id to token description diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java new file mode 100644 index 0000000000..fa61bd48a2 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ResultSetMetaData; +import org.apache.doris.qe.ShowResultSet; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.utframe.UtFrameUtils; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Mocked; +import org.hamcrest.MatcherAssert; +import org.hamcrest.core.StringContains; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +public class AutoBucketUtilsTest { + private static String databaseName = "AutoBucketUtilsTest"; + // use a unique dir so that it won't be conflict with other unit test which + // may also start a Mocked Frontend + private static String runningDirBase = "fe"; + private static String runningDir = runningDirBase + "/mocked/AutoBucketUtilsTest/" + UUID.randomUUID().toString() + + "/"; + private static List<Backend> backends = Lists.newArrayList(); + private static Random random = new Random(System.currentTimeMillis()); + private ConnectContext connectContext; + + // // create backends by be num, disk num, disk capacity + private static void createClusterWithBackends(int beNum, int diskNum, long diskCapacity) throws Exception { + UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum); + // must set disk info, or the tablet scheduler won't work + backends = Env.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER); + for (Backend be : backends) { + setDiskInfos(diskNum, diskCapacity, be); + } + } + + private static ImmutableMap<Long, Backend> createBackends(int beNum, int diskNum, long diskCapacity) + throws Exception { + // must set disk info, or the tablet scheduler won't work + Map<Long, Backend> backends = Maps.newHashMap(); + for (int i = 0; i < beNum; ++i) { + Backend be = new Backend(10000 + i, "127.0.0." + (i + 1), 9000 + i); + be.setAlive(true); + backends.put(be.getId(), be); + } + for (Backend be : backends.values()) { + setDiskInfos(diskNum, diskCapacity, be); + } + return ImmutableMap.copyOf(backends); + } + + private static void setDiskInfos(int diskNum, long diskCapacity, Backend be) { + Map<String, TDisk> backendDisks = Maps.newHashMap(); + for (int i = 0; i < diskNum; ++i) { + TDisk disk = new TDisk(); + disk.setRootPath("/home/doris/" + UUID.randomUUID().toString()); + disk.setDiskTotalCapacity(diskCapacity); + disk.setDataUsedCapacity(0); + disk.setUsed(true); + disk.setDiskAvailableCapacity(disk.disk_total_capacity - disk.data_used_capacity); + disk.setPathHash(random.nextLong()); + disk.setStorageMedium(TStorageMedium.HDD); + backendDisks.put(disk.getRootPath(), disk); + } + be.updateDisks(backendDisks); + } + + private void expectations(Env env, EditLog editLog, SystemInfoService systemInfoService, + ImmutableMap<Long, Backend> backends) { + new Expectations() { + { + Env.getCurrentSystemInfo(); + minTimes = 0; + result = systemInfoService; + + systemInfoService.getBackendsInCluster(null); + minTimes = 0; + result = backends; + + Env.getCurrentEnv(); + minTimes = 0; + result = env; + + env.getEditLog(); + minTimes = 0; + result = editLog; + + editLog.logBackendStateChange((Backend) any); + minTimes = 0; + } + }; + } + + @Before + public void setUp() throws Exception { + FeConstants.runningUnitTest = true; + FeConstants.tablet_checker_interval_ms = 1000; + FeConstants.default_scheduler_interval_millisecond = 100; + Config.tablet_repair_delay_factor_second = 1; + connectContext = UtFrameUtils.createDefaultCtx(); + } + + @After + public void tearDown() { + Env.getCurrentEnv().clear(); + UtFrameUtils.cleanDorisFeDir(runningDirBase); + } + + private static String genTableNameWithoutDatabase(String estimatePartitionSize) { + return "size_" + estimatePartitionSize; + } + + private static String genTableName(String estimatePartitionSize) { + return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize); + } + + private static String genTableNameByTag(String estimatePartitionSize, String tag) { + return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize) + "_" + tag; + } + + private static String genCreateTableSql(String estimatePartitionSize) { + return "CREATE TABLE IF NOT EXISTS " + genTableName(estimatePartitionSize) + "\n" + + "(\n" + + "`user_id` LARGEINT NOT NULL\n" + + ")\n" + + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n" + + "PROPERTIES (\n" + + "\"estimate_partition_size\" = \"" + estimatePartitionSize + "\",\n" + + "\"replication_num\" = \"1\"\n" + + ")"; + } + + private void createTable(String sql) throws Exception { + // create database first + UtFrameUtils.createDatabase(connectContext, databaseName); + UtFrameUtils.createTable(connectContext, sql); + } + + private void createTableBySize(String estimatePartitionSize) throws Exception { + createTable(genCreateTableSql(estimatePartitionSize)); + } + + private int getPartitionBucketNum(String tableName) throws Exception { + ShowResultSet result = UtFrameUtils.showPartitionsByName(connectContext, tableName); + ResultSetMetaData metaData = result.getMetaData(); + + for (int i = 0; i < metaData.getColumnCount(); ++i) { + if (metaData.getColumn(i).getName().equalsIgnoreCase("buckets")) { + return Integer.valueOf(result.getResultRows().get(0).get(i)); + } + } + + throw new Exception("No buckets column in show partitions result"); + } + + // also has checked create table && show partitions + @Test + public void testWithoutEstimatePartitionSize() throws Exception { + String tableName = genTableName(""); + String sql = "CREATE TABLE IF NOT EXISTS " + tableName + "\n" + + "(\n" + + "`user_id` LARGEINT NOT NULL\n" + + ")\n" + + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")"; + + createClusterWithBackends(1, 1, 2000000000); + + createTable(sql); + ShowResultSet showCreateTableResult = UtFrameUtils.showCreateTableByName(connectContext, tableName); + String showCreateTableResultSql = showCreateTableResult.getResultRows().get(0).get(1); + MatcherAssert.assertThat(showCreateTableResultSql, + StringContains.containsString("DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n")); + int bucketNum = getPartitionBucketNum(tableName); + Assert.assertEquals(FeConstants.default_bucket_num, bucketNum); + } + + @Test + public void test100MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = AutoBucketUtils.SIZE_100MB; + ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test500MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = 5 * AutoBucketUtils.SIZE_100MB; + ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test1G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = AutoBucketUtils.SIZE_1GB; + ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(2, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test100G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = 100 * AutoBucketUtils.SIZE_1GB; + ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(20, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test500G_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB; + ImmutableMap<Long, Backend> backends = createBackends(3, 1, AutoBucketUtils.SIZE_1TB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(63, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test500G_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB; + ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test500G_2(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB; + ImmutableMap<Long, Backend> backends = createBackends(1, 1, 100 * AutoBucketUtils.SIZE_1TB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test1T_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = AutoBucketUtils.SIZE_1TB; + ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(128, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } + + @Test + public void test1T_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService) + throws Exception { + long estimatePartitionSize = AutoBucketUtils.SIZE_1TB; + ImmutableMap<Long, Backend> backends = createBackends(200, 7, 4 * AutoBucketUtils.SIZE_1TB); + expectations(env, editLog, systemInfoService, backends); + Assert.assertEquals(200, AutoBucketUtils.getBucketsNum(estimatePartitionSize)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index adc5a86a39..0161152904 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -18,14 +18,20 @@ package org.apache.doris.utframe; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.ShowCreateTableStmt; +import org.apache.doris.analysis.ShowPartitionsStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -35,6 +41,8 @@ import org.apache.doris.planner.Planner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.ShowExecutor; +import org.apache.doris.qe.ShowResultSet; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -62,6 +70,7 @@ import java.net.ServerSocket; import java.net.SocketException; import java.nio.channels.SocketChannel; import java.nio.file.Files; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -265,7 +274,7 @@ public class UtFrameUtils { // start be MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -305,7 +314,7 @@ public class UtFrameUtils { datagramSocket.setReuseAddress(true); break; } catch (SocketException e) { - System.out.println("The port " + port + " is invalid and try another port."); + System.out.println("The port " + port + " is invalid and try another port."); } } catch (IOException e) { throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on"); @@ -355,8 +364,8 @@ public class UtFrameUtils { } public static String getStmtDigest(ConnectContext connectContext, String originStmt) throws Exception { - SqlScanner input = - new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode()); + SqlScanner input = new SqlScanner(new StringReader(originStmt), + connectContext.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); StatementBase statementBase = SqlParserUtils.getFirstStmt(parser); Preconditions.checkState(statementBase instanceof QueryStmt); @@ -370,4 +379,70 @@ public class UtFrameUtils { String realVNodeName = idx + ":V" + nodeName; return planResult.contains(realNodeName) || planResult.contains(realVNodeName); } + + public static void createDatabase(ConnectContext ctx, String db) throws Exception { + String createDbStmtStr = "CREATE DATABASE " + db; + CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt(createDbStmtStr, ctx); + Env.getCurrentEnv().createDb(createDbStmt); + } + + public static void createTable(ConnectContext ctx, String sql) throws Exception { + try { + createTables(ctx, sql); + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + throw e; + } + } + + public static void createTables(ConnectContext ctx, String... sqls) throws Exception { + for (String sql : sqls) { + CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql, ctx); + Env.getCurrentEnv().createTable(stmt); + } + updateReplicaPathHash(); + } + + public static ShowResultSet showCreateTable(ConnectContext ctx, String sql) throws Exception { + ShowCreateTableStmt stmt = (ShowCreateTableStmt) parseAndAnalyzeStmt(sql, ctx); + ShowExecutor executor = new ShowExecutor(ctx, stmt); + return executor.execute(); + } + + public static ShowResultSet showCreateTableByName(ConnectContext ctx, String table) throws Exception { + String sql = "show create table " + table; + return showCreateTable(ctx, sql); + } + + public static ShowResultSet showPartitions(ConnectContext ctx, String sql) throws Exception { + ShowPartitionsStmt stmt = (ShowPartitionsStmt) parseAndAnalyzeStmt(sql, ctx); + ShowExecutor executor = new ShowExecutor(ctx, stmt); + return executor.execute(); + } + + public static ShowResultSet showPartitionsByName(ConnectContext ctx, String table) throws Exception { + String sql = "show partitions from " + table; + return showPartitions(ctx, sql); + } + + private static void updateReplicaPathHash() { + com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex() + .getReplicaMetaTable(); + for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) { + long beId = cell.getColumnKey(); + Backend be = Env.getCurrentSystemInfo().getBackend(beId); + if (be == null) { + continue; + } + Replica replica = cell.getValue(); + TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey()); + ImmutableMap<String, DiskInfo> diskMap = be.getDisks(); + for (DiskInfo diskInfo : diskMap.values()) { + if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) { + replica.setPathHash(diskInfo.getPathHash()); + break; + } + } + } + } } diff --git a/regression-test/suites/autobucket/test_autobucket.groovy b/regression-test/suites/autobucket/test_autobucket.groovy new file mode 100644 index 0000000000..29945e0f9a --- /dev/null +++ b/regression-test/suites/autobucket/test_autobucket.groovy @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_autobucket") { + sql "drop table if exists autobucket_test" + result = sql """ + CREATE TABLE `autobucket_test` ( + `user_id` largeint(40) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ) + """ + + result = sql "show create table autobucket_test" + assertTrue(result.toString().containsIgnoreCase("BUCKETS AUTO")) + + result = sql "show partitions from autobucket_test" + logger.info("${result}") + // XXX: buckets at pos(8), next maybe impl by sql meta + assertEquals(Integer.valueOf(result.get(0).get(8)), 10) + + sql "drop table if exists autobucket_test" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org