This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e05ceb52f36 branch-4.0: [fix](nereids)set RuntimeFilterInfo only on BE which is merge node #57108 (#57199)
e05ceb52f36 is described below

commit e05ceb52f3649768d060695b754671b0d55c105f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 13:08:20 2025 +0800

    branch-4.0: [fix](nereids)set RuntimeFilterInfo only on BE which is merge node #57108 (#57199)
    
    Cherry-picked from #57108
    
    Co-authored-by: minghong <[email protected]>
---
 .../trees/plans/distribute/worker/BackendWorker.java      |  2 +-
 .../doris/qe/runtime/RuntimeFiltersThriftBuilder.java     | 15 ++++++---------
 .../org/apache/doris/qe/runtime/ThriftPlansBuilder.java   | 11 ++++++++---
 gensrc/thrift/PaloInternalService.thrift                  |  1 +
 4 files changed, 16 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
index 63c73b50edc..e76934cf847 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java
@@ -45,7 +45,7 @@ public class BackendWorker implements DistributedPlanWorker {
 
     @Override
     public String brpcAddress() {
-        return backend.getHost() + brpcPort();
+        return backend.getHost() + ":" + brpcPort();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index f9ab8e83f07..fca5461fe1c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -65,7 +65,7 @@ public class RuntimeFiltersThriftBuilder {
         return mergeInstance == instance;
     }
 
-    public void setRuntimeFilterThriftParams(TRuntimeFilterParams 
runtimeFilterParams) {
+    public void populateRuntimeFilterParams(TRuntimeFilterParams 
runtimeFilterParams) {
         for (RuntimeFilter rf : runtimeFilters) {
             List<RuntimeFilterTarget> targets = 
ridToTargets.get(rf.getFilterId());
             if (targets == null) {
@@ -89,8 +89,7 @@ public class RuntimeFiltersThriftBuilder {
                 }
 
                 runtimeFilterParams.putToRidToTargetParamv2(
-                        rf.getFilterId().asInt(), new 
ArrayList<>(targetToParams.values())
-                );
+                        rf.getFilterId().asInt(), new 
ArrayList<>(targetToParams.values()));
             }
         }
         for (Map.Entry<RuntimeFilterId, Integer> entry : 
ridToBuilderNum.entrySet()) {
@@ -122,15 +121,14 @@ public class RuntimeFiltersThriftBuilder {
             PlanFragment fragment = plan.getFragmentJob().getFragment();
             // Transform <fragment, runtimeFilterId> to <runtimeFilterId, 
fragment>
             for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
-                List<RuntimeFilterTarget> targetFragments =
-                        ridToTargetParam.computeIfAbsent(rid, k -> new 
ArrayList<>());
+                List<RuntimeFilterTarget> targetFragments = 
ridToTargetParam.computeIfAbsent(rid,
+                        k -> new ArrayList<>());
                 for (AssignedJob instanceJob : plan.getInstanceJobs()) {
                     BackendWorker backendWorker = (BackendWorker) 
instanceJob.getAssignedWorker();
                     Backend backend = backendWorker.getBackend();
                     targetFragments.add(new RuntimeFilterTarget(
                             fragment.getFragmentId().asInt(),
-                            new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort())
-                    ));
+                            new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort())));
                 }
             }
 
@@ -146,8 +144,7 @@ public class RuntimeFiltersThriftBuilder {
         }
         return new RuntimeFiltersThriftBuilder(
                 mergeAddress, runtimeFilters, mergeInstance,
-                broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum
-        );
+                broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum);
     }
 
     public static class RuntimeFilterTarget {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 5fcd14fcb79..394e0cd5b1c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -125,7 +125,7 @@ public class ThriftPlansBuilder {
             // so we can merge and send multiple fragment to a backend use one 
rpc
             for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : 
workerToCurrentFragment.entrySet()) {
                 TPipelineFragmentParamsList fragments = 
fragmentsGroupByWorker.computeIfAbsent(
-                        kv.getKey(), w -> 
beToThrift(runtimeFiltersThriftBuilder,
+                        kv.getKey(), w -> beToThrift(kv.getKey(), 
runtimeFiltersThriftBuilder,
                                 topNFilterThriftSupplier));
                 fragments.addToParamsList(kv.getValue());
             }
@@ -298,18 +298,23 @@ public class ThriftPlansBuilder {
     }
 
     private static TPipelineFragmentParamsList beToThrift(
+            DistributedPlanWorker worker,
             RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
             Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
         TPipelineFragmentParamsList beParam = new 
TPipelineFragmentParamsList();
         TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
         runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
 
-        // set for runtime filter
         TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
         
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+        if 
(worker.host().equals(runtimeFiltersThriftBuilder.mergeAddress.getHostname())
+                && worker.brpcPort() == 
runtimeFiltersThriftBuilder.mergeAddress.getPort()) {
+            // only set following information for merge BE node
+            
runtimeFiltersThriftBuilder.populateRuntimeFilterParams(runtimeFilterParams);
+        }
         runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
-        
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
         beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+
         return beParam;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index f9490fce799..a5a1de2a300 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -441,6 +441,7 @@ struct TRuntimeFilterParams {
   // Runtime filter merge instance address. Used if this filter has a remote 
target
   1: optional Types.TNetworkAddress runtime_filter_merge_addr
 
+  // keep 2/3/4/5 unset if BE is not used for merge 
   // deprecated
   2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to