Copilot commented on code in PR #60884:
URL: https://github.com/apache/doris/pull/60884#discussion_r2867119706
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -4176,23 +4180,43 @@ public void setDebugSkipFoldConstant(boolean debugSkipFoldConstant) {
}
public int getParallelExecInstanceNum() {
- ConnectContext connectContext = ConnectContext.get();
- if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) {
- int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
- .getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
+ Env currentEnv = Env.getCurrentEnv();
+ if (!Strings.isNullOrEmpty(qualifiedUser)
+ && currentEnv != null
+ && currentEnv.getAuth() != null) {
+ int userParallelExecInstanceNum = currentEnv.getAuth()
+ .getParallelFragmentExecInstanceNum(qualifiedUser);
if (userParallelExecInstanceNum > 0) {
return userParallelExecInstanceNum;
}
}
if (parallelPipelineTaskNum == 0) {
- int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+ int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolveCloudClusterForAutoParallel());
int autoInstance = (size + 1) / 2;
return Math.min(autoInstance, maxInstanceNum);
} else {
return parallelPipelineTaskNum;
}
}
+ public void setQualifiedUser(String qualifiedUser) {
+ this.qualifiedUser = qualifiedUser;
+ }
+
+ private String resolveCloudClusterForAutoParallel() {
+ if (!Config.isCloudMode()) {
+ return "";
+ }
+ if (!Strings.isNullOrEmpty(cloudCluster)) {
+ return cloudCluster;
+ }
+ Env currentEnv = Env.getCurrentEnv();
+ if (currentEnv == null || currentEnv.getAuth() == null || Strings.isNullOrEmpty(qualifiedUser)) {
+ return "";
+ }
+ return Strings.nullToEmpty(currentEnv.getAuth().getDefaultCloudCluster(qualifiedUser));
+ }
Review Comment:
getParallelExecInstanceNum() now depends on the new per-session transient
qualifiedUser and on resolveCloudClusterForAutoParallel(), including the case
where the ConnectContext thread-local is absent. No existing unit test covers
this method, so regressions in auth override precedence, default-cluster
resolution, or the empty-cluster fallback would go unnoticed. Please add
focused tests for these new branches.
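
As a rough illustration of the branches such tests would need to pin down, the
sketch below copies only the branch order from the diff into two pure
functions. Everything here is hypothetical and is not the Doris code:
ParallelInstanceSketch, resolveInstanceNum, and resolveCluster are made-up
names, userOverride stands in for the value returned by
getParallelFragmentExecInstanceNum(qualifiedUser), and the defaultClusters map
stands in for getAuth().getDefaultCloudCluster(qualifiedUser).

```java
import java.util.Map;

// Hypothetical stand-in, not the Doris class: it copies only the branch order
// of getParallelExecInstanceNum() and resolveCloudClusterForAutoParallel().
final class ParallelInstanceSketch {

    // userOverride stands in for getParallelFragmentExecInstanceNum(qualifiedUser).
    static int resolveInstanceNum(int userOverride, int parallelPipelineTaskNum,
                                  int minExecutorSize, int maxInstanceNum) {
        if (userOverride > 0) {
            return userOverride; // the per-user auth override takes precedence
        }
        if (parallelPipelineTaskNum == 0) {
            // auto mode: half the executor size, rounded up, capped by maxInstanceNum
            int autoInstance = (minExecutorSize + 1) / 2;
            return Math.min(autoInstance, maxInstanceNum);
        }
        return parallelPipelineTaskNum;
    }

    // defaultClusters stands in for getAuth().getDefaultCloudCluster(user).
    static String resolveCluster(boolean cloudMode, String sessionCluster,
                                 String qualifiedUser, Map<String, String> defaultClusters) {
        if (!cloudMode) {
            return "";
        }
        if (sessionCluster != null && !sessionCluster.isEmpty()) {
            return sessionCluster; // an explicit session cluster wins
        }
        if (defaultClusters == null || qualifiedUser == null || qualifiedUser.isEmpty()) {
            return ""; // nothing to resolve against
        }
        String cluster = defaultClusters.get(qualifiedUser);
        return cluster == null ? "" : cluster; // same effect as Strings.nullToEmpty
    }
}
```

Real tests would call SessionVariable itself with a mocked Env and Auth. The
cases worth covering are the same, though: a positive user override, auto mode
with and without the maxInstanceNum cap, a fixed parallelPipelineTaskNum, and
each early return in the cluster resolution.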
##########
fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java:
##########
@@ -958,21 +998,39 @@ public void testGetMinPipelineExecutorSizeWithConnectContext() {
try {
// Should return 2 (minimum from cluster1), not 16 (minimum from cluster2)
- int result = infoService.getMinPipelineExecutorSize();
+ int result = getMinPipelineExecutorSizeByContext(infoService);
Assert.assertEquals(2, result);
// Now switch to cluster2
ctx.setCloudCluster(cluster2Name);
// Should return 16 (minimum from cluster2), not 2 (minimum from cluster1)
- result = infoService.getMinPipelineExecutorSize();
+ result = getMinPipelineExecutorSizeByContext(infoService);
Assert.assertEquals(16, result);
} finally {
// Clean up ConnectContext
ConnectContext.remove();
}
}
+ /**
+ * Resolve min pipeline executor size by current ConnectContext-selected cluster.
+ * Keep test intent unchanged after removing no-arg getMinPipelineExecutorSize().
+ */
+ private int getMinPipelineExecutorSizeByContext(CloudSystemInfoService infoService) {
+ ConnectContext context = ConnectContext.get();
+ if (context == null) {
+ return infoService.getMinPipelineExecutorSize("");
+ }
+ String clusterName = "";
+ try {
+ clusterName = context.getCloudCluster(false);
+ } catch (Exception e) {
+ return 1;
+ }
+ return infoService.getMinPipelineExecutorSize(clusterName);
Review Comment:
Helper getMinPipelineExecutorSizeByContext() catches a broad Exception and
returns 1, which can mask unexpected failures (e.g., NPEs) in tests that assert
the result is 1. Prefer catching the expected ComputeGroupException (or the
specific checked exception thrown by getCloudCluster(false)) and let other
exceptions fail the test.
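
A minimal sketch of the narrower catch, using stand-in types so that it
compiles on its own. NarrowCatchSketch, ClusterResolveException, and
ClusterSource are hypothetical names: the first is just a holder class, the
second plays the role of whichever checked exception getCloudCluster(false)
actually declares, and the third plays the role of the ConnectContext call.

```java
import java.util.function.ToIntFunction;

// Hypothetical stand-ins so the pattern compiles in isolation; none of these
// names exist in Doris.
final class NarrowCatchSketch {

    // Plays the role of the checked exception declared by getCloudCluster(false).
    static class ClusterResolveException extends Exception {
        ClusterResolveException(String message) {
            super(message);
        }
    }

    // Plays the role of context.getCloudCluster(false).
    interface ClusterSource {
        String cloudCluster() throws ClusterResolveException;
    }

    static int minSizeByContext(ClusterSource source, ToIntFunction<String> sizeLookup) {
        if (source == null) {
            return sizeLookup.applyAsInt("");
        }
        String clusterName;
        try {
            clusterName = source.cloudCluster();
        } catch (ClusterResolveException e) {
            // Only the expected resolution failure maps to the fallback value.
            return 1;
        }
        // Any unchecked exception (for example an NPE) propagates and fails the test.
        return sizeLookup.applyAsInt(clusterName);
    }
}
```

With this shape, a test that asserts a result of 1 passes only when cluster
resolution fails in the expected way. An unrelated NullPointerException
surfaces as a test error instead of being counted as a pass.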
##########
fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java:
##########
@@ -658,6 +661,9 @@ public SessionVariable getSessionVariable() {
public void setSessionVariable(SessionVariable sessionVariable) {
this.sessionVariable = sessionVariable;
+ if (this.sessionVariable != null) {
+ this.sessionVariable.setQualifiedUser(getQualifiedUser());
+ }
Review Comment:
PR description mentions "Added synchronization in ConnectContext", but the
changes here only propagate qualifiedUser/cloudCluster into SessionVariable and
do not introduce any synchronization. If synchronization is still required for
correctness, it appears missing; otherwise please update the PR description to
avoid confusion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]