This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 1126aec3b6a branch-4.1: [Fix](query cache) support partition-based 
instance parallelism #60974 (#61438)
1126aec3b6a is described below

commit 1126aec3b6afd6e80262ef15c8eb686f48c5bf1e
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Mar 25 14:09:32 2026 +0800

    branch-4.1: [Fix](query cache) support partition-based instance parallelism 
#60974 (#61438)
    
    Cherry-picked from #60974
    
    Co-authored-by: 924060929 <[email protected]>
---
 .../job/UnassignedScanSingleOlapTableJob.java      | 141 ++++++++
 .../job/UnassignedScanSingleOlapTableJobTest.java  | 380 +++++++++++++++++++++
 .../cache/test_partition_instance_query_cache.out  |   6 +
 .../test_partition_instance_query_cache.groovy     | 152 +++++++++
 4 files changed, 679 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
index 649e2fa9bb2..fa72f8c0105 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJob.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -25,16 +28,27 @@ import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TScanRangeParams;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ListMultimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /** UnassignedScanSingleOlapTableJob */
 public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob {
+    private static final Logger LOG = 
LogManager.getLogger(UnassignedScanSingleOlapTableJob.class);
+
     private OlapScanNode olapScanNode;
     private final ScanWorkerSelector scanWorkerSelector;
 
@@ -81,9 +95,136 @@ public class UnassignedScanSingleOlapTableJob extends 
AbstractUnassignedScanJob
         //        instance 5: olapScanNode1: ScanRanges([tablet_10007])
         //    ],
         // }
+        if (usePartitionParallelismForQueryCache(workerToScanRanges, 
distributeContext)) {
+            try {
+                // Best effort optimization for query cache: keep tablets in 
same partition
+                // on the same instance to reduce BE concurrency pressure.
+                List<AssignedJob> partitionInstances = 
insideMachineParallelizationByPartition(workerToScanRanges);
+                if (partitionInstances != null) {
+                    return partitionInstances;
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to assign query cache instances by partition, 
fallback to default planning",
+                        e);
+            }
+        }
+
         return super.insideMachineParallelization(workerToScanRanges, 
inputJobs, distributeContext);
     }
 
+    /**
+     * Builds one instance per (worker, partition) pair: every tablet of a selected partition that a
+     * worker scans is placed in the same instance on that worker.
+     *
+     * <p>Returns {@code null} when this layout cannot be built safely, so the caller falls back to the
+     * default planning. That happens when a scanned tablet cannot be mapped to a selected partition,
+     * when a worker's scan source is not a {@link DefaultScanSource} or has no ranges for
+     * {@code olapScanNode}, or when a worker's ranges cannot be split by partition.
+     *
+     * @param workerToScanRanges scan ranges already distributed to each worker
+     * @return the assigned instances, or {@code null} to request the default planning
+     */
+    private List<AssignedJob> insideMachineParallelizationByPartition(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges) {
+        List<Long> selectedPartitionIds = new ArrayList<>(olapScanNode.getSelectedPartitionIds());
+        Map<Long, Long> tabletToPartitionId = buildTabletToPartitionId(selectedPartitionIds);
+        // buildTabletToPartitionId works on the de-duplicated scan tablet ids, so compare against the
+        // distinct count: a mismatch means some scanned tablet has no selected partition/index.
+        int distinctScanTabletNum = new LinkedHashSet<>(olapScanNode.getScanTabletIds()).size();
+        if (tabletToPartitionId.size() != distinctScanTabletNum) {
+            return null;
+        }
+
+        ConnectContext context = statementContext.getConnectContext();
+        List<AssignedJob> instances = new ArrayList<>();
+        for (Map.Entry<DistributedPlanWorker, UninstancedScanSource> entry : workerToScanRanges.entrySet()) {
+            DistributedPlanWorker worker = entry.getKey();
+            ScanSource scanSource = entry.getValue().scanSource;
+            if (!(scanSource instanceof DefaultScanSource)) {
+                return null;
+            }
+
+            ScanRanges scanRanges = ((DefaultScanSource) scanSource).scanNodeToScanRanges.get(olapScanNode);
+            if (scanRanges == null) {
+                return null;
+            }
+            if (scanRanges.params.isEmpty()) {
+                // Nothing to scan on this worker, so it gets no instance.
+                continue;
+            }
+
+            Map<Long, ScanRanges> partitionToScanRanges =
+                    splitScanRangesByPartition(scanRanges, tabletToPartitionId);
+            if (partitionToScanRanges == null) {
+                return null;
+            }
+
+            // One partition on one BE maps to one instance. Different BEs may miss some partitions.
+            for (Long partitionId : selectedPartitionIds) {
+                ScanRanges partitionScanRanges = partitionToScanRanges.remove(partitionId);
+                if (partitionScanRanges == null || partitionScanRanges.params.isEmpty()) {
+                    continue;
+                }
+                instances.add(assignWorkerAndDataSources(
+                        instances.size(), context.nextInstanceId(), worker,
+                        new DefaultScanSource(ImmutableMap.of(olapScanNode, partitionScanRanges))));
+            }
+
+            // Defensive: a leftover entry would belong to a partition outside the selected list.
+            if (!partitionToScanRanges.isEmpty()) {
+                return null;
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * Decides whether query cache instances should be planned per partition.
+     *
+     * <p>All of the following must hold: the fragment carries a query cache param, at least one
+     * worker received scan ranges, a ConnectContext is available, local shuffle is not used to add
+     * parallelism, and the number of scanned tablets is larger than
+     * {@code max(parallelExecInstanceNum, 1) * workerCount}.
+     *
+     * @param workerToScanRanges scan ranges per worker
+     * @param distributeContext context forwarded to {@code useLocalShuffleToAddParallel}
+     * @return {@code true} to try the partition-based layout
+     */
+    private boolean usePartitionParallelismForQueryCache(
+            Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
+            DistributeContext distributeContext) {
+        boolean hasQueryCache = fragment.queryCacheParam != null;
+        if (!hasQueryCache || workerToScanRanges.isEmpty()) {
+            return false;
+        }
+
+        ConnectContext connectContext = statementContext.getConnectContext();
+        if (connectContext == null) {
+            return false;
+        }
+        if (useLocalShuffleToAddParallel(distributeContext)) {
+            return false;
+        }
+
+        long scanTabletCount = olapScanNode.getScanTabletIds().size();
+        int instancesPerWorker = Math.max(connectContext.getSessionVariable()
+                .getParallelExecInstanceNum(olapScanNode.getScanContext().getClusterName()), 1);
+        long tabletThreshold = instancesPerWorker * (long) workerToScanRanges.size();
+        return scanTabletCount > tabletThreshold;
+    }
+
+    /**
+     * Maps each scanned tablet id to the selected partition that contains it.
+     *
+     * <p>Tablets are looked up in the selected index of every partition, or in the table's base
+     * index when the selected index id is -1. Partitions or indexes that cannot be resolved are
+     * skipped, so the result may cover fewer tablets than {@code olapScanNode.getScanTabletIds()}.
+     *
+     * @param selectedPartitionIds partitions to search, in iteration order
+     * @return insertion-ordered map from tablet id to owning partition id
+     */
+    private Map<Long, Long> buildTabletToPartitionId(List<Long> selectedPartitionIds) {
+        long selected = olapScanNode.getSelectedIndexId();
+        long indexId = selected != -1 ? selected : olapScanNode.getOlapTable().getBaseIndexId();
+
+        Set<Long> wantedTabletIds = new LinkedHashSet<>(olapScanNode.getScanTabletIds());
+        Map<Long, Long> owner = new LinkedHashMap<>(wantedTabletIds.size());
+        for (Long partitionId : selectedPartitionIds) {
+            Partition partition = olapScanNode.getOlapTable().getPartition(partitionId);
+            MaterializedIndex index = partition != null ? partition.getIndex(indexId) : null;
+            if (index == null) {
+                continue;
+            }
+            for (Tablet tablet : index.getTablets()) {
+                long id = tablet.getId();
+                if (wantedTabletIds.contains(id)) {
+                    owner.put(id, partitionId);
+                }
+            }
+        }
+        return owner;
+    }
+
+    /**
+     * Groups one worker's scan ranges by the partition that owns each tablet, keeping the original
+     * range order and the matching per-range byte sizes inside each group.
+     *
+     * @param scanRanges the worker's scan ranges for {@code olapScanNode}
+     * @param tabletToPartitionId tablet id to owning partition id
+     * @return partition id to its scan ranges, or {@code null} when a range carries no palo scan
+     *         range or references a tablet missing from {@code tabletToPartitionId}; the caller
+     *         then falls back to the default planning
+     */
+    private Map<Long, ScanRanges> splitScanRangesByPartition(
+            ScanRanges scanRanges, Map<Long, Long> tabletToPartitionId) {
+        Map<Long, ScanRanges> partitionToScanRanges = new LinkedHashMap<>();
+        for (int i = 0; i < scanRanges.params.size(); i++) {
+            TScanRangeParams scanRangeParams = scanRanges.params.get(i);
+            // Report "cannot split" instead of throwing an NPE on a range that is not an olap range.
+            if (!scanRangeParams.isSetScanRange() || !scanRangeParams.getScanRange().isSetPaloScanRange()) {
+                return null;
+            }
+            long tabletId = scanRangeParams.getScanRange().getPaloScanRange().getTabletId();
+            Long partitionId = tabletToPartitionId.get(tabletId);
+            if (partitionId == null) {
+                return null;
+            }
+            partitionToScanRanges
+                    .computeIfAbsent(partitionId, id -> new ScanRanges())
+                    .addScanRange(scanRangeParams, scanRanges.bytes.get(i));
+        }
+        return partitionToScanRanges;
+    }
+
     @Override
     protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
             DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
new file mode 100644
index 00000000000..097e7930959
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.LocalTablet;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnassignedScanSingleOlapTableJobTest {
+    @Test
+    public void testQueryCacheAssignByPartition() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(1, 1));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+        Map<Long, Long> tabletToPartition = ImmutableMap.of(
+                1L, partitionOne,
+                2L, partitionOne,
+                3L, partitionOne,
+                4L, partitionTwo,
+                5L, partitionTwo,
+                6L, partitionTwo
+        );
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(tabletToPartition.keySet()));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(1L), tablet(2L), tablet(3L)
+        ));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        
Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets()).thenReturn(ImmutableList.of(
+                tablet(4L), tablet(5L), tablet(6L)
+        ));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        // Same partition tablets on one BE should be grouped into one 
instance.
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+        DistributeContext distributeContext = new DistributeContext(
+                Mockito.mock(DistributedPlanWorkerManager.class),
+                true
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                distributeContext, ArrayListMultimap.create());
+
+        Assertions.assertEquals(4, assignedJobs.size());
+
+        Map<Long, Set<Set<Long>>> workerToInstanceTablets = new HashMap<>();
+        for (AssignedJob assignedJob : assignedJobs) {
+            DefaultScanSource defaultScanSource = (DefaultScanSource) 
assignedJob.getScanSource();
+            ScanRanges ranges = 
defaultScanSource.scanNodeToScanRanges.get(olapScanNode);
+            Set<Long> tabletIds = ranges.params.stream()
+                    .map(param -> 
param.getScanRange().getPaloScanRange().getTabletId())
+                    .collect(Collectors.toCollection(HashSet::new));
+            Set<Long> partitionIds = tabletIds.stream()
+                    .map(tabletToPartition::get)
+                    .collect(Collectors.toSet());
+
+            // Every instance must only contain tablets from one partition.
+            Assertions.assertEquals(1, partitionIds.size());
+
+            workerToInstanceTablets.computeIfAbsent(
+                    assignedJob.getAssignedWorker().id(), k -> new HashSet<>()
+            ).add(tabletIds);
+        }
+
+        Map<Long, Set<Set<Long>>> expected = new HashMap<>();
+        expected.put(1L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(1L, 2L)),
+                new HashSet<>(Arrays.asList(4L))
+        )));
+        expected.put(2L, new HashSet<>(Arrays.asList(
+                new HashSet<>(Arrays.asList(3L)),
+                new HashSet<>(Arrays.asList(5L, 6L))
+        )));
+
+        // Different partitions are split into different instances on each BE.
+        Assertions.assertEquals(expected, workerToInstanceTablets);
+    }
+
+    @Test
+    public void 
testQueryCacheFallbackToDefaultWhenPartitionMappingIncomplete() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(2, 2));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        // Intentionally miss partitionTwo to trigger fallback.
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(ImmutableList.of(partitionOne));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 
5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), 
tablet(3L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        fragment.queryCacheParam = new TQueryCacheParam();
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new 
DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // query cache default planning uses one instance per tablet.
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    @Test
+    public void testNonQueryCacheUseDefaultPlanning() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(3, 3));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        long partitionOne = 100L;
+        long partitionTwo = 200L;
+        long selectedIndexId = 10L;
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        OlapTable olapTable = Mockito.mock(OlapTable.class);
+        Mockito.when(olapScanNode.getSelectedPartitionIds())
+                .thenReturn(Arrays.asList(partitionOne, partitionTwo));
+        
Mockito.when(olapScanNode.getSelectedIndexId()).thenReturn(selectedIndexId);
+        Mockito.when(olapScanNode.getOlapTable()).thenReturn(olapTable);
+        
Mockito.when(olapScanNode.getScanContext()).thenReturn(ScanContext.EMPTY);
+        Mockito.when(olapScanNode.getScanTabletIds())
+                .thenReturn(new ArrayList<>(ImmutableList.of(1L, 2L, 3L, 4L, 
5L, 6L)));
+
+        Partition firstPartition = Mockito.mock(Partition.class);
+        MaterializedIndex firstIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionOne)).thenReturn(firstPartition);
+        
Mockito.when(firstPartition.getIndex(selectedIndexId)).thenReturn(firstIndex);
+        Mockito.when(firstIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(1L), tablet(2L), 
tablet(3L)));
+
+        Partition secondPartition = Mockito.mock(Partition.class);
+        MaterializedIndex secondIndex = Mockito.mock(MaterializedIndex.class);
+        
Mockito.when(olapTable.getPartition(partitionTwo)).thenReturn(secondPartition);
+        
Mockito.when(secondPartition.getIndex(selectedIndexId)).thenReturn(secondIndex);
+        Mockito.when(secondIndex.getTablets())
+                .thenReturn(ImmutableList.of(tablet(4L), tablet(5L), 
tablet(6L)));
+
+        PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), null, 
DataPartition.RANDOM);
+        // No query cache param, must use default planning.
+        fragment.setParallelExecNum(10);
+
+        DistributedPlanWorker worker1 = new TestWorker(1L, "be1");
+        DistributedPlanWorker worker2 = new TestWorker(2L, "be2");
+        Map<DistributedPlanWorker, UninstancedScanSource> workerToScanSources
+                = new LinkedHashMap<>();
+        workerToScanSources.put(worker1, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(1L, 2L, 4L)))));
+        workerToScanSources.put(worker2, new UninstancedScanSource(new 
DefaultScanSource(
+                ImmutableMap.of(olapScanNode, scanRanges(3L, 5L, 6L)))));
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+        Mockito.when(scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                Mockito.eq(olapScanNode), Mockito.eq(connectContext)
+        )).thenReturn(workerToScanSources);
+
+        UnassignedScanSingleOlapTableJob unassignedJob = new 
UnassignedScanSingleOlapTableJob(
+                statementContext,
+                fragment,
+                olapScanNode,
+                ArrayListMultimap.create(),
+                scanWorkerSelector
+        );
+
+        List<AssignedJob> assignedJobs = unassignedJob.computeAssignedJobs(
+                new 
DistributeContext(Mockito.mock(DistributedPlanWorkerManager.class), true),
+                ArrayListMultimap.create());
+
+        // default planning splits by tablet count when parallelExecNum is 
large enough.
+        Assertions.assertEquals(6, assignedJobs.size());
+    }
+
+    private static Tablet tablet(long tabletId) {
+        return new LocalTablet(tabletId);
+    }
+
+    private static ScanRanges scanRanges(long... tabletIds) {
+        ScanRanges scanRanges = new ScanRanges();
+        for (long tabletId : tabletIds) {
+            TPaloScanRange paloScanRange = new TPaloScanRange();
+            paloScanRange.setTabletId(tabletId);
+            TScanRange scanRange = new TScanRange();
+            scanRange.setPaloScanRange(paloScanRange);
+            TScanRangeParams scanRangeParams = new TScanRangeParams();
+            scanRangeParams.setScanRange(scanRange);
+            scanRanges.addScanRange(scanRangeParams, 1L);
+        }
+        return scanRanges;
+    }
+
+    private static class TestWorker implements DistributedPlanWorker {
+        private final long id;
+        private final String address;
+
+        private TestWorker(long id, String address) {
+            this.id = id;
+            this.address = address;
+        }
+
+        @Override
+        public long getCatalogId() {
+            return 0;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public String address() {
+            return address;
+        }
+
+        @Override
+        public String host() {
+            return address;
+        }
+
+        @Override
+        public int port() {
+            return 0;
+        }
+
+        @Override
+        public String brpcAddress() {
+            return address;
+        }
+
+        @Override
+        public int brpcPort() {
+            return 0;
+        }
+
+        @Override
+        public boolean available() {
+            return true;
+        }
+    }
+}
diff --git 
a/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out 
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
new file mode 100644
index 00000000000..111f7a196fa
--- /dev/null
+++ 
b/regression-test/data/query_p0/cache/test_partition_instance_query_cache.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !partition_instance_query_result --
+/a     75
+/b     105
+/c     135
+/d     165
diff --git 
a/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
 
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
new file mode 100644
index 00000000000..6d2281e3ef6
--- /dev/null
+++ 
b/regression-test/suites/query_p0/cache/test_partition_instance_query_cache.groovy
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partition_instance_query_cache") {
+    def tableName = "test_partition_instance"
+    def querySql = """
+        SELECT
+            url,
+            SUM(cost) AS total_cost
+        FROM ${tableName}
+        WHERE dt >= '2026-01-01'
+          AND dt < '2026-01-15'
+        GROUP BY url
+    """
+
+    sql "set enable_nereids_planner=true"
+    sql "set enable_nereids_distribute_planner=true"
+    sql "set enable_query_cache=true"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set enable_sql_cache=false"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            dt DATE,
+            user_id INT,
+            url STRING,
+            cost BIGINT
+        )
+        ENGINE=OLAP
+        DUPLICATE KEY(dt, user_id)
+        PARTITION BY RANGE(dt)
+        (
+            PARTITION p20260101 VALUES LESS THAN ("2026-01-05"),
+            PARTITION p20260105 VALUES LESS THAN ("2026-01-10"),
+            PARTITION p20260110 VALUES LESS THAN ("2026-01-15")
+        )
+        DISTRIBUTED BY HASH(user_id) BUCKETS 3
+        PROPERTIES
+        (
+            "replication_num" = "1"
+        )
+    """
+
+    sql """
+        INSERT INTO ${tableName} VALUES
+        ('2026-01-01',1,'/a',10),
+        ('2026-01-01',2,'/b',20),
+        ('2026-01-02',3,'/c',30),
+        ('2026-01-03',4,'/d',40),
+
+        ('2026-01-06',1,'/a',15),
+        ('2026-01-06',2,'/b',25),
+        ('2026-01-07',3,'/c',35),
+        ('2026-01-08',4,'/d',45),
+
+        ('2026-01-11',1,'/a',50),
+        ('2026-01-11',2,'/b',60),
+        ('2026-01-12',3,'/c',70),
+        ('2026-01-13',4,'/d',80)
+    """
+
+    order_qt_partition_instance_query_result """
+        ${querySql}
+        ORDER BY url
+    """
+
+    // Turn rows into sorted "col1|col2" strings so repeated (possibly cached) executions
+    // can be compared independently of row order.
+    def normalize = { rows ->
+        return rows.collect { row -> row.collect { col -> String.valueOf(col) }.join("|") }.sort()
+    }
+
+    def baseline = normalize(sql(querySql))
+    for (int i = 0; i < 3; i++) {
+        assertEquals(baseline, normalize(sql(querySql)))
+    }
+
+    explain {
+        sql(querySql)
+        contains("DIGEST")
+    }
+
+    def distributedRows = sql("EXPLAIN DISTRIBUTED PLAN ${querySql}")
+    def distributedPlan = distributedRows.collect { it[0].toString() }.join("\n")
+    assertTrue(distributedPlan.contains("UnassignedScanSingleOlapTableJob"))
+
+    // The exact instance layout checked below assumes a single backend. The table has
+    // 3 partitions * 3 buckets = 9 tablets, and the planner only uses partition-based instances
+    // when the tablet count exceeds the per-backend parallelism (presumably driven by
+    // parallel_pipeline_task_num=3 here) times the backend count. With several backends a
+    // partition's tablets may also be spread over one instance per backend.
+    def backendNum = sql("SHOW BACKENDS").size()
+    if (backendNum == 1) {
+        def partitionMatcher = (distributedPlan =~ /partitions=(\d+)\/(\d+)/)
+        assertTrue(partitionMatcher.find())
+        int partitionCount = Integer.parseInt(partitionMatcher.group(1))
+
+        // indexOf returns -1 when absent; 0 is a valid position.
+        int scanFragmentBegin = distributedPlan.indexOf("fragmentJob: UnassignedScanSingleOlapTableJob")
+        assertTrue(scanFragmentBegin >= 0)
+        def scanFragment = distributedPlan.substring(scanFragmentBegin)
+
+        int scanInstanceCount = (scanFragment =~ /StaticAssignedJob\(/).count
+        assertEquals(partitionCount, scanInstanceCount)
+
+        def instanceToTablets = [:].withDefault { [] }
+        String currentInstance = null
+        scanFragment.eachLine { line ->
+            def instanceMatcher = (line =~ /instanceId:\s*([0-9a-f\-]+)/)
+            if (instanceMatcher.find()) {
+                currentInstance = instanceMatcher.group(1)
+                instanceToTablets[currentInstance] = []
+            }
+
+            def tabletMatcher = (line =~ /tablet\s+(\d+)/)
+            if (tabletMatcher.find() && currentInstance != null) {
+                instanceToTablets[currentInstance] << tabletMatcher.group(1)
+            }
+        }
+
+        assertEquals(partitionCount, instanceToTablets.size())
+        instanceToTablets.each { _, tablets ->
+            assertTrue(tablets.size() > 0)
+        }
+
+        def tabletToInstance = [:]
+        instanceToTablets.each { instanceId, tablets ->
+            tablets.each { tabletId ->
+                tabletToInstance[tabletId] = instanceId
+            }
+        }
+
+        // Every tablet of a partition must be scanned by exactly one instance.
+        ["p20260101", "p20260105", "p20260110"].each { partitionName ->
+            def partitionTabletRows = sql("SHOW TABLETS FROM ${tableName} PARTITION(${partitionName})")
+            def partitionTabletIds = partitionTabletRows.collect { it[0].toString() }
+            assertTrue(partitionTabletIds.size() > 0)
+
+            partitionTabletIds.each { tabletId ->
+                assertTrue(tabletToInstance.containsKey(tabletId))
+            }
+
+            def partitionInstanceIds = partitionTabletIds.collect { tabletId -> tabletToInstance[tabletId] }.toSet()
+            assertEquals(1, partitionInstanceIds.size())
+        }
+    } else {
+        logger.info("skip partition instance layout checks, backend num: ${backendNum}")
+    }
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to