This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new da33a48f39 [refactor](policy) Refactor the hierarchy of Policy. (#9786) da33a48f39 is described below commit da33a48f396f33983502fd74aaea9b5aa6384077 Author: pengxiangyu <diablo...@163.com> AuthorDate: Sat Jun 4 11:29:09 2022 +0800 [refactor](policy) Refactor the hierarchy of Policy. (#9786) The RowPolicy extends Policy --- .../org/apache/doris/common/FeMetaVersion.java | 2 +- .../apache/doris/analysis/CreatePolicyStmt.java | 4 +- .../org/apache/doris/analysis/StmtRewriter.java | 4 +- .../java/org/apache/doris/persist/EditLog.java | 9 +- .../org/apache/doris/persist/gson/GsonUtils.java | 9 + .../main/java/org/apache/doris/policy/Policy.java | 111 ++++----- .../java/org/apache/doris/policy/PolicyMgr.java | 263 ++++++++++----------- .../org/apache/doris/policy/PolicyTypeEnum.java | 2 +- .../doris/policy/{Policy.java => RowPolicy.java} | 133 ++++++----- .../java/org/apache/doris/policy/PolicyTest.java | 42 ++++ 10 files changed, 301 insertions(+), 278 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index 6158431607..70cacd7a1c 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -40,7 +40,7 @@ public final class FeMetaVersion { public static final int VERSION_109 = 109; // For routine load user info public static final int VERSION_110 = 110; - // NOTE: when increment meta version, should assign the latest version to VERSION_CURRENT + // note: when increment meta version, should assign the latest version to VERSION_CURRENT public static final int VERSION_CURRENT = VERSION_110; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java index b8771575da..3f8c80c9cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java @@ -59,8 +59,8 @@ public class CreatePolicyStmt extends DdlStmt { /** * Use for cup. **/ - public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, TableName tableName, String filterType, - UserIdentity user, Expr wherePredicate) { + public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName, TableName tableName, + String filterType, UserIdentity user, Expr wherePredicate) { this.type = type; this.ifNotExists = ifNotExists; this.policyName = policyName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java index 1700c64ae8..42a0188a84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java @@ -27,7 +27,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.TableAliasGenerator; import org.apache.doris.common.UserException; -import org.apache.doris.policy.Policy; +import org.apache.doris.policy.RowPolicy; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; @@ -1187,7 +1187,7 @@ public class StmtRewriter { Database db = currentCatalog.getDbOrAnalysisException(dbName); long dbId = db.getId(); long tableId = table.getId(); - Policy matchPolicy = currentCatalog.getPolicyMgr().getMatchRowPolicy(dbId, tableId, user); + RowPolicy matchPolicy = currentCatalog.getPolicyMgr().getMatchTablePolicy(dbId, tableId, user); if (matchPolicy == null) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index ca958128ab..de497deb75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -65,6 +65,7 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; +import org.apache.doris.policy.RowPolicy; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -812,7 +813,7 @@ public class EditLog { break; } case OperationType.OP_CREATE_POLICY: { - Policy log = (Policy) journal.getData(); + RowPolicy log = (RowPolicy) journal.getData(); catalog.getPolicyMgr().replayCreate(log); break; } @@ -1425,7 +1426,11 @@ public class EditLog { } public void logCreatePolicy(Policy policy) { - logEdit(OperationType.OP_CREATE_POLICY, policy); + if (policy instanceof RowPolicy) { + logEdit(OperationType.OP_CREATE_POLICY, policy); + } else { + LOG.error("invalid policy: " + policy.getType().name()); + } } public void logDropPolicy(DropPolicyLog log) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 7f5c4170c3..08a61c5bb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -35,6 +35,8 @@ import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.canal.CanalSyncJob; +import org.apache.doris.policy.Policy; +import org.apache.doris.policy.RowPolicy; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -130,6 +132,12 @@ public class GsonUtils { .of(LoadJobStateUpdateInfo.class, "clazz") .registerSubtype(SparkLoadJobStateUpdateInfo.class, SparkLoadJobStateUpdateInfo.class.getSimpleName()); + + // runtime adapter for class "Policy" + private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory = RuntimeTypeAdapterFactory + .of(Policy.class, "clazz") + .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. private static final GsonBuilder GSON_BUILDER = new GsonBuilder() @@ -144,6 +152,7 @@ public class GsonUtils { .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) .registerTypeAdapterFactory(syncJobTypeAdapterFactory) .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory) + .registerTypeAdapterFactory(policyTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java index d617dda050..894bc463ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java @@ -18,9 +18,6 @@ package org.apache.doris.policy; import org.apache.doris.analysis.CreatePolicyStmt; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -28,69 +25,53 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; -import lombok.AllArgsConstructor; import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.StringReader; import java.util.List; /** - * Save policy for filtering data. + * Base class for Policy. **/ @Data -@AllArgsConstructor -public class Policy implements Writable, GsonPostProcessable { - - public static final String ROW_POLICY = "ROW"; +public abstract class Policy implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(Policy.class); - @SerializedName(value = "dbId") - private long dbId; - - @SerializedName(value = "tableId") - private long tableId; - - @SerializedName(value = "policyName") - private String policyName; + @SerializedName(value = "policyId") + protected long policyId = -1; - /** - * ROW. - **/ @SerializedName(value = "type") - private PolicyTypeEnum type; + protected PolicyTypeEnum type = null; - /** - * PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type prevails. - **/ - @SerializedName(value = "filterType") - private final FilterType filterType; - - private Expr wherePredicate; + @SerializedName(value = "policyName") + protected String policyName = null; - /** - * Policy bind user. - **/ - @SerializedName(value = "user") - private final UserIdentity user; + public Policy() { + policyId = Catalog.getCurrentCatalog().getNextId(); + } /** - * Use for Serialization/deserialization. - **/ - @SerializedName(value = "originStmt") - private String originStmt; + * Base class for Policy. + * + * @param type policy type + * @param policyName policy name + */ + public Policy(final PolicyTypeEnum type, final String policyName) { + policyId = Catalog.getCurrentCatalog().getNextId(); + this.type = type; + this.policyName = policyName; + } /** * Trans stmt to Policy. @@ -101,22 +82,22 @@ public class Policy implements Writable, GsonPostProcessable { curDb = ConnectContext.get().getDatabase(); } Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); - Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); UserIdentity userIdent = stmt.getUser(); userIdent.analyze(ConnectContext.get().getClusterName()); - return new Policy(db.getId(), table.getId(), stmt.getPolicyName(), stmt.getType(), stmt.getFilterType(), - stmt.getWherePredicate(), userIdent, stmt.getOrigStmt().originStmt); + switch (stmt.getType()) { + case ROW: + default: + Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); + return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent, + stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), + stmt.getWherePredicate()); + } } /** * Use for SHOW POLICY. **/ - public List<String> getShowInfo() throws AnalysisException { - Database database = Catalog.getCurrentCatalog().getDbOrAnalysisException(this.dbId); - Table table = database.getTableOrAnalysisException(this.tableId); - return Lists.newArrayList(this.policyName, database.getFullName(), table.getName(), this.type.name(), - this.filterType.name(), this.wherePredicate.toSql(), this.user.getQualifiedUser(), this.originStmt); - } + public abstract List<String> getShowInfo() throws AnalysisException; @Override public void write(DataOutput out) throws IOException { @@ -124,31 +105,27 @@ public class Policy implements Writable, GsonPostProcessable { } /** - * Read policy from file. + * Read Policy from file. **/ public static Policy read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, Policy.class); } - @Override - public void gsonPostProcess() throws IOException { - if (wherePredicate != null) { - return; - } - try { - SqlScanner input = new SqlScanner(new StringReader(originStmt), 0L); - SqlParser parser = new SqlParser(input); - CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser); - wherePredicate = stmt.getWherePredicate(); - } catch (Exception e) { - throw new IOException("policy parse originStmt error", e); - } + protected boolean checkMatched(PolicyTypeEnum type, String policyName) { + return (type == null || type.equals(this.type)) + && (policyName == null || StringUtils.equals(policyName, this.policyName)); } - @Override - public Policy clone() { - return new Policy(this.dbId, this.tableId, this.policyName, this.type, this.filterType, this.wherePredicate, - this.user, this.originStmt); + // it is used to check whether this policy is in PolicyMgr + public boolean matchPolicy(Policy checkedPolicyCondition) { + return checkMatched(checkedPolicyCondition.getType(), checkedPolicyCondition.getPolicyName()); + } + + public boolean matchPolicy(DropPolicyLog checkedDropPolicyLogCondition) { + return checkMatched(checkedDropPolicyLogCondition.getType(), checkedDropPolicyLogCondition.getPolicyName()); } + + public abstract boolean isInvalid(); + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java index c3a1bebbf8..0746e191b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.CreatePolicyStmt; import org.apache.doris.analysis.DropPolicyStmt; import org.apache.doris.analysis.ShowPolicyStmt; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; @@ -37,7 +36,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; -import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -61,14 +59,14 @@ public class PolicyMgr implements Writable { private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - @SerializedName(value = "dbIdToPolicyMap") - private Map<Long, List<Policy>> dbIdToPolicyMap = Maps.newConcurrentMap(); + @SerializedName(value = "typeToPolicyMap") + private Map<PolicyTypeEnum, List<Policy>> typeToPolicyMap = Maps.newConcurrentMap(); /** * Cache merge policy for match. * key:dbId:tableId-type-user **/ - private Map<Long, Map<String, Policy>> dbIdToMergePolicyMap = Maps.newConcurrentMap(); + private Map<Long, Map<String, RowPolicy>> dbIdToMergeTablePolicyMap = Maps.newConcurrentMap(); private Set<String> userPolicySet = Sets.newConcurrentHashSet(); @@ -95,8 +93,7 @@ public class PolicyMgr implements Writable { Policy policy = Policy.fromCreateStmt(stmt); writeLock(); try { - if (existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(), - policy.getPolicyName(), policy.getUser())) { + if (existPolicy(policy)) { if (stmt.isIfNotExists()) { return; } @@ -113,45 +110,47 @@ public class PolicyMgr implements Writable { * Drop policy through stmt. **/ public void dropPolicy(DropPolicyStmt stmt) throws DdlException, AnalysisException { - DropPolicyLog policy = DropPolicyLog.fromDropStmt(stmt); + DropPolicyLog dropPolicyLog = DropPolicyLog.fromDropStmt(stmt); writeLock(); try { - if (!existPolicy(policy.getDbId(), policy.getTableId(), policy.getType(), - policy.getPolicyName(), policy.getUser())) { + if (!existPolicy(dropPolicyLog)) { if (stmt.isIfExists()) { return; } - throw new DdlException("the policy " + policy.getPolicyName() + " not exist"); + throw new DdlException("the policy " + dropPolicyLog.getPolicyName() + " not exist"); } - unprotectedDrop(policy); - Catalog.getCurrentCatalog().getEditLog().logDropPolicy(policy); + unprotectedDrop(dropPolicyLog); + Catalog.getCurrentCatalog().getEditLog().logDropPolicy(dropPolicyLog); } finally { writeUnlock(); } } + /** + * Check whether this user has policy. + * + * @param user user who has policy + * @return exist or not + */ public boolean existPolicy(String user) { return userPolicySet.contains(user); } - private boolean existPolicy(long dbId, long tableId, PolicyTypeEnum type, String policyName, UserIdentity user) { - List<Policy> policies = getDbPolicies(dbId); - return policies.stream().anyMatch(policy -> matchPolicy(policy, tableId, type, policyName, user)); + private boolean existPolicy(Policy checkedPolicy) { + List<Policy> policies = getPoliciesByType(checkedPolicy.getType()); + return policies.stream().anyMatch(policy -> policy.matchPolicy(checkedPolicy)); } - private List<Policy> getDbPolicies(long dbId) { - if (dbIdToPolicyMap == null) { - return new ArrayList<>(); - } - return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()); + private boolean existPolicy(DropPolicyLog checkedDropPolicy) { + List<Policy> policies = getPoliciesByType(checkedDropPolicy.getType()); + return policies.stream().anyMatch(policy -> policy.matchPolicy(checkedDropPolicy)); } - private List<Policy> getDbUserPolicies(long dbId, String user) { - if (dbIdToPolicyMap == null) { + private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) { + if (typeToPolicyMap == null) { return new ArrayList<>(); } - return dbIdToPolicyMap.getOrDefault(dbId, new ArrayList<>()).stream() - .filter(p -> p.getUser().getQualifiedUser().equals(user)).collect(Collectors.toList()); + return typeToPolicyMap.getOrDefault(policyType, new ArrayList<>()); } public void replayCreate(Policy policy) { @@ -163,12 +162,10 @@ public class PolicyMgr implements Writable { if (policy == null) { return; } - long dbId = policy.getDbId(); - List<Policy> dbPolicies = getDbPolicies(dbId); + List<Policy> dbPolicies = getPoliciesByType(policy.getType()); dbPolicies.add(policy); - dbIdToPolicyMap.put(dbId, dbPolicies); - updateMergePolicyMap(dbId); - userPolicySet.add(policy.getUser().getQualifiedUser()); + typeToPolicyMap.put(policy.getType(), dbPolicies); + updateMergeTablePolicyMap(); } public void replayDrop(DropPolicyLog log) { @@ -178,42 +175,26 @@ public class PolicyMgr implements Writable { private void unprotectedDrop(DropPolicyLog log) { long dbId = log.getDbId(); - List<Policy> policies = getDbPolicies(dbId); - policies.removeIf(p -> matchPolicy(p, log.getTableId(), log.getType(), log.getPolicyName(), log.getUser())); - dbIdToPolicyMap.put(dbId, policies); - updateMergePolicyMap(dbId); - if (log.getUser() == null) { - updateAllUserPolicySet(); - } else { - String user = log.getUser().getQualifiedUser(); - if (!existUserPolicy(user)) { - userPolicySet.remove(user); - } - } - } - - private boolean matchPolicy(Policy policy, long tableId, PolicyTypeEnum type, - String policyName, UserIdentity user) { - return policy.getTableId() == tableId - && policy.getType().equals(type) - && StringUtils.equals(policy.getPolicyName(), policyName) - && (user == null || StringUtils.equals(policy.getUser().getQualifiedUser(), user.getQualifiedUser())); + List<Policy> policies = getPoliciesByType(log.getType()); + policies.removeIf(policy -> policy.matchPolicy(log)); + typeToPolicyMap.put(log.getType(), policies); + updateMergeTablePolicyMap(); } /** * Match row policy and return it. **/ - public Policy getMatchRowPolicy(long dbId, long tableId, String user) { + public RowPolicy getMatchTablePolicy(long dbId, long tableId, String user) { readLock(); try { - if (!dbIdToMergePolicyMap.containsKey(dbId)) { + if (!dbIdToMergeTablePolicyMap.containsKey(dbId)) { return null; } - String key = Joiner.on("-").join(tableId, Policy.ROW_POLICY, user); - if (!dbIdToMergePolicyMap.get(dbId).containsKey(key)) { + String key = Joiner.on("-").join(tableId, PolicyTypeEnum.ROW.name(), user); + if (!dbIdToMergeTablePolicyMap.get(dbId).containsKey(key)) { return null; } - return dbIdToMergePolicyMap.get(dbId).get(key); + return dbIdToMergeTablePolicyMap.get(dbId).get(key); } finally { readUnlock(); } @@ -224,16 +205,25 @@ public class PolicyMgr implements Writable { **/ public ShowResultSet showPolicy(ShowPolicyStmt showStmt) throws AnalysisException { List<List<String>> rows = Lists.newArrayList(); - List<Policy> policies; long currentDbId = ConnectContext.get().getCurrentDbId(); - if (showStmt.getUser() == null) { - policies = Catalog.getCurrentCatalog().getPolicyMgr().getDbPolicies(currentDbId); - } else { - policies = Catalog.getCurrentCatalog().getPolicyMgr() - .getDbUserPolicies(currentDbId, showStmt.getUser().getQualifiedUser()); + Policy checkedPolicy = null; + switch (showStmt.getType()) { + case ROW: + default: + RowPolicy rowPolicy = new RowPolicy(); + if (showStmt.getUser() != null) { + rowPolicy.setUser(showStmt.getUser()); + } + if (currentDbId != -1) { + rowPolicy.setDbId(currentDbId); + } + checkedPolicy = rowPolicy; } + final Policy finalCheckedPolicy = checkedPolicy; + List<Policy> policies = typeToPolicyMap.getOrDefault(showStmt.getType(), new ArrayList<>()).stream() + .filter(p -> p.matchPolicy(finalCheckedPolicy)).collect(Collectors.toList()); for (Policy policy : policies) { - if (policy.getWherePredicate() == null) { + if (policy.isInvalid()) { continue; } rows.add(policy.getShowInfo()); @@ -241,94 +231,87 @@ public class PolicyMgr implements Writable { return new ShowResultSet(showStmt.getMetaData(), rows); } - private void updateAllMergePolicyMap() { - dbIdToPolicyMap.forEach((dbId, policies) -> updateMergePolicyMap(dbId)); - } - - private void updateAllUserPolicySet() { - userPolicySet.clear(); - dbIdToPolicyMap.forEach((dbId, policies) -> - policies.forEach(policy -> userPolicySet.add(policy.getUser().getQualifiedUser()))); - } - - - private boolean existUserPolicy(String user) { - readLock(); - try { - for (Map<String, Policy> policies : dbIdToMergePolicyMap.values()) { - for (Policy policy : policies.values()) { - if (policy.getUser().getQualifiedUser().equals(user)) { - return true; - } - } - } - return false; - } finally { - readUnlock(); - } - - } - /** * The merge policy cache needs to be regenerated after the update. **/ - private void updateMergePolicyMap(long dbId) { + private void updateMergeTablePolicyMap() { readLock(); try { - if (!dbIdToPolicyMap.containsKey(dbId)) { + if (!typeToPolicyMap.containsKey(PolicyTypeEnum.ROW)) { return; } - List<Policy> policies = dbIdToPolicyMap.get(dbId); - Map<String, Policy> andMap = new HashMap<>(); - Map<String, Policy> orMap = new HashMap<>(); - for (Policy policy : policies) { - // read from json, need set isAnalyzed - policy.getUser().setIsAnalyzed(); - String key = - Joiner.on("-").join(policy.getTableId(), policy.getType(), policy.getUser().getQualifiedUser()); - // merge wherePredicate - if (CompoundPredicate.Operator.AND.equals(policy.getFilterType().getOp())) { - Policy frontPolicy = andMap.get(key); - if (frontPolicy == null) { - andMap.put(key, policy.clone()); - } else { - frontPolicy.setWherePredicate( + List<Policy> allPolicies = typeToPolicyMap.get(PolicyTypeEnum.ROW); + Map<Long, List<RowPolicy>> policyMap = new HashMap<>(); + dbIdToMergeTablePolicyMap.clear(); + userPolicySet.clear(); + for (Policy policy : allPolicies) { + if (!(policy instanceof RowPolicy)) { + continue; + } + RowPolicy rowPolicy = (RowPolicy) policy; + if (!policyMap.containsKey(rowPolicy.getDbId())) { + policyMap.put(rowPolicy.getDbId(), new ArrayList<>()); + } + policyMap.get(rowPolicy.getDbId()).add(rowPolicy); + if (rowPolicy.getUser() != null) { + userPolicySet.add(rowPolicy.getUser().getQualifiedUser()); + } + } + for (Map.Entry<Long, List<RowPolicy>> entry : policyMap.entrySet()) { + List<RowPolicy> policies = entry.getValue(); + Map<String, RowPolicy> andMap = new HashMap<>(); + Map<String, RowPolicy> orMap = new HashMap<>(); + for (RowPolicy rowPolicy : policies) { + // read from json, need set isAnalyzed + rowPolicy.getUser().setIsAnalyzed(); + String key = + Joiner.on("-").join(rowPolicy.getTableId(), rowPolicy.getType(), + rowPolicy.getUser().getQualifiedUser()); + // merge wherePredicate + if (CompoundPredicate.Operator.AND.equals(rowPolicy.getFilterType().getOp())) { + RowPolicy frontPolicy = andMap.get(key); + if (frontPolicy == null) { + andMap.put(key, rowPolicy.clone()); + } else { + frontPolicy.setWherePredicate( new CompoundPredicate(CompoundPredicate.Operator.AND, frontPolicy.getWherePredicate(), - policy.getWherePredicate())); - andMap.put(key, frontPolicy.clone()); - } - } else { - Policy frontPolicy = orMap.get(key); - if (frontPolicy == null) { - orMap.put(key, policy.clone()); + rowPolicy.getWherePredicate())); + andMap.put(key, frontPolicy.clone()); + } } else { - frontPolicy.setWherePredicate( + RowPolicy frontPolicy = orMap.get(key); + if (frontPolicy == null) { + orMap.put(key, rowPolicy.clone()); + } else { + frontPolicy.setWherePredicate( new CompoundPredicate(CompoundPredicate.Operator.OR, frontPolicy.getWherePredicate(), - policy.getWherePredicate())); - orMap.put(key, frontPolicy.clone()); + rowPolicy.getWherePredicate())); + orMap.put(key, frontPolicy.clone()); + } } } - } - Map<String, Policy> mergeMap = new HashMap<>(); - Set<String> policyKeys = new HashSet<>(); - policyKeys.addAll(andMap.keySet()); - policyKeys.addAll(orMap.keySet()); - policyKeys.forEach(key -> { - if (andMap.containsKey(key) && orMap.containsKey(key)) { - Policy mergePolicy = andMap.get(key).clone(); - mergePolicy.setWherePredicate( + Map<String, RowPolicy> mergeMap = new HashMap<>(); + Set<String> policyKeys = new HashSet<>(); + policyKeys.addAll(andMap.keySet()); + policyKeys.addAll(orMap.keySet()); + policyKeys.forEach(key -> { + if (andMap.containsKey(key) && orMap.containsKey(key)) { + RowPolicy mergePolicy = andMap.get(key).clone(); + mergePolicy.setWherePredicate( new CompoundPredicate(CompoundPredicate.Operator.AND, mergePolicy.getWherePredicate(), - orMap.get(key).getWherePredicate())); - mergeMap.put(key, mergePolicy); - } - if (!andMap.containsKey(key)) { - mergeMap.put(key, orMap.get(key)); - } - if (!orMap.containsKey(key)) { - mergeMap.put(key, andMap.get(key)); - } - }); - dbIdToMergePolicyMap.put(dbId, mergeMap); + orMap.get(key).getWherePredicate())); + mergeMap.put(key, mergePolicy); + } + if (!andMap.containsKey(key)) { + mergeMap.put(key, orMap.get(key)); + } + if (!orMap.containsKey(key)) { + mergeMap.put(key, andMap.get(key)); + } + }); + long dbId = entry.getKey(); + dbIdToMergeTablePolicyMap.put(dbId, mergeMap); + } } finally { readUnlock(); } @@ -345,10 +328,8 @@ public class PolicyMgr implements Writable { public static PolicyMgr read(DataInput in) throws IOException { String json = Text.readString(in); PolicyMgr policyMgr = GsonUtils.GSON.fromJson(json, PolicyMgr.class); - // update merge policy cache - policyMgr.updateAllMergePolicyMap(); - // update user policy cache - policyMgr.updateAllUserPolicySet(); + // update merge policy cache and userPolicySet + policyMgr.updateMergeTablePolicyMap(); return policyMgr; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java index 483b8cd93b..bf82f7e0df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyTypeEnum.java @@ -22,5 +22,5 @@ package org.apache.doris.policy; **/ public enum PolicyTypeEnum { - ROW + ROW, STORAGE } diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java similarity index 52% copy from fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java copy to fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java index d617dda050..ab4b0a74c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java @@ -26,22 +26,15 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SqlParserUtils; -import org.apache.doris.persist.gson.GsonPostProcessable; -import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; -import lombok.AllArgsConstructor; import lombok.Data; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.StringReader; import java.util.List; @@ -50,41 +43,27 @@ import java.util.List; * Save policy for filtering data. **/ @Data -@AllArgsConstructor -public class Policy implements Writable, GsonPostProcessable { +public class RowPolicy extends Policy { - public static final String ROW_POLICY = "ROW"; + private static final Logger LOG = LogManager.getLogger(RowPolicy.class); - private static final Logger LOG = LogManager.getLogger(Policy.class); + /** + * Policy bind user. + **/ + @SerializedName(value = "user") + private UserIdentity user = null; @SerializedName(value = "dbId") - private long dbId; + private long dbId = -1; @SerializedName(value = "tableId") - private long tableId; - - @SerializedName(value = "policyName") - private String policyName; - - /** - * ROW. - **/ - @SerializedName(value = "type") - private PolicyTypeEnum type; + private long tableId = -1; /** * PERMISSIVE | RESTRICTIVE, If multiple types exist, the last type prevails. **/ @SerializedName(value = "filterType") - private final FilterType filterType; - - private Expr wherePredicate; - - /** - * Policy bind user. - **/ - @SerializedName(value = "user") - private final UserIdentity user; + private FilterType filterType = null; /** * Use for Serialization/deserialization. @@ -92,20 +71,32 @@ public class Policy implements Writable, GsonPostProcessable { @SerializedName(value = "originStmt") private String originStmt; + private Expr wherePredicate = null; + + public RowPolicy() {} + /** - * Trans stmt to Policy. - **/ - public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException { - String curDb = stmt.getTableName().getDb(); - if (curDb == null) { - curDb = ConnectContext.get().getDatabase(); - } - Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb); - Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl()); - UserIdentity userIdent = stmt.getUser(); - userIdent.analyze(ConnectContext.get().getClusterName()); - return new Policy(db.getId(), table.getId(), stmt.getPolicyName(), stmt.getType(), stmt.getFilterType(), - stmt.getWherePredicate(), userIdent, stmt.getOrigStmt().originStmt); + * Policy for Table. Policy of ROW or others. + * + * @param type PolicyType + * @param policyName policy name + * @param dbId database i + * @param user username + * @param originStmt origin stmt + * @param tableId table id + * @param filterType filter type + * @param wherePredicate where predicate + */ + public RowPolicy(final PolicyTypeEnum type, final String policyName, long dbId, + UserIdentity user, String originStmt, final long tableId, + final FilterType filterType, final Expr wherePredicate) { + super(type, policyName); + this.user = user; + this.dbId = dbId; + this.tableId = tableId; + this.filterType = filterType; + this.originStmt = originStmt; + this.wherePredicate = wherePredicate; } /** @@ -118,19 +109,6 @@ public class Policy implements Writable, GsonPostProcessable { this.filterType.name(), this.wherePredicate.toSql(), this.user.getQualifiedUser(), this.originStmt); } - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, GsonUtils.GSON.toJson(this)); - } - - /** - * Read policy from file. - **/ - public static Policy read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, Policy.class); - } - @Override public void gsonPostProcess() throws IOException { if (wherePredicate != null) { @@ -142,13 +120,44 @@ public class Policy implements Writable, GsonPostProcessable { CreatePolicyStmt stmt = (CreatePolicyStmt) SqlParserUtils.getFirstStmt(parser); wherePredicate = stmt.getWherePredicate(); } catch (Exception e) { - throw new IOException("policy parse originStmt error", e); + throw new IOException("table policy parse originStmt error", e); + } + } + + @Override + public RowPolicy clone() { + return new RowPolicy(this.type, this.policyName, this.dbId, this.user, this.originStmt, this.tableId, + this.filterType, this.wherePredicate); + } + + private boolean checkMatched(long dbId, long tableId, PolicyTypeEnum type, + String policyName, UserIdentity user) { + return super.checkMatched(type, policyName) + && (dbId == -1 || dbId == this.dbId) + && (tableId == -1 || tableId == this.tableId) + && (user == null || this.user == null + || StringUtils.equals(user.getQualifiedUser(), this.user.getQualifiedUser())); + } + + @Override + public boolean matchPolicy(Policy checkedPolicyCondition) { + if (!(checkedPolicyCondition instanceof RowPolicy)) { + return false; } + RowPolicy rowPolicy = (RowPolicy) checkedPolicyCondition; + return checkMatched(rowPolicy.getDbId(), rowPolicy.getTableId(), rowPolicy.getType(), + rowPolicy.getPolicyName(), rowPolicy.getUser()); + } + + @Override + public boolean matchPolicy(DropPolicyLog checkedDropPolicyLogCondition) { + return checkMatched(checkedDropPolicyLogCondition.getDbId(), checkedDropPolicyLogCondition.getTableId(), + checkedDropPolicyLogCondition.getType(), checkedDropPolicyLogCondition.getPolicyName(), + checkedDropPolicyLogCondition.getUser()); } @Override - public Policy clone() { - return new Policy(this.dbId, this.tableId, this.policyName, this.type, this.filterType, this.wherePredicate, - this.user, this.originStmt); + public boolean isInvalid() { + return (wherePredicate == null); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java index 6bbc977e71..5a9c0b905e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java @@ -18,6 +18,7 @@ package org.apache.doris.policy; import org.apache.doris.analysis.CreateUserStmt; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.ShowPolicyStmt; import org.apache.doris.analysis.TablePattern; @@ -36,6 +37,11 @@ import com.google.common.collect.Lists; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.List; /** @@ -199,4 +205,40 @@ public class PolicyTest extends TestWithFeService { dropPolicy("DROP ROW POLICY test_row_policy1 ON test.table1"); dropPolicy("DROP ROW POLICY test_row_policy2 ON test.table1"); } + + @Test + public void testReadWrite() throws IOException, AnalysisException { + PolicyTypeEnum type = PolicyTypeEnum.ROW; + String policyName = "policy_name"; + long dbId = 10; + UserIdentity user = new UserIdentity("test_policy", "%"); + String originStmt = "CREATE ROW POLICY test_row_policy ON test.table1" + + " AS PERMISSIVE TO test_policy USING (k1 = 1)"; + long tableId = 100; + FilterType filterType = FilterType.PERMISSIVE; + Expr wherePredicate = null; + + Policy rowPolicy = new RowPolicy(type, policyName, dbId, user, + originStmt, tableId, filterType, wherePredicate); + + ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(emptyOutputStream); + rowPolicy.write(output); + byte[] bytes = emptyOutputStream.toByteArray(); + System.out.println(emptyOutputStream.toString()); + DataInputStream input = new DataInputStream(new ByteArrayInputStream(bytes)); + + Policy newPolicy = Policy.read(input); + Assertions.assertTrue(newPolicy instanceof RowPolicy); + RowPolicy newRowPolicy = (RowPolicy) newPolicy; + Assertions.assertEquals(type, newRowPolicy.getType()); + Assertions.assertEquals(policyName, newRowPolicy.getPolicyName()); + Assertions.assertEquals(dbId, newRowPolicy.getDbId()); + user.analyze(SystemInfoService.DEFAULT_CLUSTER); + newRowPolicy.getUser().analyze(SystemInfoService.DEFAULT_CLUSTER); + Assertions.assertEquals(user.getQualifiedUser(), newRowPolicy.getUser().getQualifiedUser()); + Assertions.assertEquals(originStmt, newRowPolicy.getOriginStmt()); + Assertions.assertEquals(tableId, newRowPolicy.getTableId()); + Assertions.assertEquals(filterType, newRowPolicy.getFilterType()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org