This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a0a21f8482f [fix](regression) Wait for completed profiles in RF 
pruning tests (#64392)
a0a21f8482f is described below

commit a0a21f8482f0e908c17b8f673c677f67e9b311d7
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 16 10:46:25 2026 +0800

    [fix](regression) Wait for completed profiles in RF pruning tests (#64392)
    
    Problem Summary: Runtime-filter partition-pruning regression tests read
    FE query profiles immediately after query completion. Query completion
    does not guarantee that asynchronous BE profile reports have been
    merged, so the tests can see RF pruning counters before their final
    values and fail with zero partition counters. This change exposes a
    profile completion state in the query profile REST list based on
    ExecutionProfile.isCompleted(), persists the terminal state before
    profile spill, and updates the RF pruning profile pollers to wait for
    COMPLETE before reading counter values.
---
 .../doris/common/profile/ExecutionProfile.java     |  66 ++++++---
 .../org/apache/doris/common/profile/Profile.java   |  46 +++++++
 .../doris/common/profile/ProfileManager.java       |  13 +-
 .../doris/common/profile/SummaryProfile.java       |  10 ++
 .../httpv2/controller/QueryProfileController.java  |   3 +-
 .../qe/runtime/PipelineExecutionTaskBuilder.java   |   9 +-
 .../doris/common/profile/ProfileManagerTest.java   | 148 +++++++++++++++++++++
 .../runtime/PipelineExecutionTaskBuilderTest.java  |  96 +++++++++++++
 .../runtime_filter/rf_partition_pruning.groovy     |  14 +-
 .../rf_partition_pruning_type_matrix.groovy        |  12 +-
 10 files changed, 390 insertions(+), 27 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 2f6b0992ef6..47d6fce03ac 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -35,9 +35,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -71,6 +73,7 @@ public class ExecutionProfile {
 
     // use to merge profile from multi be
     private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> 
multiBeProfile = null;
+    private Map<Integer, Set<TNetworkAddress>> fragmentIdDoneBackends;
     private ReentrantReadWriteLock multiBeProfileLock = new 
ReentrantReadWriteLock();
 
     // Not serialize this property, it is only used to get profile id.
@@ -88,6 +91,7 @@ public class ExecutionProfile {
         root.addChild(fragmentsProfile, true);
         fragmentProfiles = Maps.newHashMap();
         multiBeProfile = Maps.newHashMap();
+        fragmentIdDoneBackends = Maps.newHashMap();
         fragmentIdBeNum = Maps.newHashMap();
         seqNoToFragmentId = Maps.newHashMap();
         int i = 0;
@@ -96,6 +100,7 @@ public class ExecutionProfile {
             fragmentProfiles.put(fragmentId, runtimeProfile);
             fragmentsProfile.addChild(runtimeProfile, true);
             multiBeProfile.put(fragmentId, Maps.newHashMap());
+            fragmentIdDoneBackends.put(fragmentId, new HashSet<>());
             fragmentIdBeNum.put(fragmentId, 0);
             seqNoToFragmentId.put(i, fragmentId);
             ++i;
@@ -147,7 +152,7 @@ public class ExecutionProfile {
     }
 
     protected void setMultiBeProfile(int fragmentId, TNetworkAddress 
backendHBAddress,
-                                List<RuntimeProfile> taskProfile) {
+            List<RuntimeProfile> taskProfile) {
         multiBeProfileLock.writeLock().lock();
         try {
             multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
@@ -156,6 +161,36 @@ public class ExecutionProfile {
         }
     }
 
+    private boolean updateMultiBeProfile(int fragmentId, TNetworkAddress 
backendHBAddress,
+            List<RuntimeProfile> taskProfile, boolean isDone) {
+        multiBeProfileLock.writeLock().lock();
+        try {
+            if (!isDone && 
fragmentIdDoneBackends.get(fragmentId).contains(backendHBAddress)) {
+                return false;
+            }
+            multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
+            if (isDone) {
+                fragmentIdDoneBackends.get(fragmentId).add(backendHBAddress);
+            }
+            return true;
+        } finally {
+            multiBeProfileLock.writeLock().unlock();
+        }
+    }
+
+    private boolean areAllBackendProfilesDone(int fragmentId, int 
expectedBackendNum) {
+        if (expectedBackendNum == 0) {
+            return false;
+        }
+
+        multiBeProfileLock.readLock().lock();
+        try {
+            return fragmentIdDoneBackends.get(fragmentId).size() >= 
expectedBackendNum;
+        } finally {
+            multiBeProfileLock.readLock().unlock();
+        }
+    }
+
     protected RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> 
planNodeMap) {
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
@@ -233,6 +268,7 @@ public class ExecutionProfile {
             int fragmentId = entry.getKey();
             List<TDetailedReportParams> fragmentProfile = entry.getValue();
             int pipelineIdx = 0;
+            List<RuntimeProfile> profileNodes = Lists.newArrayList();
             List<RuntimeProfile> taskProfile = Lists.newArrayList();
             String suffix = "(host=" + backendHBAddress + ")";
             for (TDetailedReportParams pipelineProfile : fragmentProfile) {
@@ -259,9 +295,14 @@ public class ExecutionProfile {
 
                 profileNode.update(pipelineProfile.profile);
                 profileNode.setIsDone(isDone);
+                profileNodes.add(profileNode);
+            }
+            if (!updateMultiBeProfile(fragmentId, backendHBAddress, 
taskProfile, isDone)) {
+                continue;
+            }
+            for (RuntimeProfile profileNode : profileNodes) {
                 fragmentProfiles.get(fragmentId).addChild(profileNode, true);
             }
-            setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
         }
 
         LOG.info("Profile update finished query: {} fragments: {} isDone: {}",
@@ -277,29 +318,24 @@ public class ExecutionProfile {
     }
 
     public synchronized void addFragmentBackend(PlanFragmentId fragmentId, 
Long backendId) {
-        fragmentIdBeNum.put(fragmentId.asInt(), 
fragmentIdBeNum.get(fragmentId.asInt()) + 1);
+        addFragmentBackend(fragmentId.asInt(), backendId);
+    }
+
+    public synchronized void addFragmentBackend(int fragmentId, Long 
backendId) {
+        fragmentIdBeNum.put(fragmentId, fragmentIdBeNum.get(fragmentId) + 1);
     }
 
     public TUniqueId getQueryId() {
         return queryId;
     }
 
-    // Check all fragments's child, if all finished, then this execution 
profile is finished
+    // Check all fragments' backend reports. A backend can contribute multiple 
profile nodes.
     public boolean isCompleted() {
         for (Entry<Integer, RuntimeProfile> element : 
fragmentProfiles.entrySet()) {
-            RuntimeProfile fragmentProfile = element.getValue();
-            // If any fragment is empty, it means BE does not report the 
profile, then the total
-            // execution profile is not completed.
-            if (fragmentProfile.isEmpty()
-                    || fragmentProfile.getChildList().size() < 
fragmentIdBeNum.get(element.getKey())) {
+            int fragmentId = element.getKey();
+            if (!areAllBackendProfilesDone(fragmentId, 
fragmentIdBeNum.get(fragmentId))) {
                 return false;
             }
-            for (Pair<RuntimeProfile, Boolean> runtimeProfile : 
fragmentProfile.getChildList()) {
-                // If any child instance profile is not ready, then return 
false.
-                if (!(runtimeProfile.first.getIsDone() || 
runtimeProfile.first.getIsCancel())) {
-                    return false;
-                }
-            }
         }
         return true;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 72b74ae1b66..f2f35da51bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -348,6 +348,7 @@ public class Profile {
             LOG.info("DebugPoint:Profile.profileSizeLimit, MAX_PROFILE_SIZE = 
{}", maxProfileSize);
         }
         // add summary to builder
+        updateProfileCompletionStateForDisplay();
         summaryProfile.prettyPrint(builder);
         if (!builder.isTruncated()) {
             getChangedSessionVars(builder);
@@ -599,6 +600,50 @@ public class Profile {
         return !Strings.isNullOrEmpty(profileStoragePath);
     }
 
+    public String getProfileCompletionState() {
+        if (profileHasBeenStored()) {
+            String storedState = 
summaryProfile.getSummary().getInfoString(SummaryProfile.PROFILE_COMPLETION_STATE);
+            if (!Strings.isNullOrEmpty(storedState)) {
+                return storedState;
+            }
+            return SummaryProfile.PROFILE_COMPLETION_STATE_UNKNOWN;
+        }
+
+        if (!isQueryFinished) {
+            return SummaryProfile.PROFILE_COMPLETION_STATE_RUNNING;
+        }
+
+        for (int i = 0; i < executionProfiles.size(); i++) {
+            ExecutionProfile executionProfile = executionProfiles.get(i);
+            if (!executionProfile.isCompleted()) {
+                return SummaryProfile.PROFILE_COMPLETION_STATE_COLLECTING;
+            }
+        }
+        return SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE;
+    }
+
+    private String getProfileCompletionStateForStorage() {
+        if (!isQueryFinished) {
+            return SummaryProfile.PROFILE_COMPLETION_STATE_RUNNING;
+        }
+
+        for (int i = 0; i < executionProfiles.size(); i++) {
+            ExecutionProfile executionProfile = executionProfiles.get(i);
+            if (!executionProfile.isCompleted()) {
+                return SummaryProfile.PROFILE_COMPLETION_STATE_INCOMPLETE;
+            }
+        }
+        return SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE;
+    }
+
+    private void updateProfileCompletionStateForStorage() {
+        
summaryProfile.setProfileCompletionState(getProfileCompletionStateForStorage());
+    }
+
+    private void updateProfileCompletionStateForDisplay() {
+        summaryProfile.setProfileCompletionState(getProfileCompletionState());
+    }
+
     // Profile IO threads races with Coordinator threads.
     public void markQueryFinished() {
         try {
@@ -648,6 +693,7 @@ public class Profile {
             DataOutputStream memoryDataStream = new 
DataOutputStream(memoryStream);
 
             // Write summary profile and execution profile content to memory
+            updateProfileCompletionStateForStorage();
             this.summaryProfile.write(memoryDataStream);
 
             SafeStringBuilder builder = new SafeStringBuilder();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index 5dce9890f00..2034ee931ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -234,6 +234,13 @@ public class ProfileManager extends MasterDaemon {
         return getQueryInfoByColumnNameList(SummaryProfile.SUMMARY_KEYS);
     }
 
+    private String getProfileInfoString(ProfileElement profileElement, String 
columnName) {
+        if (SummaryProfile.PROFILE_COMPLETION_STATE.equals(columnName)) {
+            return profileElement.profile.getProfileCompletionState();
+        }
+        return profileElement.infoStrings.get(columnName);
+    }
+
     public List<List<String>> getQueryInfoByColumnNameList(List<String> 
columnNameList) {
         List<List<String>> result = Lists.newArrayList();
         readLock.lock();
@@ -241,10 +248,9 @@ public class ProfileManager extends MasterDaemon {
             PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTimeDesc();
             while (!queueIdDeque.isEmpty()) {
                 ProfileElement profileElement = queueIdDeque.poll();
-                Map<String, String> infoStrings = profileElement.infoStrings;
                 List<String> row = Lists.newArrayList();
                 for (String str : columnNameList) {
-                    row.add(infoStrings.get(str));
+                    row.add(getProfileInfoString(profileElement, str));
                 }
                 result.add(row);
             }
@@ -1094,7 +1100,7 @@ public class ProfileManager extends MasterDaemon {
                 if 
(infoStrings.get(SummaryProfile.TASK_TYPE).equals(profileType.toString())) {
                     List<String> row = Lists.newArrayList();
                     for (String str : SummaryProfile.SUMMARY_KEYS) {
-                        row.add(infoStrings.get(str));
+                        row.add(getProfileInfoString(profileElement, str));
                     }
                     result.add(row);
                     limit--;
@@ -1235,4 +1241,3 @@ public class ProfileManager extends MasterDaemon {
         }
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 373e81dd1bd..22e12c7ff02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -62,6 +62,12 @@ public class SummaryProfile {
     public static final String END_TIME = "End Time";
     public static final String TOTAL_TIME = "Total";
     public static final String TASK_STATE = "Task State";
+    public static final String PROFILE_COMPLETION_STATE = "Profile Completion 
State";
+    public static final String PROFILE_COMPLETION_STATE_RUNNING = "RUNNING";
+    public static final String PROFILE_COMPLETION_STATE_COLLECTING = 
"COLLECTING";
+    public static final String PROFILE_COMPLETION_STATE_COMPLETE = "COMPLETE";
+    public static final String PROFILE_COMPLETION_STATE_INCOMPLETE = 
"INCOMPLETE";
+    public static final String PROFILE_COMPLETION_STATE_UNKNOWN = "UNKNOWN";
     public static final String USER = "User";
     public static final String DEFAULT_CATALOG = "Default Catalog";
     public static final String DEFAULT_DB = "Default Db";
@@ -528,6 +534,10 @@ public class SummaryProfile {
         return summaryProfile;
     }
 
+    public void setProfileCompletionState(String profileCompletionState) {
+        summaryProfile.addInfoString(PROFILE_COMPLETION_STATE, 
profileCompletionState);
+    }
+
     public RuntimeProfile getExecutionSummary() {
         return executionSummaryProfile;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java
index 767bad41688..175bdd3cf74 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java
@@ -78,9 +78,10 @@ public class QueryProfileController extends BaseController {
     }
 
     private void addFinishedQueryInfo(Map<String, Object> result) {
-        List<List<String>> finishedQueries = 
ProfileManager.getInstance().getAllQueries();
         List<String> columnHeaders = Lists.newLinkedList();
         columnHeaders.addAll(SummaryProfile.SUMMARY_CAPTIONS);
+        columnHeaders.add(SummaryProfile.PROFILE_COMPLETION_STATE);
+        List<List<String>> finishedQueries = 
ProfileManager.getInstance().getQueryInfoByColumnNameList(columnHeaders);
 
         result.put("column_names", columnHeaders);
         // The first column is profile id, which is also a href column
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
index d9503e3145a..03f6f923dc2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.doris.qe.runtime;
 
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
@@ -86,15 +87,16 @@ public class PipelineExecutionTaskBuilder {
                             backend,
                             backendServiceProxy,
                             serializeFragments,
-                            buildSingleFragmentPipelineTask(backend, 
fragmentParamsList)
+                            buildSingleFragmentPipelineTask(
+                                    coordinatorContext.executionProfile, 
backend, fragmentParamsList)
                     )
             );
         }
         return fragmentTasks;
     }
 
-    private Map<Integer, SingleFragmentPipelineTask> 
buildSingleFragmentPipelineTask(
-            Backend backend, TPipelineFragmentParamsList fragmentParamsList) {
+    static Map<Integer, SingleFragmentPipelineTask> 
buildSingleFragmentPipelineTask(
+            ExecutionProfile executionProfile, Backend backend, 
TPipelineFragmentParamsList fragmentParamsList) {
         Map<Integer, SingleFragmentPipelineTask> tasks = 
Maps.newLinkedHashMap();
         for (TPipelineFragmentParams fragmentParams : 
fragmentParamsList.getParamsList()) {
             int fragmentId = fragmentParams.getFragmentId();
@@ -103,6 +105,7 @@ public class PipelineExecutionTaskBuilder {
                     .map(TPipelineInstanceParams::getFragmentInstanceId)
                     .collect(Collectors.toSet());
             tasks.put(fragmentId, new SingleFragmentPipelineTask(backend, 
fragmentId, instanceIds));
+            executionProfile.addFragmentBackend(fragmentId, backend.getId());
         }
         return tasks;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
index eae1ca4adb4..7c681814f60 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
@@ -20,9 +20,19 @@ package org.apache.doris.common.profile;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.profile.ProfileManager.ProfileElement;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TCounter;
+import org.apache.doris.thrift.TDetailedReportParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryProfile;
+import org.apache.doris.thrift.TRuntimeProfileNode;
+import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUnit;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -49,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @ResourceLock("global")
 class ProfileManagerTest {
     private static final Logger LOG = 
LogManager.getLogger(ProfilePersistentTest.class);
+    private static final String PROFILE_TEST_COUNTER = "ProfileRows";
 
     private static ProfileManager profileManager;
     private File tempDir;
@@ -95,6 +106,143 @@ class ProfileManagerTest {
         return profile;
     }
 
+    private static TDetailedReportParams createReportParam(String name, 
boolean isFragmentLevel) {
+        return createReportParam(name, isFragmentLevel, -1);
+    }
+
+    private static TDetailedReportParams createReportParam(String name, 
boolean isFragmentLevel, long counterValue) {
+        TRuntimeProfileNode node = new TRuntimeProfileNode();
+        node.setName(name);
+        node.setNumChildren(0);
+        node.setCounters(Lists.newArrayList());
+        node.setMetadata(0);
+        node.setIndent(false);
+        node.setInfoStrings(Maps.newHashMap());
+        node.setInfoStringsDisplayOrder(Lists.newArrayList());
+        node.setChildCountersMap(Maps.newHashMap());
+        node.setTimestamp(0);
+        if (counterValue >= 0) {
+            TCounter counter = new TCounter(PROFILE_TEST_COUNTER, TUnit.UNIT, 
counterValue);
+            counter.setLevel(1);
+            node.getCounters().add(counter);
+            node.getChildCountersMap().put(RuntimeProfile.ROOT_COUNTER, 
Sets.newHashSet(PROFILE_TEST_COUNTER));
+        }
+
+        TRuntimeProfileTree tree = new TRuntimeProfileTree();
+        tree.setNodes(Lists.newArrayList(node));
+
+        TDetailedReportParams params = new TDetailedReportParams();
+        params.setProfile(tree);
+        params.setIsFragmentLevel(isFragmentLevel);
+        return params;
+    }
+
+    private static TQueryProfile createQueryProfile(TUniqueId queryId, int 
fragmentId, String suffix) {
+        return createQueryProfile(queryId, fragmentId, suffix, -1);
+    }
+
+    private static TQueryProfile createQueryProfile(TUniqueId queryId, int 
fragmentId, String suffix,
+            long counterValue) {
+        TQueryProfile queryProfile = new TQueryProfile();
+        queryProfile.setQueryId(queryId);
+        queryProfile.putToFragmentIdToProfile(fragmentId, Lists.newArrayList(
+                createReportParam("FragmentLevelProfile-" + suffix, true),
+                createReportParam("PipelineProfile-" + suffix, false, 
counterValue)));
+        return queryProfile;
+    }
+
+    @Test
+    void getProfileCompletionStateInQueryList() {
+        Profile runningProfile = constructProfile("running");
+        profileManager.pushProfile(runningProfile);
+
+        Profile collectingProfile = constructProfile("collecting");
+        collectingProfile.markQueryFinished();
+        UUID collectingTaskId = UUID.randomUUID();
+        TUniqueId collectingQueryId = new 
TUniqueId(collectingTaskId.getMostSignificantBits(),
+                collectingTaskId.getLeastSignificantBits());
+        ExecutionProfile collectingExecutionProfile = new 
ExecutionProfile(collectingQueryId, Lists.newArrayList(0));
+        collectingExecutionProfile.addFragmentBackend(new PlanFragmentId(0), 
1L);
+        collectingProfile.addExecutionProfile(collectingExecutionProfile);
+        profileManager.pushProfile(collectingProfile);
+
+        Profile completeProfile = constructProfile("complete");
+        completeProfile.markQueryFinished();
+        UUID completeTaskId = UUID.randomUUID();
+        TUniqueId completeQueryId = new 
TUniqueId(completeTaskId.getMostSignificantBits(),
+                completeTaskId.getLeastSignificantBits());
+        completeProfile.addExecutionProfile(new 
ExecutionProfile(completeQueryId, Lists.newArrayList()));
+        profileManager.pushProfile(completeProfile);
+
+        List<List<String>> rows = 
profileManager.getQueryInfoByColumnNameList(Lists.newArrayList(
+                SummaryProfile.PROFILE_ID, 
SummaryProfile.PROFILE_COMPLETION_STATE));
+
+        Set<String> checkedProfiles = new HashSet<>();
+        for (List<String> row : rows) {
+            if (row.get(0).equals("running")) {
+                
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_RUNNING, 
row.get(1));
+                checkedProfiles.add(row.get(0));
+            } else if (row.get(0).equals("collecting")) {
+                
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COLLECTING, 
row.get(1));
+                checkedProfiles.add(row.get(0));
+            } else if (row.get(0).equals("complete")) {
+                
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE, 
row.get(1));
+                checkedProfiles.add(row.get(0));
+            }
+        }
+        Set<String> expectedProfiles = new HashSet<>();
+        expectedProfiles.add("running");
+        expectedProfiles.add("collecting");
+        expectedProfiles.add("complete");
+        Assertions.assertEquals(expectedProfiles, checkedProfiles);
+    }
+
+    @Test
+    void profileCompletionStateWaitsForDistinctBackendReports() {
+        Profile profile = constructProfile("multi-backend");
+        profile.markQueryFinished();
+        UUID taskId = UUID.randomUUID();
+        TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), 
taskId.getLeastSignificantBits());
+        ExecutionProfile executionProfile = new ExecutionProfile(queryId, 
Lists.newArrayList(0));
+        executionProfile.addFragmentBackend(new PlanFragmentId(0), 1L);
+        executionProfile.addFragmentBackend(new PlanFragmentId(0), 2L);
+        profile.addExecutionProfile(executionProfile);
+
+        executionProfile.updateProfile(createQueryProfile(queryId, 0, "be1"),
+                new TNetworkAddress("127.0.0.1", 9060), true);
+        
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COLLECTING,
+                profile.getProfileCompletionState());
+
+        executionProfile.updateProfile(createQueryProfile(queryId, 0, "be2"),
+                new TNetworkAddress("127.0.0.2", 9060), true);
+        
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE,
+                profile.getProfileCompletionState());
+    }
+
+    @Test
+    void completedBackendProfileRejectsLaterRealtimeProfile() {
+        Profile profile = constructProfile("final-profile");
+        profile.markQueryFinished();
+        UUID taskId = UUID.randomUUID();
+        TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), 
taskId.getLeastSignificantBits());
+        ExecutionProfile executionProfile = new ExecutionProfile(queryId, 
Lists.newArrayList(0));
+        executionProfile.addFragmentBackend(new PlanFragmentId(0), 1L);
+        profile.addExecutionProfile(executionProfile);
+
+        TNetworkAddress backendAddress = new TNetworkAddress("127.0.0.1", 
9060);
+        executionProfile.updateProfile(createQueryProfile(queryId, 0, "final", 
7), backendAddress, true);
+        
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE,
+                profile.getProfileCompletionState());
+
+        executionProfile.updateProfile(createQueryProfile(queryId, 0, "stale", 
1), backendAddress, false);
+        
Assertions.assertEquals(SummaryProfile.PROFILE_COMPLETION_STATE_COMPLETE,
+                profile.getProfileCompletionState());
+
+        String profileText = profile.toString();
+        Assertions.assertTrue(profileText.contains("- " + PROFILE_TEST_COUNTER 
+ ": 7"), profileText);
+        Assertions.assertFalse(profileText.contains("- " + 
PROFILE_TEST_COUNTER + ": 1"), profileText);
+    }
+
     @Test
     void getProfileByOrder() {
         final int normalProfiles = 100;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilderTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilderTest.java
new file mode 100644
index 00000000000..119bd5e2a7b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/PipelineExecutionTaskBuilderTest.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TDetailedReportParams;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineFragmentParamsList;
+import org.apache.doris.thrift.TPipelineInstanceParams;
+import org.apache.doris.thrift.TQueryProfile;
+import org.apache.doris.thrift.TRuntimeProfileNode;
+import org.apache.doris.thrift.TRuntimeProfileTree;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+class PipelineExecutionTaskBuilderTest {
+    @Test
+    void buildSingleFragmentPipelineTaskRegistersBackendWithExecutionProfile() 
{
+        TUniqueId queryId = new TUniqueId(1, 2);
+        ExecutionProfile executionProfile = new ExecutionProfile(queryId, 
Lists.newArrayList(0));
+        Backend backend = new Backend(1L, "127.0.0.1", 9050);
+
+        Map<Integer, SingleFragmentPipelineTask> tasks =
+                PipelineExecutionTaskBuilder.buildSingleFragmentPipelineTask(
+                        executionProfile, backend, 
createFragmentParamsList(0));
+
+        Assertions.assertEquals(1, tasks.size());
+        Assertions.assertTrue(tasks.containsKey(0));
+
+        executionProfile.updateProfile(createQueryProfile(queryId, 0), 
backend.getHeartbeatAddress(), true);
+        Assertions.assertTrue(executionProfile.isCompleted());
+    }
+
+    private static TPipelineFragmentParamsList createFragmentParamsList(int 
fragmentId) {
+        TPipelineInstanceParams instanceParams = new TPipelineInstanceParams();
+        instanceParams.setFragmentInstanceId(new TUniqueId(3, 4));
+
+        TPipelineFragmentParams fragmentParams = new TPipelineFragmentParams();
+        fragmentParams.setFragmentId(fragmentId);
+        fragmentParams.setLocalParams(Lists.newArrayList(instanceParams));
+
+        TPipelineFragmentParamsList paramsList = new 
TPipelineFragmentParamsList();
+        paramsList.setParamsList(Lists.newArrayList(fragmentParams));
+        return paramsList;
+    }
+
+    private static TQueryProfile createQueryProfile(TUniqueId queryId, int 
fragmentId) {
+        TQueryProfile queryProfile = new TQueryProfile();
+        queryProfile.setQueryId(queryId);
+        queryProfile.putToFragmentIdToProfile(fragmentId, 
Lists.newArrayList(createReportParam()));
+        return queryProfile;
+    }
+
+    private static TDetailedReportParams createReportParam() {
+        TRuntimeProfileNode node = new TRuntimeProfileNode();
+        node.setName("PipelineProfile");
+        node.setNumChildren(0);
+        node.setCounters(Lists.newArrayList());
+        node.setMetadata(0);
+        node.setIndent(false);
+        node.setInfoStrings(Maps.newHashMap());
+        node.setInfoStringsDisplayOrder(Lists.newArrayList());
+        node.setChildCountersMap(Maps.newHashMap());
+        node.setTimestamp(0);
+
+        TRuntimeProfileTree tree = new TRuntimeProfileTree();
+        tree.setNodes(Lists.newArrayList(node));
+
+        TDetailedReportParams params = new TDetailedReportParams();
+        params.setProfile(tree);
+        params.setIsFragmentLevel(false);
+        return params;
+    }
+}
diff --git 
a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy 
b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
index cee2ce96820..37453ecfbd7 100644
--- a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
+++ b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning.groovy
@@ -33,18 +33,28 @@ suite("rf_partition_pruning", "nonConcurrent") {
 
     // ---- Profile utilities ----
     def profileAction = new ProfileAction(context)
+    def profileCompletionStateName = "Profile Completion State"
+    def profileCompletionStateComplete = "COMPLETE"
 
     def getProfileByToken = { String token, List requiredCounters = [] ->
         String profileContent = ""
+        String profileState = ""
         for (int attempt = 0; attempt < 60; attempt++) {
             List profileData = profileAction.getProfileList()
             for (final def profileItem in profileData) {
                 if (profileItem["Sql Statement"].toString().contains(token)) {
-                    profileContent = 
profileAction.getProfile(profileItem["Profile ID"].toString())
+                    profileState = 
profileItem[profileCompletionStateName]?.toString()
+                    def currentProfileContent = 
profileAction.getProfile(profileItem["Profile ID"].toString())
+                    if (currentProfileContent != "") {
+                        profileContent = currentProfileContent
+                    }
                     break
                 }
             }
-            if (profileContent != "" && requiredCounters.every { 
profileContent.contains(it) }) break
+            if (profileState == profileCompletionStateComplete && 
profileContent != ""
+                    && requiredCounters.every { profileContent.contains(it) }) 
{
+                break
+            }
             Thread.sleep(500)
         }
         return profileContent
diff --git 
a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning_type_matrix.groovy
 
b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning_type_matrix.groovy
index f59dbb0282a..29e44cec8f9 100644
--- 
a/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning_type_matrix.groovy
+++ 
b/regression-test/suites/query_p0/runtime_filter/rf_partition_pruning_type_matrix.groovy
@@ -30,18 +30,26 @@ suite("rf_partition_pruning_type_matrix", "nonConcurrent") {
 
     def profileAction = new ProfileAction(context)
     def rfPruningCounterNames = ["TotalPartitionsForRFPruning", 
"PartitionsPrunedByRuntimeFilter"]
+    def profileCompletionStateName = "Profile Completion State"
+    def profileCompletionStateComplete = "COMPLETE"
 
     def getProfileByToken = { String token ->
         String profileContent = ""
+        String profileState = ""
         for (int attempt = 0; attempt < 60; attempt++) {
             List profileData = profileAction.getProfileList()
             for (final def profileItem in profileData) {
                 if (profileItem["Sql Statement"].toString().contains(token)) {
-                    profileContent = 
profileAction.getProfile(profileItem["Profile ID"].toString())
+                    profileState = 
profileItem[profileCompletionStateName]?.toString()
+                    def currentProfileContent = 
profileAction.getProfile(profileItem["Profile ID"].toString())
+                    if (currentProfileContent != "") {
+                        profileContent = currentProfileContent
+                    }
                     break
                 }
             }
-            if (profileContent != "" && rfPruningCounterNames.every { 
profileContent.contains(it) }) {
+            if (profileState == profileCompletionStateComplete && 
profileContent != ""
+                    && rfPruningCounterNames.every { 
profileContent.contains(it) }) {
                 break
             }
             Thread.sleep(500)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to