Copilot commented on code in PR #60884:
URL: https://github.com/apache/doris/pull/60884#discussion_r2867119706


##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -4176,23 +4180,43 @@ public void setDebugSkipFoldConstant(boolean 
debugSkipFoldConstant) {
     }
 
     public int getParallelExecInstanceNum() {
-        ConnectContext connectContext = ConnectContext.get();
-        if (connectContext != null && connectContext.getEnv() != null && 
connectContext.getEnv().getAuth() != null) {
-            int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
-                    
.getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
+        Env currentEnv = Env.getCurrentEnv();
+        if (!Strings.isNullOrEmpty(qualifiedUser)
+                && currentEnv != null
+                && currentEnv.getAuth() != null) {
+            int userParallelExecInstanceNum = currentEnv.getAuth()
+                    .getParallelFragmentExecInstanceNum(qualifiedUser);
             if (userParallelExecInstanceNum > 0) {
                 return userParallelExecInstanceNum;
             }
         }
         if (parallelPipelineTaskNum == 0) {
-            int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+            int size = 
Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolveCloudClusterForAutoParallel());
             int autoInstance = (size + 1) / 2;
             return Math.min(autoInstance, maxInstanceNum);
         } else {
             return parallelPipelineTaskNum;
         }
     }
 
+    public void setQualifiedUser(String qualifiedUser) {
+        this.qualifiedUser = qualifiedUser;
+    }
+
+    private String resolveCloudClusterForAutoParallel() {
+        if (!Config.isCloudMode()) {
+            return "";
+        }
+        if (!Strings.isNullOrEmpty(cloudCluster)) {
+            return cloudCluster;
+        }
+        Env currentEnv = Env.getCurrentEnv();
+        if (currentEnv == null || currentEnv.getAuth() == null || 
Strings.isNullOrEmpty(qualifiedUser)) {
+            return "";
+        }
+        return 
Strings.nullToEmpty(currentEnv.getAuth().getDefaultCloudCluster(qualifiedUser));
+    }

Review Comment:
   getParallelExecInstanceNum() now depends on the new per-session transient 
qualifiedUser plus resolveCloudClusterForAutoParallel() logic, including 
behavior when ConnectContext thread-local is absent. There are no existing unit 
tests covering this method, so regressions here (auth override precedence, 
default-cluster resolution, empty-cluster fallback) are currently untested; 
please add focused tests for these new branches.



##########
fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java:
##########
@@ -958,21 +998,39 @@ public void 
testGetMinPipelineExecutorSizeWithConnectContext() {
 
         try {
             // Should return 2 (minimum from cluster1), not 16 (minimum from 
cluster2)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = getMinPipelineExecutorSizeByContext(infoService);
             Assert.assertEquals(2, result);
 
             // Now switch to cluster2
             ctx.setCloudCluster(cluster2Name);
 
             // Should return 16 (minimum from cluster2), not 2 (minimum from 
cluster1)
-            result = infoService.getMinPipelineExecutorSize();
+            result = getMinPipelineExecutorSizeByContext(infoService);
             Assert.assertEquals(16, result);
         } finally {
             // Clean up ConnectContext
             ConnectContext.remove();
         }
     }
 
+    /**
+     * Resolve min pipeline executor size by current ConnectContext-selected 
cluster.
+     * Keep test intent unchanged after removing no-arg 
getMinPipelineExecutorSize().
+     */
+    private int getMinPipelineExecutorSizeByContext(CloudSystemInfoService 
infoService) {
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return infoService.getMinPipelineExecutorSize("");
+        }
+        String clusterName = "";
+        try {
+            clusterName = context.getCloudCluster(false);
+        } catch (Exception e) {
+            return 1;
+        }
+        return infoService.getMinPipelineExecutorSize(clusterName);

Review Comment:
   Helper getMinPipelineExecutorSizeByContext() catches a broad Exception and 
returns 1, which can mask unexpected failures (e.g., NPEs) in tests that assert 
the result is 1. Prefer catching the expected ComputeGroupException (or the 
specific checked exception thrown by getCloudCluster(false)) and let other 
exceptions fail the test.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java:
##########
@@ -658,6 +661,9 @@ public SessionVariable getSessionVariable() {
 
     public void setSessionVariable(SessionVariable sessionVariable) {
         this.sessionVariable = sessionVariable;
+        if (this.sessionVariable != null) {
+            this.sessionVariable.setQualifiedUser(getQualifiedUser());
+        }

Review Comment:
   PR description mentions "Added synchronization in ConnectContext", but the 
changes here only propagate qualifiedUser/cloudCluster into SessionVariable and 
do not introduce any synchronization. If synchronization is still required for 
correctness, it appears missing; otherwise please update the PR description to 
avoid confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to