This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9186b6213f5 [opt](query cache) Limit instance count for non-scan
fragments when query cache is enabled (#60878)
9186b6213f5 is described below
commit 9186b6213f51df83754d7b9a193fecd83014dc6f
Author: HappenLee <[email protected]>
AuthorDate: Tue Mar 3 16:45:27 2026 +0800
[opt](query cache) Limit instance count for non-scan fragments when query
cache is enabled (#60878)
When the query cache is enabled, non-scan fragments could spawn a large
number of concurrent instances, which increases resource contention and
reduces cache effectiveness. This change caps the instance count of a
non-scan (shuffle) fragment whose child fragment uses the query cache at
min(childInstanceNum, parallelExecInstanceNum * backendNum); when
exchangeInstanceParallel is also set, the smaller of the two limits
applies.
### Key changes
- Enforce an instance-count limit for non-scan fragments when query
cache is enabled
- Apply the limit only to non-scan fragments so scan-heavy paths keep
their parallelism
- Reduce risk of cache thrashing and improve overall query stability
This message expands the original commit description for clarity and
rationale.
---
.../worker/job/UnassignedShuffleJob.java | 19 +-
.../worker/job/UnassignedShuffleJobTest.java | 487 +++++++++++++++++++++
2 files changed, 504 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 61324f3dfe8..b60282d20a0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -55,13 +56,15 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
useSerialSource = fragment.useSerialSource(
distributeContext.isLoadJob ? null :
statementContext.getConnectContext());
- int expectInstanceNum = degreeOfParallelism();
List<AssignedJob> biggestParallelChildFragment =
getInstancesOfBiggestParallelChildFragment(inputJobs);
+ int expectInstanceNum =
degreeOfParallelism(biggestParallelChildFragment.size(), inputJobs);
if (expectInstanceNum > 0 && expectInstanceNum <
biggestParallelChildFragment.size()) {
// When group by cardinality is smaller than number of backend,
only some backends always
// process while other has no data to process.
// So we shuffle instances to make different backends handle
different queries.
+ // Additionally, when the query cache limits the instance count, the
shuffling still applies:
+ // it spreads the reduced set of instances across distinct workers to
avoid cache thrashing.
List<DistributedPlanWorker>
shuffleWorkersInBiggestParallelChildFragment
= distinctShuffleWorkers(biggestParallelChildFragment);
Function<Integer, DistributedPlanWorker> workerSelector =
instanceIndex -> {
@@ -79,7 +82,7 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
}
}
- protected int degreeOfParallelism() {
+ protected int degreeOfParallelism(int childInstanceNum,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
// TODO: check we use nested loop join do right outer / semi / anti
join,
// we should add an exchange node with gather distribute under
the nested loop join
int expectInstanceNum = -1;
@@ -87,6 +90,18 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
if (connectContext != null && connectContext.getSessionVariable() !=
null) {
expectInstanceNum =
connectContext.getSessionVariable().getExchangeInstanceParallel();
}
+ // If child fragment uses query cache, limit instance num to avoid too
many instances
+ if (childInstanceNum > 0 && connectContext != null) {
+ boolean childHasQueryCacheParam = inputJobs.values().stream()
+ .anyMatch(job ->
job.unassignedJob().getFragment().queryCacheParam != null);
+ if (childHasQueryCacheParam) {
+ int maxInstanceNum =
connectContext.getSessionVariable().getParallelExecInstanceNum()
+ * Env.getCurrentSystemInfo().getBackendsNumber(false);
+ expectInstanceNum = expectInstanceNum > 0
+ ? Math.min(expectInstanceNum,
Math.min(childInstanceNum, maxInstanceNum))
+ : Math.min(childInstanceNum, maxInstanceNum);
+ }
+ }
return expectInstanceNum;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
new file mode 100644
index 00000000000..403ddfc9e1a
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJobTest.java
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Unit tests for {@link UnassignedShuffleJob}, specifically the
+ * degreeOfParallelism logic that limits instance count when query cache is
enabled.
+ */
+public class UnassignedShuffleJobTest {
+
+ private ConnectContext connectContext;
+ private SessionVariable sessionVariable;
+ private StatementContext statementContext;
+ private PlanFragment fragment;
+ private MockedStatic<Env> envMockedStatic;
+ private MockedStatic<ConnectContext> connectContextMockedStatic;
+ private AtomicLong instanceIdCounter;
+
+ @BeforeEach
+ public void setUp() {
+ sessionVariable = Mockito.mock(SessionVariable.class);
+ connectContext = Mockito.mock(ConnectContext.class);
+
Mockito.when(connectContext.getSessionVariable()).thenReturn(sessionVariable);
+
+ // nextInstanceId() is called from buildInstances; provide unique IDs
+ instanceIdCounter = new AtomicLong(0);
+ Mockito.when(connectContext.nextInstanceId()).thenAnswer(
+ invocation -> new TUniqueId(0,
instanceIdCounter.incrementAndGet()));
+
+ // Mock static ConnectContext.get() for
SystemInfoService.getBackendsNumber
+ connectContextMockedStatic = Mockito.mockStatic(ConnectContext.class);
+
connectContextMockedStatic.when(ConnectContext::get).thenReturn(connectContext);
+
+ statementContext = Mockito.mock(StatementContext.class);
+
Mockito.when(statementContext.getConnectContext()).thenReturn(connectContext);
+
+ fragment = Mockito.mock(PlanFragment.class);
+
Mockito.when(fragment.useSerialSource(Mockito.any())).thenReturn(false);
+
+ // Mock Env.getCurrentSystemInfo()
+ SystemInfoService systemInfoService =
Mockito.mock(SystemInfoService.class);
+ Mockito.when(systemInfoService.getBackendsNumber(false)).thenReturn(3);
+
+ Env env = Mockito.mock(Env.class);
+ Mockito.when(env.getClusterInfo()).thenReturn(systemInfoService);
+
+ envMockedStatic = Mockito.mockStatic(Env.class);
+ envMockedStatic.when(Env::getCurrentEnv).thenReturn(env);
+
envMockedStatic.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (envMockedStatic != null) {
+ envMockedStatic.close();
+ }
+ if (connectContextMockedStatic != null) {
+ connectContextMockedStatic.close();
+ }
+ }
+
+ /**
+ * Helper: create a mock AssignedJob whose unassignedJob().getFragment()
has the given queryCacheParam.
+ */
+ private AssignedJob createMockAssignedJob(TQueryCacheParam
queryCacheParam, DistributedPlanWorker worker) {
+ PlanFragment childFragment = Mockito.mock(PlanFragment.class);
+ childFragment.queryCacheParam = queryCacheParam;
+
+ UnassignedJob childUnassignedJob = Mockito.mock(UnassignedJob.class);
+
Mockito.when(childUnassignedJob.getFragment()).thenReturn(childFragment);
+
+ AssignedJob assignedJob = Mockito.mock(AssignedJob.class);
+
Mockito.when(assignedJob.unassignedJob()).thenReturn(childUnassignedJob);
+ Mockito.when(assignedJob.getAssignedWorker()).thenReturn(worker);
+ return assignedJob;
+ }
+
+ private DistributedPlanWorker createMockWorker(long id) {
+ DistributedPlanWorker worker =
Mockito.mock(DistributedPlanWorker.class);
+ Mockito.when(worker.id()).thenReturn(id);
+ return worker;
+ }
+
+ private DistributeContext createDistributeContext(boolean isLoadJob) {
+ DistributedPlanWorkerManager workerManager =
Mockito.mock(DistributedPlanWorkerManager.class);
+ return new DistributeContext(workerManager, isLoadJob);
+ }
+
+ /**
+ * Create a mock UnassignedJob whose getAllChildrenTypes() returns an
empty BitSet,
+ * which is required by AbstractTreeNode constructor to avoid NPE.
+ */
+ private UnassignedJob createMockUnassignedJob() {
+ UnassignedJob mockJob = Mockito.mock(UnassignedJob.class);
+ Mockito.when(mockJob.getAllChildrenTypes()).thenReturn(new BitSet());
+ return mockJob;
+ }
+
+ // ======================== Tests for degreeOfParallelism
========================
+
+ /**
+ * Test: When no query cache is enabled (queryCacheParam is null),
degreeOfParallelism
+ * should return the exchangeInstanceParallel value (no limiting).
+ */
+ @Test
+ public void testDegreeOfParallelismWithoutQueryCache() {
+ // exchangeInstanceParallel = -1 means not set
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ // Build inputJobs with no query cache
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ // Add 20 child instances with no query cache
+ for (int i = 0; i < 20; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(null, worker));
+ }
+
+ // Access protected method via a testable subclass
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(20, inputJobs);
+
+ // Without query cache, should return -1 (not limited)
+ Assertions.assertEquals(-1, result);
+ }
+
+ /**
+ * Test: When query cache is enabled and exchangeInstanceParallel is not
set,
+ * degreeOfParallelism should limit to min(childInstanceNum,
parallelExecInstanceNum * backendNum).
+ */
+ @Test
+ public void testDegreeOfParallelismWithQueryCacheLimitsInstances() {
+ // exchangeInstanceParallel not set
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+ // parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ // 100 child instances with query cache enabled
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 100; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(100, inputJobs);
+
+ // Should be min(100, 8 * 3) = 24
+ Assertions.assertEquals(24, result);
+ }
+
+ /**
+ * Test: When query cache is enabled and childInstanceNum is smaller than
maxInstanceNum,
+ * degreeOfParallelism should return childInstanceNum.
+ */
+ @Test
+ public void testDegreeOfParallelismWithQueryCacheChildSmallerThanMax() {
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+ // parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ // Only 10 child instances (< 24), all with query cache
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 10; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(10, inputJobs);
+
+ // Should be min(10, 24) = 10
+ Assertions.assertEquals(10, result);
+ }
+
+ /**
+ * Test: When query cache is enabled AND exchangeInstanceParallel is set
to a small value,
+ * degreeOfParallelism should take the minimum of all three.
+ */
+ @Test
+ public void testDegreeOfParallelismWithQueryCacheAndExchangeParallel() {
+ // exchangeInstanceParallel = 5
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(5);
+ // parallelExecInstanceNum = 8, backendNum = 3 => maxInstanceNum = 24
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 100; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(100, inputJobs);
+
+ // Should be min(5, min(100, 24)) = min(5, 24) = 5
+ Assertions.assertEquals(5, result);
+ }
+
+ /**
+ * Test: When query cache is enabled AND exchangeInstanceParallel is set
to a large value,
+ * the query cache limit should still apply.
+ */
+ @Test
+ public void
testDegreeOfParallelismWithQueryCacheAndLargeExchangeParallel() {
+ // exchangeInstanceParallel = 50 (larger than maxInstanceNum)
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(50);
+ // parallelExecInstanceNum = 4, backendNum = 3 => maxInstanceNum = 12
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(4);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 100; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(100, inputJobs);
+
+ // Should be min(50, min(100, 12)) = min(50, 12) = 12
+ Assertions.assertEquals(12, result);
+ }
+
+ /**
+ * Test: Mixed input jobs - some with query cache, some without.
+ * If ANY child job has queryCacheParam, the limiting should apply.
+ */
+ @Test
+ public void testDegreeOfParallelismWithMixedQueryCacheJobs() {
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(8);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ DistributedPlanWorker worker = createMockWorker(1);
+
+ // Mix: some with cache, some without
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 50; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+ for (int i = 0; i < 50; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(null, worker));
+ }
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ statementContext, fragment, exchangeToChildJob);
+ int result = testableJob.testDegreeOfParallelism(100, inputJobs);
+
+ // Query cache detected => min(100, 8*3) = 24
+ Assertions.assertEquals(24, result);
+ }
+
+ /**
+ * Test: When connectContext is null, degreeOfParallelism should return -1, even if a child job has a queryCacheParam.
+ */
+ @Test
+ public void testDegreeOfParallelismWithNullConnectContext() {
+ StatementContext nullCtxStatement =
Mockito.mock(StatementContext.class);
+ Mockito.when(nullCtxStatement.getConnectContext()).thenReturn(null);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ TestableUnassignedShuffleJob testableJob = new
TestableUnassignedShuffleJob(
+ nullCtxStatement, fragment, exchangeToChildJob);
+
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ inputJobs.put(exchangeNode, createMockAssignedJob(new
TQueryCacheParam(), createMockWorker(1)));
+
+ int result = testableJob.testDegreeOfParallelism(1, inputJobs);
+ Assertions.assertEquals(-1, result);
+ }
+
+ // ======================== Tests for computeAssignedJobs
========================
+
+ /**
+ * Test: When query cache limits instances below the child fragment size,
+ * computeAssignedJobs should produce the limited number of instances.
+ * This exercises the if-branch where expectInstanceNum <
biggestParallelChildFragment.size().
+ */
+ @Test
+ public void testComputeAssignedJobsWithQueryCacheLimitsInstanceCount() {
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+ // parallelExecInstanceNum = 2, backendNum = 3 => maxInstanceNum = 6
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(2);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ exchangeToChildJob.put(exchangeNode, createMockUnassignedJob());
+
+ UnassignedShuffleJob job = new UnassignedShuffleJob(statementContext,
fragment, exchangeToChildJob);
+
+ // Create 20 child assigned jobs with query cache
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 20; i++) {
+ DistributedPlanWorker worker = createMockWorker(i % 3 + 1);
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ DistributeContext distributeContext = createDistributeContext(false);
+
+ List<AssignedJob> result = job.computeAssignedJobs(distributeContext,
inputJobs);
+
+ // Should be limited to min(20, 2*3) = 6 instances
+ Assertions.assertEquals(6, result.size());
+ }
+
+ /**
+ * Test: When query cache is NOT enabled, computeAssignedJobs should keep
+ * the same instance count as the biggest parallel child fragment.
+ */
+ @Test
+ public void testComputeAssignedJobsWithoutQueryCacheKeepsChildCount() {
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ exchangeToChildJob.put(exchangeNode, createMockUnassignedJob());
+
+ UnassignedShuffleJob job = new UnassignedShuffleJob(statementContext,
fragment, exchangeToChildJob);
+
+ // Create 15 child assigned jobs WITHOUT query cache
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ for (int i = 0; i < 15; i++) {
+ DistributedPlanWorker worker = createMockWorker(i % 3 + 1);
+ inputJobs.put(exchangeNode, createMockAssignedJob(null, worker));
+ }
+
+ DistributeContext distributeContext = createDistributeContext(false);
+
+ List<AssignedJob> result = job.computeAssignedJobs(distributeContext,
inputJobs);
+
+ // Without query cache, degreeOfParallelism returns -1, so instance
count = child count = 15
+ Assertions.assertEquals(15, result.size());
+ }
+
+ /**
+ * Test: When query cache limits instances AND expectInstanceNum <
biggestParallelChildFragment.size(),
+ * the if-branch is taken (instances are shuffled to distinct workers).
+ */
+ @Test
+ public void testComputeAssignedJobsWithQueryCacheTakesIfBranch() {
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(-1);
+ // parallelExecInstanceNum = 1, backendNum = 3 => maxInstanceNum = 3
+
Mockito.when(sessionVariable.getParallelExecInstanceNum()).thenReturn(1);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ exchangeToChildJob.put(exchangeNode, createMockUnassignedJob());
+
+ UnassignedShuffleJob job = new UnassignedShuffleJob(statementContext,
fragment, exchangeToChildJob);
+
+ // Create 20 child assigned jobs with query cache, spread across 5
workers
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ TQueryCacheParam cacheParam = new TQueryCacheParam();
+ for (int i = 0; i < 20; i++) {
+ DistributedPlanWorker worker = createMockWorker(i % 5 + 1);
+ inputJobs.put(exchangeNode, createMockAssignedJob(cacheParam,
worker));
+ }
+
+ DistributeContext distributeContext = createDistributeContext(false);
+
+ List<AssignedJob> result = job.computeAssignedJobs(distributeContext,
inputJobs);
+
+ // Should be limited to min(20, 1*3) = 3 instances (< 20, so if-branch
is taken)
+ Assertions.assertEquals(3, result.size());
+ }
+
+ /**
+ * Regression test: When query cache is NOT enabled and
exchangeInstanceParallel is set to a value
+ * LARGER than the child fragment count, computeAssignedJobs must NOT
expand the instance count.
+ * Previously the else-branch used "expectInstanceNum > 0 ?
expectInstanceNum : childSize" which
+ * would wrongly inflate instances beyond the child count.
+ */
+ @Test
+ public void testComputeAssignedJobsNoQueryCacheDoesNotExpandInstances() {
+ // exchangeInstanceParallel = 100, much larger than the 5 child
instances
+
Mockito.when(sessionVariable.getExchangeInstanceParallel()).thenReturn(100);
+
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ exchangeToChildJob.put(exchangeNode, createMockUnassignedJob());
+
+ UnassignedShuffleJob job = new UnassignedShuffleJob(statementContext,
fragment, exchangeToChildJob);
+
+ // 5 child jobs, NO query cache
+ ListMultimap<ExchangeNode, AssignedJob> inputJobs =
ArrayListMultimap.create();
+ for (int i = 0; i < 5; i++) {
+ inputJobs.put(exchangeNode, createMockAssignedJob(null,
createMockWorker(i + 1)));
+ }
+
+ DistributeContext distributeContext = createDistributeContext(false);
+ List<AssignedJob> result = job.computeAssignedJobs(distributeContext,
inputJobs);
+
+ // Must stay at child count (5), NOT expand to 100
+ Assertions.assertEquals(5, result.size());
+ }
+
+ /**
+ * Testable subclass that exposes the protected degreeOfParallelism method.
+ */
+ private static class TestableUnassignedShuffleJob extends
UnassignedShuffleJob {
+ public TestableUnassignedShuffleJob(
+ StatementContext statementContext, PlanFragment fragment,
+ ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
+ super(statementContext, fragment, exchangeToChildJob);
+ }
+
+ public int testDegreeOfParallelism(
+ int childInstanceNum, ListMultimap<ExchangeNode, AssignedJob>
inputJobs) {
+ return degreeOfParallelism(childInstanceNum, inputJobs);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]