This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cb39671a73 [fix](policy) Add readlock for show policy (#13497)
cb39671a73 is described below
commit cb39671a73364e07721c3c95789494370ced058a
Author: Stalary <[email protected]>
AuthorDate: Tue Oct 25 21:42:40 2022 +0800
[fix](policy) Add readlock for show policy (#13497)
Add readlock for show policy resolve ConcurrentModificationException
---
.../org/apache/doris/analysis/AlterPolicyStmt.java | 6 +-
.../java/org/apache/doris/catalog/S3Resource.java | 2 +-
.../java/org/apache/doris/policy/PolicyMgr.java | 120 ++++++++++++++-------
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
4 files changed, 88 insertions(+), 42 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
index ae3fb503e4..b058846ba0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
@@ -37,6 +37,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+/**
+ * Alter policy
+ **/
@Data
public class AlterPolicyStmt extends DdlStmt {
private final String policyName;
@@ -65,7 +68,8 @@ public class AlterPolicyStmt extends DdlStmt {
}
// check resource existence
- List<Policy> policiesByType =
Env.getCurrentEnv().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
+ List<Policy> policiesByType = Env.getCurrentEnv().getPolicyMgr()
+ .getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
Optional<Policy> hasPolicy = policiesByType.stream()
.filter(policy ->
policy.getPolicyName().equals(this.policyName)).findAny();
StoragePolicy storagePolicy = (StoragePolicy) hasPolicy.orElseThrow(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index e3178bafd9..aa303cfaaa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -209,7 +209,7 @@ public class S3Resource extends Resource {
this.usedByPolicySet.forEach(
policy -> {
List<Policy> policiesByType =
Env.getCurrentEnv().getPolicyMgr()
- .getPoliciesByType(PolicyTypeEnum.STORAGE);
+
.getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
Optional<Policy> findPolicy = policiesByType.stream()
.filter(p -> p.getType() ==
PolicyTypeEnum.STORAGE && policy.equals(p.getPolicyName()))
.findAny();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index e360b98cc4..0e096a8b30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -34,6 +34,7 @@ import org.apache.doris.qe.ShowResultSet;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -89,6 +90,9 @@ public class PolicyMgr implements Writable {
lock.readLock().unlock();
}
+ /**
+ * Create default storage policy used by master.
+ **/
public void createDefaultStoragePolicy() {
writeLock();
try {
@@ -164,8 +168,13 @@ public class PolicyMgr implements Writable {
* @return exist or not
*/
public boolean existPolicy(Policy checkedPolicy) {
- List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
- return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedPolicy));
+ readLock();
+ try {
+ List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
+ return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedPolicy));
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -175,8 +184,13 @@ public class PolicyMgr implements Writable {
* @return exist or not
*/
private boolean existPolicy(DropPolicyLog checkedDropPolicy) {
- List<Policy> policies = getPoliciesByType(checkedDropPolicy.getType());
- return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedDropPolicy));
+ readLock();
+ try {
+ List<Policy> policies =
getPoliciesByType(checkedDropPolicy.getType());
+ return policies.stream().anyMatch(policy ->
policy.matchPolicy(checkedDropPolicy));
+ } finally {
+ readUnlock();
+ }
}
/**
@@ -186,16 +200,30 @@ public class PolicyMgr implements Writable {
* @return Policy in typeToPolicyMap
*/
public Policy getPolicy(Policy checkedPolicy) {
- List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
- for (Policy policy : policies) {
- if (policy.matchPolicy(checkedPolicy)) {
- return policy;
+ readLock();
+ try {
+ List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
+ for (Policy policy : policies) {
+ if (policy.matchPolicy(checkedPolicy)) {
+ return policy;
+ }
}
+ return null;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ public List<Policy> getCopiedPoliciesByType(PolicyTypeEnum policyType) {
+ readLock();
+ try {
+ return ImmutableList.copyOf(getPoliciesByType(policyType));
+ } finally {
+ readUnlock();
}
- return null;
}
- public List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
+ private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
if (typeToPolicyMap == null) {
return new ArrayList<>();
}
@@ -279,21 +307,26 @@ public class PolicyMgr implements Writable {
checkedPolicy = rowPolicy;
}
final Policy finalCheckedPolicy = checkedPolicy;
- List<Policy> policies =
typeToPolicyMap.getOrDefault(showStmt.getType(), new ArrayList<>()).stream()
- .filter(p ->
p.matchPolicy(finalCheckedPolicy)).collect(Collectors.toList());
- for (Policy policy : policies) {
- if (policy.isInvalid()) {
- continue;
- }
+ readLock();
+ try {
+ List<Policy> policies =
getPoliciesByType(showStmt.getType()).stream()
+ .filter(p ->
p.matchPolicy(finalCheckedPolicy)).collect(Collectors.toList());
+ for (Policy policy : policies) {
+ if (policy.isInvalid()) {
+ continue;
+ }
- if (policy instanceof StoragePolicy && ((StoragePolicy)
policy).getStorageResource() == null) {
- // default storage policy not init.
- continue;
- }
+ if (policy instanceof StoragePolicy && ((StoragePolicy)
policy).getStorageResource() == null) {
+ // default storage policy not init.
+ continue;
+ }
- rows.add(policy.getShowInfo());
+ rows.add(policy.getShowInfo());
+ }
+ return new ShowResultSet(showStmt.getMetaData(), rows);
+ } finally {
+ readUnlock();
}
- return new ShowResultSet(showStmt.getMetaData(), rows);
}
/**
@@ -329,18 +362,16 @@ public class PolicyMgr implements Writable {
for (RowPolicy rowPolicy : policies) {
// read from json, need set isAnalyzed
rowPolicy.getUser().setIsAnalyzed();
- String key =
- Joiner.on("-").join(rowPolicy.getTableId(),
rowPolicy.getType(),
- rowPolicy.getUser().getQualifiedUser());
+ String key = Joiner.on("-")
+ .join(rowPolicy.getTableId(), rowPolicy.getType(),
rowPolicy.getUser().getQualifiedUser());
// merge wherePredicate
if
(CompoundPredicate.Operator.AND.equals(rowPolicy.getFilterType().getOp())) {
RowPolicy frontPolicy = andMap.get(key);
if (frontPolicy == null) {
andMap.put(key, rowPolicy.clone());
} else {
- frontPolicy.setWherePredicate(
- new
CompoundPredicate(CompoundPredicate.Operator.AND,
frontPolicy.getWherePredicate(),
- rowPolicy.getWherePredicate()));
+ frontPolicy.setWherePredicate(new
CompoundPredicate(CompoundPredicate.Operator.AND,
+ frontPolicy.getWherePredicate(),
rowPolicy.getWherePredicate()));
andMap.put(key, frontPolicy.clone());
}
} else {
@@ -348,9 +379,8 @@ public class PolicyMgr implements Writable {
if (frontPolicy == null) {
orMap.put(key, rowPolicy.clone());
} else {
- frontPolicy.setWherePredicate(
- new
CompoundPredicate(CompoundPredicate.Operator.OR,
frontPolicy.getWherePredicate(),
- rowPolicy.getWherePredicate()));
+ frontPolicy.setWherePredicate(new
CompoundPredicate(CompoundPredicate.Operator.OR,
+ frontPolicy.getWherePredicate(),
rowPolicy.getWherePredicate()));
orMap.put(key, frontPolicy.clone());
}
}
@@ -363,8 +393,8 @@ public class PolicyMgr implements Writable {
if (andMap.containsKey(key) && orMap.containsKey(key)) {
RowPolicy mergePolicy = andMap.get(key).clone();
mergePolicy.setWherePredicate(
- new
CompoundPredicate(CompoundPredicate.Operator.AND,
mergePolicy.getWherePredicate(),
- orMap.get(key).getWherePredicate()));
+ new
CompoundPredicate(CompoundPredicate.Operator.AND,
mergePolicy.getWherePredicate(),
+ orMap.get(key).getWherePredicate()));
mergeMap.put(key, mergePolicy);
}
if (!andMap.containsKey(key)) {
@@ -398,12 +428,22 @@ public class PolicyMgr implements Writable {
return policyMgr;
}
- public Optional<Policy> findPolicy(final String storagePolicyName,
PolicyTypeEnum policyType) {
- List<Policy> policiesByType = getPoliciesByType(policyType);
- return policiesByType.stream()
- .filter(policy ->
policy.getPolicyName().equals(storagePolicyName)).findAny();
+ /**
+ * Find policy by policy name and type
+ **/
+ public Optional<Policy> findPolicy(final String policyName, PolicyTypeEnum
policyType) {
+ readLock();
+ try {
+ List<Policy> policiesByType = getPoliciesByType(policyType);
+ return policiesByType.stream().filter(policy ->
policy.getPolicyName().equals(policyName)).findAny();
+ } finally {
+ readUnlock();
+ }
}
+ /**
+ * Alter policy by stmt.
+ **/
public void alterPolicy(AlterPolicyStmt stmt) throws DdlException,
AnalysisException {
String storagePolicyName = stmt.getPolicyName();
Map<String, String> properties = stmt.getProperties();
@@ -414,8 +454,7 @@ public class PolicyMgr implements Writable {
Optional<Policy> policy = findPolicy(storagePolicyName,
PolicyTypeEnum.STORAGE);
StoragePolicy storagePolicy = (StoragePolicy) policy.orElseThrow(
- () -> new DdlException("Storage policy(" + storagePolicyName +
") dose not exist.")
- );
+ () -> new DdlException("Storage policy(" + storagePolicyName +
") dose not exist."));
storagePolicy.modifyProperties(properties);
// log alter
@@ -423,6 +462,9 @@ public class PolicyMgr implements Writable {
LOG.info("Alter storage policy success. policy: {}", storagePolicy);
}
+ /**
+ * Check storage policy whether exist by policy name.
+ **/
public void checkStoragePolicyExist(String storagePolicyName) throws
DdlException {
if (Strings.isNullOrEmpty(storagePolicyName)) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b7e5aee6f9..8f6641ccb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1010,7 +1010,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
- List<Policy> policyList =
Env.getCurrentEnv().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
+ List<Policy> policyList =
Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
policyList.stream().filter(p -> p instanceof StoragePolicy).map(p ->
(StoragePolicy) p).forEach(
iter -> {
// default policy not init.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]