This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f96e6e06f [Improvement]downgrade resource tag when there is not queryable replica (#44255)
71f96e6e06f is described below

commit 71f96e6e06fc8ac0fc30545514c1d4d95ef205af
Author: wangbo <wan...@selectdb.com>
AuthorDate: Tue Dec 10 20:13:27 2024 +0800

    [Improvement]downgrade resource tag when there is not queryable replica (#44255)
---
 .../doris/datasource/FederationBackendPolicy.java  |  3 +
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  2 +
 .../java/org/apache/doris/mysql/MysqlProto.java    |  3 +-
 .../org/apache/doris/mysql/privilege/Auth.java     |  9 ++
 .../mysql/privilege/CommonUserProperties.java      | 11 +++
 .../apache/doris/mysql/privilege/UserProperty.java | 22 +++++
 .../doris/mysql/privilege/UserPropertyMgr.java     |  8 ++
 .../org/apache/doris/planner/OlapScanNode.java     | 47 +++++++++-
 .../java/org/apache/doris/qe/ConnectContext.java   |  8 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |  3 +-
 .../org/apache/doris/system/BeSelectionPolicy.java | 33 ++++++--
 .../apache/doris/planner/ResourceTagQueryTest.java | 12 ++-
 .../apache/doris/system/SystemInfoServiceTest.java | 23 +++++
 .../workload_manager_p0/skip_rg_test_table.csv     |  2 +
 .../workload_manager_p0/test_resource_tag.groovy   | 99 ++++++++++++++++++++++
 15 files changed, 269 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 1e1787c1f64..4a24645bf3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -155,6 +155,7 @@ public class FederationBackendPolicy {
 
     public void init(List<String> preLocations) throws UserException {
         Set<Tag> tags = Sets.newHashSet();
+        boolean allowResourceTagDowngrade = false;
         if (ConnectContext.get() != null && 
ConnectContext.get().getCurrentUserIdentity() != null) {
             String qualifiedUser = 
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
             // Some request from stream load(eg, mysql load) may not set user 
info in ConnectContext
@@ -164,6 +165,7 @@ public class FederationBackendPolicy {
                 if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
                     throw new UserException("No valid resource tag for user: " 
+ qualifiedUser);
                 }
+                allowResourceTagDowngrade = 
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser);
             }
         } else {
             if (LOG.isDebugEnabled()) {
@@ -176,6 +178,7 @@ public class FederationBackendPolicy {
                 .needQueryAvailable()
                 .needLoadAvailable()
                 .addTags(tags)
+                .setAllowResourceTagDowngrade(allowResourceTagDowngrade)
                 
.preferComputeNode(Config.prefer_compute_node_for_external_table)
                 .assignExpectBeNum(Config.min_backend_num_for_external_table)
                 .addPreLocations(preLocations)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 2f9efc1ed1b..00348b2d83a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -425,8 +425,10 @@ public class LoadAction extends RestBaseController {
         BeSelectionPolicy policy = null;
         String qualifiedUser = ConnectContext.get().getQualifiedUser();
         Set<Tag> userTags = 
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
+        boolean allowResourceTagDowngrade = 
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser);
         policy = new BeSelectionPolicy.Builder()
                 .addTags(userTags)
+                .setAllowResourceTagDowngrade(allowResourceTagDowngrade)
                 .setEnableRoundRobin(true)
                 .needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index a672a217a33..c16cec5689a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -266,7 +266,8 @@ public class MysqlProto {
         }
 
         // set resource tag if has
-        
context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser));
+        
context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser),
+                
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser));
         return true;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index a1fb57d01ca..9d6f52d5a51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -1234,6 +1234,15 @@ public class Auth implements Writable {
         }
     }
 
+    public boolean isAllowResourceTagDowngrade(String qualifiedUser) {
+        readLock();
+        try {
+            return propertyMgr.isAllowResourceTagDowngrade(qualifiedUser);
+        } finally {
+            readUnlock();
+        }
+    }
+
     public long getExecMemLimit(String qualifiedUser) {
         readLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
index db838a91a56..bd2c3d02823 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java
@@ -69,6 +69,9 @@ public class CommonUserProperties implements Writable, GsonPostProcessable {
     @SerializedName(value = "wg", alternate = {"workloadGroup"})
     private String workloadGroup = WorkloadGroupMgr.DEFAULT_GROUP_NAME;
 
+    @SerializedName(value = "ard", alternate = {"AllowResourceTagDowngrade"})
+    private boolean allowResourceTagDowngrade = false;
+
     private String[] sqlBlockRulesSplit = {};
 
     long getMaxConn() {
@@ -164,6 +167,14 @@ public class CommonUserProperties implements Writable, GsonPostProcessable {
         this.workloadGroup = workloadGroup;
     }
 
+    public void setAllowResourceTagDowngrade(boolean 
allowResourceTagDowngrade) {
+        this.allowResourceTagDowngrade = allowResourceTagDowngrade;
+    }
+
+    public boolean isAllowResourceTagDowngrade() {
+        return this.allowResourceTagDowngrade;
+    }
+
     @Deprecated
     public static CommonUserProperties read(DataInput in) throws IOException {
         String json = Text.readString(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
index e4a76b23820..08f64cc006e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
@@ -86,6 +86,8 @@ public class UserProperty implements Writable {
 
     public static final String PROP_WORKLOAD_GROUP = "default_workload_group";
 
+    public static final String PROP_ALLOW_RESOURCE_TAG_DOWNGRADE = 
"allow_resource_tag_downgrade";
+
     public static final String DEFAULT_CLOUD_CLUSTER = "default_cloud_cluster";
     public static final String DEFAULT_COMPUTE_GROUP = "default_compute_group";
 
@@ -139,6 +141,8 @@ public class UserProperty implements Writable {
         ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + 
"$", Pattern.CASE_INSENSITIVE));
         ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_QUERY_TIMEOUT 
+ "$", Pattern.CASE_INSENSITIVE));
         ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_INSERT_TIMEOUT 
+ "$", Pattern.CASE_INSENSITIVE));
+        ADVANCED_PROPERTIES.add(
+                Pattern.compile("^" + PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + "$", 
Pattern.CASE_INSENSITIVE));
 
         COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", 
Pattern.CASE_INSENSITIVE));
         COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER 
+ "$", Pattern.CASE_INSENSITIVE));
@@ -201,6 +205,10 @@ public class UserProperty implements Writable {
         return Sets.newHashSet(this.commonProperties.getResourceTags());
     }
 
+    public boolean isAllowResourceTagDowngrade() {
+        return this.commonProperties.isAllowResourceTagDowngrade();
+    }
+
     public long getExecMemLimit() {
         return commonProperties.getExecMemLimit();
     }
@@ -221,6 +229,7 @@ public class UserProperty implements Writable {
         int queryTimeout = this.commonProperties.getQueryTimeout();
         int insertTimeout = this.commonProperties.getInsertTimeout();
         String workloadGroup = this.commonProperties.getWorkloadGroup();
+        boolean allowResourceTagDowngrade = 
this.commonProperties.isAllowResourceTagDowngrade();
 
         String newDefaultLoadCluster = defaultLoadCluster;
         String newDefaultCloudCluster = defaultCloudCluster;
@@ -358,6 +367,15 @@ public class UserProperty implements Writable {
                     throw new DdlException("workload group " + value + " not 
exists");
                 }
                 workloadGroup = value;
+            } else if 
(keyArr[0].equalsIgnoreCase(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE)) {
+                if (keyArr.length != 1) {
+                    throw new DdlException(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + 
" format error");
+                }
+                if (!"true".equalsIgnoreCase(value) && 
!"false".equalsIgnoreCase(value)) {
+                    throw new DdlException(
+                            "allow_resource_tag_downgrade's value must be true 
or false");
+                }
+                allowResourceTagDowngrade = Boolean.parseBoolean(value);
             } else {
                 if (isReplay) {
                     // After using SET PROPERTY to modify the user property, 
if FE rolls back to a version without
@@ -381,6 +399,7 @@ public class UserProperty implements Writable {
         this.commonProperties.setQueryTimeout(queryTimeout);
         this.commonProperties.setInsertTimeout(insertTimeout);
         this.commonProperties.setWorkloadGroup(workloadGroup);
+        
this.commonProperties.setAllowResourceTagDowngrade(allowResourceTagDowngrade);
         if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
             defaultLoadCluster = newDefaultLoadCluster;
         } else {
@@ -546,6 +565,9 @@ public class UserProperty implements Writable {
 
         result.add(Lists.newArrayList(PROP_WORKLOAD_GROUP, 
String.valueOf(commonProperties.getWorkloadGroup())));
 
+        result.add(Lists.newArrayList(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE,
+                
String.valueOf(commonProperties.isAllowResourceTagDowngrade())));
+
         // load cluster
         if (defaultLoadCluster != null) {
             result.add(Lists.newArrayList(PROP_DEFAULT_LOAD_CLUSTER, 
defaultLoadCluster));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
index 29ae1f438a1..b40bb92fbfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
@@ -179,6 +179,14 @@ public class UserPropertyMgr implements Writable {
         return tags;
     }
 
+    public boolean isAllowResourceTagDowngrade(String qualifiedUser) {
+        UserProperty existProperty = propertyMap.get(qualifiedUser);
+        if (existProperty == null) {
+            return false;
+        }
+        return existProperty.isAllowResourceTagDowngrade();
+    }
+
     public Pair<String, DppConfig> getLoadClusterInfo(String qualifiedUser, 
String cluster) throws DdlException {
         Pair<String, DppConfig> loadClusterInfo = null;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 41c055062e3..92175523f22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -888,6 +888,21 @@ public class OlapScanNode extends ScanNode {
 
             int replicaInTablet = 0;
             long oneReplicaBytes = 0;
+
+
+            // when resource tag has no alive replica and 
allowResourceTagDowngrade = true,
+            // resource tag should be disabled, we should find at least one 
alive replica
+            boolean shouldSkipResourceTag = false;
+            boolean isAllowRgDowngrade = context.isAllowResourceTagDowngrade();
+            if (needCheckTags && isAllowRgDowngrade && 
!checkTagHasAvailReplica(allowedTags, replicas)) {
+                shouldSkipResourceTag = true;
+                if (ConnectContext.get() != null && LOG.isDebugEnabled()) {
+                    LOG.debug("query {} skip resource tag for table {}.",
+                            DebugUtil.printId(ConnectContext.get().queryId()),
+                            olapTable != null ? olapTable.getId() : -1);
+                }
+            }
+
             for (Replica replica : replicas) {
                 Backend backend = null;
                 long backendId = -1;
@@ -906,14 +921,15 @@ public class OlapScanNode extends ScanNode {
                                 replica.getId());
                     }
                     String err = "replica " + replica.getId() + "'s backend " 
+ backendId
-                            + " does not exist or not alive";
+                            + " with tag " + backend.getLocationTag() + " does 
not exist or not alive";
                     errs.add(err);
                     continue;
                 }
                 if (!backend.isMixNode()) {
                     continue;
                 }
-                if (needCheckTags && !allowedTags.isEmpty() && 
!allowedTags.contains(backend.getLocationTag())) {
+                if (!shouldSkipResourceTag && needCheckTags && 
!allowedTags.isEmpty() && !allowedTags.contains(
+                        backend.getLocationTag())) {
                     String err = String.format(
                             "Replica on backend %d with tag %s," + " which is 
not in user's resource tags: %s",
                             backend.getId(), backend.getLocationTag(), 
allowedTags);
@@ -954,6 +970,10 @@ public class OlapScanNode extends ScanNode {
                 throw new UserException("tablet " + tabletId + " err: " + 
Joiner.on(", ").join(errs));
             }
             if (tabletIsNull) {
+                if (needCheckTags && !isAllowRgDowngrade) {
+                    errs.add("If user specified tag has no queryable replica, "
+                            + "you can set property 
'allow_resource_tag_downgrade'='true' to skip resource tag.");
+                }
                 throw new UserException("tablet " + tabletId + " has no 
queryable replicas. err: "
                         + Joiner.on(", ").join(errs));
             }
@@ -974,6 +994,29 @@ public class OlapScanNode extends ScanNode {
         }
     }
 
+    private boolean checkTagHasAvailReplica(Set<Tag> allowedTags, 
List<Replica> replicas) {
+        try {
+            for (Replica replica : replicas) {
+                long backendId = replica.getBackendId();
+                Backend backend = 
Env.getCurrentSystemInfo().getBackend(backendId);
+
+                if (backend == null || !backend.isAlive()) {
+                    continue;
+                }
+                if (!backend.isMixNode()) {
+                    continue;
+                }
+                if (!allowedTags.isEmpty() && 
allowedTags.contains(backend.getLocationTag())) {
+                    return true;
+                }
+            }
+            return false;
+        } catch (Throwable t) {
+            LOG.warn("error happens when check resource tag has avail replica 
", t);
+            return true;
+        }
+    }
+
     private boolean isEnableCooldownReplicaAffinity() {
         ConnectContext connectContext = ConnectContext.get();
         if (connectContext != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c81cf4920e1..bfbb15a7a7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -203,6 +203,7 @@ public class ConnectContext {
     // This property is obtained from UserProperty when the client connection 
is created.
     // Only when the connection is created again, the new resource tags will 
be retrieved from the UserProperty
     private Set<Tag> resourceTags = Sets.newHashSet();
+    private boolean allowResourceTagDowngrade = false;
     // If set to true, the resource tags set in resourceTags will be used to 
limit the query resources.
     // If set to false, the system will not restrict query resources.
     private boolean isResourceTagsSet = false;
@@ -999,9 +1000,14 @@ public class ConnectContext {
         return resourceTags;
     }
 
-    public void setResourceTags(Set<Tag> resourceTags) {
+    public boolean isAllowResourceTagDowngrade() {
+        return allowResourceTagDowngrade;
+    }
+
+    public void setResourceTags(Set<Tag> resourceTags, boolean 
allowResourceTagDowngrade) {
         this.resourceTags = resourceTags;
         this.isResourceTagsSet = !this.resourceTags.isEmpty();
+        this.allowResourceTagDowngrade = allowResourceTagDowngrade;
     }
 
     public void setCurrentConnectedFEIp(String ip) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index fcc6c2362cf..0c633186abf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -663,7 +663,8 @@ public abstract class ConnectProcessor {
         }
 
         // set resource tag
-        
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser));
+        
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser),
+                
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(ctx.qualifiedUser));
 
         ctx.setThreadLocalInfo();
         StmtExecutor executor = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 59cb4f19139..dbe16853bfd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -44,6 +44,7 @@ public class BeSelectionPolicy {
     public boolean needLoadAvailable = false;
     // Resource tag. Empty means no need to consider resource tag.
     public Set<Tag> resourceTags = Sets.newHashSet();
+    public boolean allowResourceTagDowngrade = false;
     // storage medium. null means no need to consider storage medium.
     public TStorageMedium storageMedium = null;
     // Check if disk usage reaches limit. false means no need to check.
@@ -92,6 +93,11 @@ public class BeSelectionPolicy {
             return this;
         }
 
+        public Builder setAllowResourceTagDowngrade(boolean 
allowResourceTagDowngrade) {
+            policy.allowResourceTagDowngrade = allowResourceTagDowngrade;
+            return this;
+        }
+
         public Builder setStorageMedium(TStorageMedium medium) {
             policy.storageMedium = medium;
             return this;
@@ -137,7 +143,7 @@ public class BeSelectionPolicy {
         }
     }
 
-    private boolean isMatch(Backend backend) {
+    private boolean isMatch(Backend backend, boolean needResourceTagAvail) {
         // Compute node is only used when preferComputeNode is set.
         if (!preferComputeNode && backend.isComputeNode()) {
             if (LOG.isDebugEnabled()) {
@@ -146,10 +152,11 @@ public class BeSelectionPolicy {
             return false;
         }
 
-        if (needScheduleAvailable && !backend.isScheduleAvailable() || 
needQueryAvailable && !backend.isQueryAvailable()
-                || needLoadAvailable && !backend.isLoadAvailable() || 
!resourceTags.isEmpty() && !resourceTags.contains(
-                backend.getLocationTag()) || storageMedium != null && 
!backend.hasSpecifiedStorageMedium(
-                storageMedium)) {
+        if (needScheduleAvailable && !backend.isScheduleAvailable()
+                || needQueryAvailable && !backend.isQueryAvailable()
+                || needLoadAvailable && !backend.isLoadAvailable()
+                || (needResourceTagAvail && !resourceTags.isEmpty() && 
!resourceTags.contains(backend.getLocationTag()))
+                || storageMedium != null && 
!backend.hasSpecifiedStorageMedium(storageMedium)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Backend [{}] is not match by Other rules, policy: 
[{}]", backend.getHost(), this);
             }
@@ -176,7 +183,16 @@ public class BeSelectionPolicy {
     }
 
     public List<Backend> getCandidateBackends(Collection<Backend> backends) {
-        List<Backend> filterBackends = 
backends.stream().filter(this::isMatch).collect(Collectors.toList());
+        boolean needResourceTagAvail = !this.allowResourceTagDowngrade || 
backends.stream()
+                .filter(backend -> 
resourceTags.contains(backend.getLocationTag()) && backend.isAlive())
+                .count() != 0;
+        List<Backend> filterBackends = new ArrayList<>();
+        for (Backend be : backends) {
+            if (this.isMatch(be, needResourceTagAvail)) {
+                filterBackends.add(be);
+            }
+        }
+
         List<Backend> preLocationFilterBackends = filterBackends.stream()
                 .filter(iterm -> 
preferredLocations.contains(iterm.getHost())).collect(Collectors.toList());
         // If preLocations were chosen, use the preLocation backends. 
Otherwise we just ignore this filter.
@@ -221,8 +237,9 @@ public class BeSelectionPolicy {
 
     @Override
     public String toString() {
-        return String.format("computeNode=%s | query=%s | load=%s | 
schedule=%s | tags=%s | medium=%s",
+        return String.format("computeNode=%s | query=%s | load=%s | 
schedule=%s | tags=%s(%s) | medium=%s",
                 preferComputeNode, needQueryAvailable, needLoadAvailable, 
needScheduleAvailable,
-                resourceTags.stream().map(tag -> 
tag.toString()).collect(Collectors.joining(",")), storageMedium);
+                resourceTags.stream().map(tag -> 
tag.toString()).collect(Collectors.joining(",")),
+                this.allowResourceTagDowngrade, storageMedium);
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
index 850d8b27b06..9568841134a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
@@ -205,7 +205,7 @@ public class ResourceTagQueryTest {
         Assert.assertEquals(1, userTags.size());
 
         // update connection context and query
-        connectContext.setResourceTags(userTags);
+        connectContext.setResourceTags(userTags, false);
         String queryStr = "explain select * from test.tbl1";
         String explainString = 
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         System.out.println(explainString);
@@ -221,7 +221,7 @@ public class ResourceTagQueryTest {
         }
 
         // update connection context and query, it will failed because no 
zone1 backend
-        connectContext.setResourceTags(userTags);
+        connectContext.setResourceTags(userTags, false);
         Assert.assertTrue(connectContext.isResourceTagsSet());
         queryStr = "explain select * from test.tbl1";
         String error = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
@@ -280,7 +280,13 @@ public class ResourceTagQueryTest {
         Assert.assertEquals(1000000, execMemLimit);
 
         List<List<String>> userProps = 
Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
-        Assert.assertEquals(13, userProps.size());
+        Assert.assertEquals(14, userProps.size());
+
+        // set resource tag downgrade
+        String setResourceTagDownStr = "set property for 'root' 
'allow_resource_tag_downgrade' = 'false';";
+        ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setResourceTagDownStr));
+        boolean tagDowngrade = 
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(Auth.ROOT_USER);
+        Assert.assertTrue(!tagDowngrade);
 
         // now :
         // be1 be2 be3 ==>tag1;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 62ade50c919..f582c37706c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -333,6 +333,29 @@ public class SystemInfoServiceTest {
         Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 
3).size());
     }
 
+    @Test
+    public void testResourceTagDowngrade() throws Exception {
+        Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
+        addBackend(10001, "192.168.1.1", 9050);
+        Backend be1 = infoService.getBackend(10001);
+        be1.setTagMap(taga.toMap());
+        be1.setAlive(true);
+
+        addBackend(10002, "192.168.1.2", 9050);
+        Backend be2 = infoService.getBackend(10002);
+        be2.setAlive(true);
+
+        BeSelectionPolicy policy1 = new 
BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1, 
1).size());
+
+        be1.setAlive(false);
+        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy1, 
1).size());
+
+        BeSelectionPolicy policy2 = new 
BeSelectionPolicy.Builder().setAllowResourceTagDowngrade(true)
+                .addTags(Sets.newHashSet(taga)).build();
+        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 
1).size());
+    }
+
     @Test
     public void testPreferLocationsSelect() throws Exception {
         Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
diff --git a/regression-test/data/workload_manager_p0/skip_rg_test_table.csv b/regression-test/data/workload_manager_p0/skip_rg_test_table.csv
new file mode 100644
index 00000000000..edcc00b603a
--- /dev/null
+++ b/regression-test/data/workload_manager_p0/skip_rg_test_table.csv
@@ -0,0 +1,2 @@
+1|2
+3|4
\ No newline at end of file
diff --git a/regression-test/suites/workload_manager_p0/test_resource_tag.groovy b/regression-test/suites/workload_manager_p0/test_resource_tag.groovy
new file mode 100644
index 00000000000..d6557645eb8
--- /dev/null
+++ b/regression-test/suites/workload_manager_p0/test_resource_tag.groovy
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_resource_tag") {
+        sql "drop user if exists test_rg;"
+        sql "create user test_rg"
+        sql "GRANT SELECT_PRIV,LOAD_PRIV,ALTER_PRIV,CREATE_PRIV,DROP_PRIV ON 
*.*.* TO test_rg;"
+        sql "set property for test_rg 'resource_tags.location' = 'c3p0';"
+        //cloud-mode
+        if (isCloudMode()) {
+                def clusters = sql " SHOW CLUSTERS; "
+                assertTrue(!clusters.isEmpty())
+                def validCluster = clusters[0][0]
+                sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO 
test_rg""";
+        }
+
+        // test query
+        connect(user = 'test_rg', password = '', url = context.config.jdbcUrl) 
{
+                sql "drop table if exists test_skip_rg_bad_replica_tab;"
+                sql """
+            CREATE TABLE test_skip_rg_bad_replica_tab
+                (
+                    k1 int,
+                    k2 int,
+                )ENGINE=OLAP
+                duplicate KEY(k1)
+                DISTRIBUTED BY HASH (k1) BUCKETS 1
+                PROPERTIES(
+                'replication_allocation' = 'tag.location.default: 1'
+                );
+            """
+                sql """
+        insert into test_skip_rg_bad_replica_tab values
+        (9,10),
+        (1,2)
+        """
+                test {
+                        sql "select count(1) as t1 from 
test_skip_rg_bad_replica_tab;"
+                        exception "which is not in user's resource tags: 
[{\"location\" : \"c3p0\"}], If user specified tag has no queryable replica, 
you can set property 'allow_resource_tag_downgrade'='true' to skip resource 
tag."
+                }
+        }
+        sql "set property for test_rg 'allow_resource_tag_downgrade' = 'true';"
+
+        connect(user = 'test_rg', password = '', url = context.config.jdbcUrl) 
{
+                sql "select count(1) as t2 from test_skip_rg_bad_replica_tab;"
+                sql "drop table test_skip_rg_bad_replica_tab";
+        }
+
+
+        // test stream load
+        sql "set property for test_rg 'allow_resource_tag_downgrade' = 
'false';"
+        sql """ DROP TABLE IF EXISTS 
${context.config.defaultDb}.skip_rg_test_table """
+        sql """
+            CREATE TABLE IF NOT EXISTS 
${context.config.defaultDb}.skip_rg_test_table (
+                `k1` int NULL,
+                `k2` int NULL
+            ) ENGINE=OLAP
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def test_failed_command = "curl --location-trusted -u test_rg: -H 
column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2  -T 
${context.dataPath}/skip_rg_test_table.csv 
http://${context.config.feHttpAddress}/api/${context.config.defaultDb}/skip_rg_test_table/_stream_load";
+        log.info("stream load skip_rg_test_table failed test cmd: 
${test_failed_command}")
+        def process = test_failed_command.execute()
+        code1 = process.waitFor()
+        out1 = process.text
+        log.info("stream load skip_rg_test_table failed test result, 
${out1}".toString())
+        assertTrue("${out1}".toString().contains("No backend load available") 
|| "${out1}".toString().contains("No available backends"))
+
+        sql "set property for test_rg 'allow_resource_tag_downgrade' = 'true';"
+
+        def test_succ_command = "curl --location-trusted -u test_rg: -H 
column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2  -T 
${context.dataPath}/skip_rg_test_table.csv 
http://${context.config.feHttpAddress}/api/${context.config.defaultDb}/skip_rg_test_table/_stream_load";
+        def process2 = test_succ_command.execute()
+        code2 = process2.waitFor()
+        out2 = process2.text
+        jsonRet = parseJson(out2)
+        log.info("stream load skip_rg_test_table succ test result, 
${out2}".toString())
+        assertFalse("${out2}".toString().contains("No backend load available"))
+        assertTrue(jsonRet['Status'] == 'Success')
+
+
+        // clear
+        sql "drop user test_rg"
+        sql "drop table ${context.config.defaultDb}.skip_rg_test_table"
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to