This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new c4d0fba713 Add storage policy for remote storage migration (#9997) c4d0fba713 is described below commit c4d0fba713934c7f441d52bd4e95d80660fe829f Author: pengxiangyu <diablo...@163.com> AuthorDate: Wed Jun 15 11:00:06 2022 +0800 Add storage policy for remote storage migration (#9997) --- .../Create/CREATE-POLICY.md | 41 ++++- .../Create/CREATE-POLICY.md | 41 ++++- fe/fe-core/src/main/cup/sql_parser.cup | 13 ++ .../apache/doris/analysis/CreatePolicyStmt.java | 51 ++++-- .../org/apache/doris/analysis/DropPolicyStmt.java | 25 ++- .../org/apache/doris/analysis/ShowPolicyStmt.java | 32 ++-- .../java/org/apache/doris/catalog/Resource.java | 6 +- .../java/org/apache/doris/catalog/ResourceMgr.java | 3 +- .../doris/catalog/StoragePolicyResource.java | 136 -------------- .../java/org/apache/doris/persist/EditLog.java | 9 +- .../org/apache/doris/persist/gson/GsonUtils.java | 4 +- .../main/java/org/apache/doris/policy/Policy.java | 18 +- .../java/org/apache/doris/policy/PolicyMgr.java | 3 + .../java/org/apache/doris/policy/RowPolicy.java | 15 ++ .../org/apache/doris/policy/StoragePolicy.java | 196 +++++++++++++++++++++ 15 files changed, 397 insertions(+), 196 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md index 7202f2a30b..73e7b240c7 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md @@ -32,11 +32,13 @@ CREATE POLICY ### Description -Create security policies and explain to view the rewritten SQL. +Create policies,such as: +1. Create security policies(ROW POLICY) and explain to view the rewritten SQL. +2. 
Create storage migration policy(STORAGE POLICY), used for cold and hot data transform -#### 行安全策略 -grammar: +#### Grammar: +1. ROW POLICY ```sql CREATE ROW POLICY test_row_policy_1 ON test.table1 AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); @@ -49,6 +51,21 @@ illustrate: - It is connected with AND between RESTRICTIVE AND PERMISSIVE - It cannot be created for users root and admin +2. STORAGE POLICY +```sql +CREATE STORAGE POLICY test_storage_policy_1 +PROPERTIES ("key"="value", ...); +``` +illustrate: +- PROPERTIES has such keys: + 1. storage_resource:storage resource name for policy + 2. cooldown_datetime:cool down time for tablet, can't be set with cooldown_ttl. + 3. cooldown_ttl:hot data stay time. The time cost between the time of tablet created and + the time of migrated to cold data, formatted as: + 1d:1 day + 1h:1 hour + 50000: 50000 second + ### Example 1. Create a set of row security policies @@ -76,6 +93,24 @@ illustrate: select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd') ``` +2. Create policy for storage + 1. Create policy on cooldown_datetime + ```sql + CREATE STORAGE POLICY testPolicy + PROPERTIES( + "storage_resource" = "s3", + "cooldown_datetime" = "2022-06-08 00:00:00" + ); + ``` + 2. Create policy on cooldown_ttl + ```sql + CREATE STORAGE POLICY testPolicy + PROPERTIES( + "storage_resource" = "s3", + "cooldown_ttl" = "1d" + ); + ``` + ### Keywords CREATE, POLICY diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md index f17db5b5c5..b3b9c7f041 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md @@ -32,16 +32,18 @@ CREATE POLICY ### Description -创建安全策略,explain 可以查看改写后的 SQL。 +创建策略,包含以下几种: -#### 行安全策略 -语法: +1. 
创建安全策略(ROW POLICY),explain 可以查看改写后的 SQL。 +2. 创建数据迁移策略(STORAGE POLICY),用于冷热数据转换。 +#### 语法: + +1. ROW POLICY ```sql CREATE ROW POLICY test_row_policy_1 ON test.table1 AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); ``` - 参数说明: - filterType:RESTRICTIVE 将一组策略通过 AND 连接, PERMISSIVE 将一组策略通过 OR 连接 @@ -49,6 +51,20 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); - RESTRICTIVE 和 PERMISSIVE 之间通过 AND 连接的 - 不允许对 root 和 admin 用户创建 +2. STORAGE POLICY +```sql +CREATE STORAGE POLICY test_storage_policy_1 +PROPERTIES ("key"="value", ...); +``` +参数说明: +- PROPERTIES中需要指定资源的类型: + 1. storage_resource:指定策略使用的storage resource名称。 + 2. cooldown_datetime:热数据转为冷数据时间,不能与cooldown_ttl同时存在。 + 3. cooldown_ttl:热数据持续时间。从数据分片生成时开始计算,经过指定时间后转为冷数据。支持的格式: + 1d:1天 + 1h:1小时 + 50000: 50000秒 + ### Example 1. 创建一组行安全策略 @@ -75,6 +91,23 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2)); ```sql select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd') ``` +2. 创建数据迁移策略 + 1. 指定数据冷却时间创建数据迁移策略 + ```sql + CREATE STORAGE POLICY testPolicy + PROPERTIES( + "storage_resource" = "s3", + "cooldown_datetime" = "2022-06-08 00:00:00" + ); + ``` + 2. 
指定热数据持续时间创建数据迁移策略 + ```sql + CREATE STORAGE POLICY testPolicy + PROPERTIES( + "storage_resource" = "s3", + "cooldown_ttl" = "1d" + ); + ``` ### Keywords diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 281c4c884f..aecbef4687 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1361,6 +1361,11 @@ create_stmt ::= {: RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate); :} + /* storage policy */ + | KW_CREATE KW_STORAGE KW_POLICY opt_if_not_exists:ifNotExists ident:policyName opt_properties:properties + {: + RESULT = new CreatePolicyStmt(PolicyTypeEnum.STORAGE, ifNotExists, policyName, properties); + :} ; channel_desc_list ::= @@ -2074,6 +2079,10 @@ drop_stmt ::= {: RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user); :} + | KW_DROP KW_STORAGE KW_POLICY opt_if_exists:ifExists ident:policyName + {: + RESULT = new DropPolicyStmt(PolicyTypeEnum.STORAGE, ifExists, policyName, null, null); + :} ; // Recover statement @@ -2574,6 +2583,10 @@ show_stmt ::= {: RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null); :} + | KW_SHOW KW_STORAGE KW_POLICY + {: + RESULT = new ShowPolicyStmt(PolicyTypeEnum.STORAGE, null); + :} ; show_param ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java index 3f8c80c9cf..e73d4c8f7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.PrintableMap; import org.apache.doris.mysql.privilege.PrivPredicate; import 
org.apache.doris.policy.FilterType; import org.apache.doris.policy.PolicyTypeEnum; @@ -28,6 +29,8 @@ import org.apache.doris.qe.ConnectContext; import lombok.Getter; +import java.util.Map; + /** * Create policy statement. * syntax: @@ -45,17 +48,20 @@ public class CreatePolicyStmt extends DdlStmt { private final String policyName; @Getter - private final TableName tableName; + private TableName tableName = null; @Getter - private final FilterType filterType; + private FilterType filterType = null; @Getter - private final UserIdentity user; + private UserIdentity user = null; @Getter private Expr wherePredicate; + @Getter + private Map<String, String> properties; + /** * Use for cup. **/ @@ -70,14 +76,31 @@ public class CreatePolicyStmt extends DdlStmt { this.wherePredicate = wherePredicate; } + /** + * Use for cup. + */ + public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, + Map<String, String> properties) { + this.type = type; + this.ifNotExists = ifNotExists; + this.policyName = policyName; + this.properties = properties; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - tableName.analyze(analyzer); - user.analyze(analyzer.getClusterName()); - if (user.isRootUser() || user.isAdminUser()) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt", - user.getQualifiedUser(), user.getHost(), tableName.getTbl()); + switch (type) { + case STORAGE: + break; + case ROW: + default: + tableName.analyze(analyzer); + user.analyze(analyzer.getClusterName()); + if (user.isRootUser() || user.isAdminUser()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt", + user.getQualifiedUser(), user.getHost(), tableName.getTbl()); + } } // check auth if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { @@ -92,8 +115,16 @@ public class CreatePolicyStmt extends 
DdlStmt { if (ifNotExists) { sb.append("IF NOT EXISTS"); } - sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType) - .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql()); + sb.append(policyName); + switch (type) { + case STORAGE: + sb.append(" PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")"); + break; + case ROW: + default: + sb.append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType) + .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql()); + } return sb.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java index 87fb616c0a..c1fdd13aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java @@ -54,9 +54,15 @@ public class DropPolicyStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - tableName.analyze(analyzer); - if (user != null) { - user.analyze(analyzer.getClusterName()); + switch (type) { + case STORAGE: + break; + case ROW: + default: + tableName.analyze(analyzer); + if (user != null) { + user.analyze(analyzer.getClusterName()); + } } // check auth if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { @@ -71,9 +77,16 @@ public class DropPolicyStmt extends DdlStmt { if (ifExists) { sb.append("IF EXISTS "); } - sb.append(policyName).append(" ON ").append(tableName.toSql()); - if (user != null) { - sb.append(" FOR ").append(user.getQualifiedUser()); + sb.append(policyName); + switch (type) { + case STORAGE: + break; + case ROW: + default: + sb.append(" ON ").append(tableName.toSql()); + if (user != null) { + sb.append(" FOR 
").append(user.getQualifiedUser()); + } } return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java index f450952ba1..c7b84bcef7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java @@ -25,6 +25,8 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.policy.PolicyTypeEnum; +import org.apache.doris.policy.RowPolicy; +import org.apache.doris.policy.StoragePolicy; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; @@ -48,18 +50,6 @@ public class ShowPolicyStmt extends ShowStmt { this.user = user; } - private static final ShowResultSetMetaData ROW_META_DATA = - ShowResultSetMetaData.builder() - .addColumn(new Column("PolicyName", ScalarType.createVarchar(100))) - .addColumn(new Column("DbName", ScalarType.createVarchar(100))) - .addColumn(new Column("TableName", ScalarType.createVarchar(100))) - .addColumn(new Column("Type", ScalarType.createVarchar(20))) - .addColumn(new Column("FilterType", ScalarType.createVarchar(20))) - .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535))) - .addColumn(new Column("User", ScalarType.createVarchar(20))) - .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535))) - .build(); - @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -76,14 +66,26 @@ public class ShowPolicyStmt extends ShowStmt { public String toSql() { StringBuilder sb = new StringBuilder(); sb.append("SHOW ").append(type).append(" POLICY"); - if (user != null) { - sb.append(" FOR ").append(user); + switch (type) { + case STORAGE: + break; + case ROW: + default: + if (user != null) { + sb.append(" FOR 
").append(user); + } } return sb.toString(); } @Override public ShowResultSetMetaData getMetaData() { - return ROW_META_DATA; + switch (type) { + case STORAGE: + return StoragePolicy.STORAGE_META_DATA; + case ROW: + default: + return RowPolicy.ROW_META_DATA; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java index 726139d4c4..d2c2e7f48f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java @@ -44,8 +44,7 @@ public abstract class Resource implements Writable { UNKNOWN, SPARK, ODBC_CATALOG, - S3, - STORAGE_POLICY; + S3; public static ResourceType fromString(String resourceType) { for (ResourceType type : ResourceType.values()) { @@ -96,9 +95,6 @@ public abstract class Resource implements Writable { case S3: resource = new S3Resource(name); break; - case STORAGE_POLICY: - resource = new StoragePolicyResource(name); - break; default: throw new DdlException("Unknown resource type: " + type); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java index 19d39e4dc5..d7d57272ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -72,8 +72,7 @@ public class ResourceMgr implements Writable { public void createResource(CreateResourceStmt stmt) throws DdlException { if (stmt.getResourceType() != ResourceType.SPARK && stmt.getResourceType() != ResourceType.ODBC_CATALOG - && stmt.getResourceType() != ResourceType.S3 - && stmt.getResourceType() != ResourceType.STORAGE_POLICY) { + && stmt.getResourceType() != ResourceType.S3) { throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource."); } Resource resource = Resource.fromStmt(stmt); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java deleted file mode 100644 index 285be637f7..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java +++ /dev/null @@ -1,136 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.catalog; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.proc.BaseProcResult; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.annotations.SerializedName; - -import java.util.Map; - -/** - * Policy resource for olap table. 
- * Syntax: - * CREATE RESOURCE "storage_policy_name" - * PROPERTIES( - * "type"="storage_policy", - * "cooldown_datetime" = "2022-06-01", // time when data is transfter to medium - * "cooldown_ttl" = "1h", // data is transfter to medium after 1 hour - * "s3_resource" = "my_s3" // point to a s3 resource - * ); - */ -public class StoragePolicyResource extends Resource { - // required - private static final String STORAGE_RESOURCE = "storage_resource"; - // optional - private static final String COOLDOWN_DATETIME = "cooldown_datetime"; - private static final String COOLDOWN_TTL = "cooldown_ttl"; - - private static final String DEFAULT_COOLDOWN_DATETIME = "9999-01-01 00:00:00"; - private static final String DEFAULT_COOLDOWN_TTL = "1h"; - - @SerializedName(value = "properties") - private Map<String, String> properties; - - public StoragePolicyResource(String name) { - this(name, Maps.newHashMap()); - } - - public StoragePolicyResource(String name, Map<String, String> properties) { - super(name, ResourceType.STORAGE_POLICY); - this.properties = properties; - } - - public String getProperty(String propertyKey) { - return properties.get(propertyKey); - } - - @Override - protected void setProperties(Map<String, String> properties) throws DdlException { - Preconditions.checkState(properties != null); - this.properties = properties; - // check properties - // required - checkRequiredProperty(STORAGE_RESOURCE); - // optional - checkOptionalProperty(COOLDOWN_DATETIME, DEFAULT_COOLDOWN_DATETIME); - checkOptionalProperty(COOLDOWN_TTL, DEFAULT_COOLDOWN_TTL); - if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL) - && !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) { - throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL - + "] can be specified in properties."); - } - } - - private void checkRequiredProperty(String propertyKey) throws DdlException { - String value = 
properties.get(propertyKey); - - if (Strings.isNullOrEmpty(value)) { - throw new DdlException("Missing [" + propertyKey + "] in properties."); - } - } - - private void checkOptionalProperty(String propertyKey, String defaultValue) { - this.properties.putIfAbsent(propertyKey, defaultValue); - } - - @Override - public void modifyProperties(Map<String, String> properties) throws DdlException { - if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL) - && !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) { - throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL - + "] can be specified in properties."); - } - // modify properties - replaceIfEffectiveValue(this.properties, STORAGE_RESOURCE, properties.get(STORAGE_RESOURCE)); - replaceIfEffectiveValue(this.properties, COOLDOWN_DATETIME, properties.get(COOLDOWN_DATETIME)); - replaceIfEffectiveValue(this.properties, COOLDOWN_TTL, properties.get(COOLDOWN_TTL)); - } - - @Override - public void checkProperties(Map<String, String> properties) throws AnalysisException { - // check properties - Map<String, String> copiedProperties = Maps.newHashMap(properties); - copiedProperties.remove(STORAGE_RESOURCE); - copiedProperties.remove(COOLDOWN_DATETIME); - copiedProperties.remove(COOLDOWN_TTL); - - if (!copiedProperties.isEmpty()) { - throw new AnalysisException("Unknown policy resource properties: " + copiedProperties); - } - } - - @Override - public Map<String, String> getCopiedProperties() { - return Maps.newHashMap(properties); - } - - @Override - protected void getProcNodeData(BaseProcResult result) { - String lowerCaseType = type.name().toLowerCase(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue())); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index de497deb75..ca958128ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -65,7 +65,6 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; -import org.apache.doris.policy.RowPolicy; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -813,7 +812,7 @@ public class EditLog { break; } case OperationType.OP_CREATE_POLICY: { - RowPolicy log = (RowPolicy) journal.getData(); + Policy log = (Policy) journal.getData(); catalog.getPolicyMgr().replayCreate(log); break; } @@ -1426,11 +1425,7 @@ public class EditLog { } public void logCreatePolicy(Policy policy) { - if (policy instanceof RowPolicy) { - logEdit(OperationType.OP_CREATE_POLICY, policy); - } else { - LOG.error("invalid policy: " + policy.getType().name()); - } + logEdit(OperationType.OP_CREATE_POLICY, policy); } public void logDropPolicy(DropPolicyLog log) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 08a61c5bb0..1fcb098b96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -37,6 +37,7 @@ import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.canal.CanalSyncJob; import org.apache.doris.policy.Policy; import org.apache.doris.policy.RowPolicy; +import org.apache.doris.policy.StoragePolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -136,7 +137,8 @@ public class GsonUtils { // runtime adapter for class "Policy" private static 
RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory = RuntimeTypeAdapterFactory .of(Policy.class, "clazz") - .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName()); + .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName()) + .registerSubtype(StoragePolicy.class, StoragePolicy.class.getSimpleName()); // the builder of GSON instance. // Add any other adapters if necessary. diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java index 894bc463ad..332b9a0026 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java @@ -77,16 +77,20 @@ public abstract class Policy implements Writable, GsonPostProcessable { * Trans stmt to Policy. **/ public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException { - String curDb = stmt.getTableName().getDb(); - if (curDb == null) { - curDb = ConnectContext.get().getDatabase(); - } - Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); - UserIdentity userIdent = stmt.getUser(); - userIdent.analyze(ConnectContext.get().getClusterName()); switch (stmt.getType()) { + case STORAGE: + StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName()); + storagePolicy.init(stmt.getProperties()); + return storagePolicy; case ROW: default: + String curDb = stmt.getTableName().getDb(); + if (curDb == null) { + curDb = ConnectContext.get().getDatabase(); + } + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); + UserIdentity userIdent = stmt.getUser(); + userIdent.analyze(ConnectContext.get().getClusterName()); Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent, stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index 0746e191b1..ab567117aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -208,6 +208,9 @@ public class PolicyMgr implements Writable { long currentDbId = ConnectContext.get().getCurrentDbId(); Policy checkedPolicy = null; switch (showStmt.getType()) { + case STORAGE: + checkedPolicy = new StoragePolicy(); + break; case ROW: default: RowPolicy rowPolicy = new RowPolicy(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java index ab4b0a74c6..b989583a4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java @@ -23,10 +23,13 @@ import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.qe.ShowResultSetMetaData; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; @@ -45,6 +48,18 @@ import java.util.List; @Data public class RowPolicy extends Policy { + public static final ShowResultSetMetaData ROW_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("PolicyName", ScalarType.createVarchar(100))) + .addColumn(new Column("DbName", ScalarType.createVarchar(100))) + .addColumn(new Column("TableName", ScalarType.createVarchar(100))) + .addColumn(new Column("Type", ScalarType.createVarchar(20))) + 
.addColumn(new Column("FilterType", ScalarType.createVarchar(20))) + .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535))) + .addColumn(new Column("User", ScalarType.createVarchar(20))) + .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535))) + .build(); + private static final Logger LOG = LogManager.getLogger(RowPolicy.class); /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java new file mode 100644 index 0000000000..c5a48c4a7e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.policy; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * Save policy for storage migration. + **/ +@Data +public class StoragePolicy extends Policy { + + public static final ShowResultSetMetaData STORAGE_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("PolicyName", ScalarType.createVarchar(100))) + .addColumn(new Column("Type", ScalarType.createVarchar(20))) + .addColumn(new Column("StorageResource", ScalarType.createVarchar(20))) + .addColumn(new Column("CooldownDatetime", ScalarType.createVarchar(20))) + .addColumn(new Column("CooldownTtl", ScalarType.createVarchar(20))) + .addColumn(new Column("properties", ScalarType.createVarchar(65535))) + .build(); + + private static final Logger LOG = LogManager.getLogger(StoragePolicy.class); + // required + private static final String STORAGE_RESOURCE = "storage_resource"; + // optional + private static final String COOLDOWN_DATETIME = "cooldown_datetime"; + private static final String COOLDOWN_TTL = "cooldown_ttl"; + + @SerializedName(value = "storageResource") + private String storageResource = null; + + @SerializedName(value = "cooldownDatetime") + private Date cooldownDatetime = null; + + @SerializedName(value = "cooldownTtl") + private String cooldownTtl = null; + + private Map<String, String> props; + + public StoragePolicy() {} + + /** + * Policy for Storage Migration. 
+ * + * @param type PolicyType + * @param policyName policy name + * @param storageResource resource name for storage + * @param cooldownDatetime cool down time + * @param cooldownTtl cool down time cost after partition is created + */ + public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource, + final Date cooldownDatetime, final String cooldownTtl) { + super(type, policyName); + this.storageResource = storageResource; + this.cooldownDatetime = cooldownDatetime; + this.cooldownTtl = cooldownTtl; + } + + /** + * Policy for Storage Migration. + * + * @param type PolicyType + * @param policyName policy name + */ + public StoragePolicy(final PolicyTypeEnum type, final String policyName) { + super(type, policyName); + } + + /** + * Init props for storage policy. + * + * @param props properties for storage policy + */ + public void init(final Map<String, String> props) throws AnalysisException { + if (props == null) { + throw new AnalysisException("properties config is required"); + } + checkRequiredProperty(props, STORAGE_RESOURCE); + this.storageResource = props.get(STORAGE_RESOURCE); + boolean hasCooldownDatetime = false; + boolean hasCooldownTtl = false; + if (props.containsKey(COOLDOWN_DATETIME)) { + hasCooldownDatetime = true; + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + try { + this.cooldownDatetime = df.parse(props.get(COOLDOWN_DATETIME)); + } catch (ParseException e) { + throw new AnalysisException(String.format("cooldown_datetime format error: %s", + props.get(COOLDOWN_DATETIME)), e); + } + } + if (props.containsKey(COOLDOWN_TTL)) { + hasCooldownTtl = true; + this.cooldownTtl = props.get(COOLDOWN_TTL); + } + if (hasCooldownDatetime && hasCooldownTtl) { + throw new AnalysisException(COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together."); + } + if (!hasCooldownDatetime && !hasCooldownTtl) { + throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be 
set"); + } + if (!Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) { + throw new AnalysisException("storage resource doesn't exist: " + this.storageResource); + } + } + + /** + * Use for SHOW POLICY. + **/ + public List<String> getShowInfo() throws AnalysisException { + String props = ""; + if (Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) { + props = Catalog.getCurrentCatalog().getResourceMgr().getResource(this.storageResource).toString(); + } + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + return Lists.newArrayList(this.policyName, this.type.name(), this.storageResource, + df.format(this.cooldownDatetime), this.cooldownTtl, props); + } + + @Override + public void gsonPostProcess() throws IOException {} + + @Override + public StoragePolicy clone() { + return new StoragePolicy(this.type, this.policyName, this.storageResource, + this.cooldownDatetime, this.cooldownTtl); + } + + @Override + public boolean matchPolicy(Policy checkedPolicyCondition) { + if (!(checkedPolicyCondition instanceof StoragePolicy)) { + return false; + } + StoragePolicy storagePolicy = (StoragePolicy) checkedPolicyCondition; + return checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName()); + } + + @Override + public boolean matchPolicy(DropPolicyLog checkedDropCondition) { + return checkMatched(checkedDropCondition.getType(), checkedDropCondition.getPolicyName()); + } + + /** + * check required key in properties. 
+ * + * @param props properties for storage policy + * @param propertyKey key for property + * @throws AnalysisException exception for properties error + */ + private void checkRequiredProperty(final Map<String, String> props, String propertyKey) throws AnalysisException { + String value = props.get(propertyKey); + + if (Strings.isNullOrEmpty(value)) { + throw new AnalysisException("Missing [" + propertyKey + "] in properties."); + } + } + + @Override + public boolean isInvalid() { + return false; + } +}
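For readers skimming the diff, the property rules that `StoragePolicy.init()` enforces can be sketched in isolation: `storage_resource` is required, and exactly one of `cooldown_datetime` or `cooldown_ttl` must be present. The standalone class below is a minimal sketch under stated assumptions, not the Doris implementation. The class name `StoragePolicyPropsSketch` and both method names are hypothetical, `IllegalArgumentException` stands in for `AnalysisException`, and the catalog lookup for the resource is omitted. The `ttlToSeconds` helper is also an assumption: the commit stores `cooldown_ttl` as a raw string, and the helper only illustrates the three formats listed in the CREATE-POLICY docs (`1d`, `1h`, and plain seconds).

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: this class and its methods are hypothetical and not part of Doris.
class StoragePolicyPropsSketch {
    static final String STORAGE_RESOURCE = "storage_resource";
    static final String COOLDOWN_DATETIME = "cooldown_datetime";
    static final String COOLDOWN_TTL = "cooldown_ttl";

    /** Applies the same property checks as StoragePolicy.init(), minus the catalog lookup. */
    static void validate(Map<String, String> props) {
        if (props == null) {
            throw new IllegalArgumentException("properties config is required");
        }
        String resource = props.get(STORAGE_RESOURCE);
        if (resource == null || resource.isEmpty()) {
            throw new IllegalArgumentException("Missing [" + STORAGE_RESOURCE + "] in properties.");
        }
        boolean hasDatetime = props.containsKey(COOLDOWN_DATETIME);
        boolean hasTtl = props.containsKey(COOLDOWN_TTL);
        if (hasDatetime) {
            // Same pattern as the commit: "yyyy-MM-dd HH:mm:ss".
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            try {
                df.parse(props.get(COOLDOWN_DATETIME));
            } catch (ParseException e) {
                throw new IllegalArgumentException(
                        "cooldown_datetime format error: " + props.get(COOLDOWN_DATETIME), e);
            }
        }
        if (hasDatetime && hasTtl) {
            throw new IllegalArgumentException(
                    COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together.");
        }
        if (!hasDatetime && !hasTtl) {
            throw new IllegalArgumentException(
                    COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
        }
    }

    /**
     * Hypothetical helper: converts a cooldown_ttl string to seconds.
     * A trailing 'd' means days, a trailing 'h' means hours, otherwise the value is seconds.
     */
    static long ttlToSeconds(String ttl) {
        String s = ttl.trim();
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty cooldown_ttl");
        }
        char unit = s.charAt(s.length() - 1);
        if (unit == 'd') {
            return Long.parseLong(s.substring(0, s.length() - 1)) * 86400L;
        }
        if (unit == 'h') {
            return Long.parseLong(s.substring(0, s.length() - 1)) * 3600L;
        }
        return Long.parseLong(s);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(STORAGE_RESOURCE, "s3");
        props.put(COOLDOWN_TTL, "1d");
        validate(props); // passes: resource is set and only the TTL is given
        System.out.println(ttlToSeconds(props.get(COOLDOWN_TTL)));
    }
}
```

Adding `cooldown_datetime` to the same map would make `validate` throw, which matches the "can't be set together" rule in the docs and in `init()`. Where a TTL string is eventually interpreted is not shown in this commit, so the parsing step should be read as one possible interpretation of the documented formats.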