This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch rf-thrift-poc
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/rf-thrift-poc by this push:
     new dcc240db083 move rf info from instance level to be level (#49474)
dcc240db083 is described below

commit dcc240db083b246ae541afeeb9fd71b33c72b1d2
Author: minghong <zhoumingh...@selectdb.com>
AuthorDate: Tue Mar 25 18:25:55 2025 +0800

    move rf info from instance level to be level (#49474)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../rules/analysis/EliminateLogicalSelectHint.java |  3 +-
 .../nereids/trees/plans/algebra/SetOperation.java  |  4 +++
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 39 ++++++++++++----------
 gensrc/thrift/PaloInternalService.thrift           | 13 ++++++--
 4 files changed, 38 insertions(+), 21 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
index f7327100006..43ef5755559 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
@@ -66,7 +66,8 @@ public class EliminateLogicalSelectHint extends 
OneRewriteRuleFactory {
                 } else if (hintName.equalsIgnoreCase("LEADING")) {
                     extractLeading((SelectHintLeading) hint, 
ctx.cascadesContext,
                             ctx.statementContext, selectHintPlan);
-                } else if (hintName.equalsIgnoreCase("USE_CBO_RULE")) {
+                } else if (hintName.equalsIgnoreCase("USE_CBO_RULE")
+                        || hintName.equalsIgnoreCase("NO_USE_CBO_RULE")) {
                     extractRule((SelectHintUseCboRule) hint, 
ctx.statementContext);
                 } else if (hintName.equalsIgnoreCase("USE_MV")) {
                     extractMv((SelectHintUseMv) hint, 
ConnectContext.get().getStatementContext());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
index dedd877eead..0407348721f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
@@ -43,4 +43,8 @@ public interface SetOperation {
     List<List<SlotReference>> getRegularChildrenOutputs();
 
     int getArity();
+
+    default boolean isDistinct() {
+        return getQualifier() == Qualifier.DISTINCT;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 9b114d85ea4..475891ec0f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TPlanFragmentDestination;
 import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRuntimeFilterInfo;
 import org.apache.doris.thrift.TRuntimeFilterParams;
 import org.apache.doris.thrift.TScanRangeParams;
 import org.apache.doris.thrift.TTopnFilterDesc;
@@ -112,9 +113,7 @@ public class ThriftPlansBuilder {
                         workerProcessInstanceNum, coordinatorContext);
 
                 TPipelineInstanceParams instanceParam = instanceToThrift(
-                        currentFragmentParam, instanceJob, 
runtimeFiltersThriftBuilder,
-                        topNFilterThriftSupplier, currentInstanceIndex++
-                );
+                        currentFragmentParam, instanceJob, 
currentInstanceIndex++);
                 currentFragmentParam.getLocalParams().add(instanceParam);
             }
 
@@ -122,7 +121,8 @@ public class ThriftPlansBuilder {
             // so we can merge and send multiple fragment to a backend use one 
rpc
             for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : 
workerToCurrentFragment.entrySet()) {
                 TPipelineFragmentParamsList fragments = 
fragmentsGroupByWorker.computeIfAbsent(
-                        kv.getKey(), w -> new TPipelineFragmentParamsList());
+                        kv.getKey(), w -> 
beToThrift(runtimeFiltersThriftBuilder,
+                                topNFilterThriftSupplier));
                 fragments.addToParamsList(kv.getValue());
             }
         }
@@ -293,6 +293,22 @@ public class ThriftPlansBuilder {
         return destination;
     }
 
+    private static TPipelineFragmentParamsList beToThrift(
+            RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
+            Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
+        TPipelineFragmentParamsList beParam = new 
TPipelineFragmentParamsList();
+        TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
+        runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
+
+        // set for runtime filter
+        TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
+        
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+        runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
+        
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
+        beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+        return beParam;
+    }
+
     private static TPipelineFragmentParams fragmentToThriftIfAbsent(
             PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
             Map<DistributedPlanWorker, TPipelineFragmentParams> 
workerToFragmentParams,
@@ -401,26 +417,13 @@ public class ThriftPlansBuilder {
     }
 
     private static TPipelineInstanceParams instanceToThrift(
-            TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
-            RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
-            Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int 
currentInstanceNum) {
+            TPipelineFragmentParams currentFragmentParam, AssignedJob 
instance, int currentInstanceNum) {
         TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
         instanceParam.setFragmentInstanceId(instance.instanceId());
         setScanSourceParam(currentFragmentParam, instance, instanceParam);
 
         instanceParam.setSenderId(instance.indexInUnassignedJob());
         instanceParam.setBackendNum(currentInstanceNum);
-        instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams());
-
-        instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());
-
-        // set for runtime filter
-        TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
-        
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
-        instanceParam.setRuntimeFilterParams(runtimeFilterParams);
-        if 
(runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
-            
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
-        }
         boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
         if (isLocalShuffle) {
             // a fragment in a backend only enable local shuffle once for the 
first local shuffle instance,
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index d2805ba83cb..17b531d1248 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -470,9 +470,9 @@ struct TPlanFragmentExecParams {
   10: optional i32 num_senders
   11: optional bool send_query_statistics_with_every_batch
   // Used to merge and send runtime filter
-  12: optional TRuntimeFilterParams runtime_filter_params
+  12: optional TRuntimeFilterParams runtime_filter_params //deprecated
   13: optional bool group_commit // deprecated
-  14: optional list<i32> topn_filter_source_node_ids
+  14: optional list<i32> topn_filter_source_node_ids //deprecated
 }
 
 // Global query parameters assigned by the coordinator.
@@ -803,6 +803,14 @@ struct TPipelineFragmentParams {
   1000: optional bool is_mow_table;
 }
 
+// pull up runtime filter info from instance level to BE level
+struct TRuntimeFilterInfo {
+  // for join runtime filter and setop runtime filter
+  1: optional TRuntimeFilterParams runtime_filter_params
+  // for topn runtime filter
+  2: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
+}
+
 struct TPipelineFragmentParamsList {
   1: optional list<TPipelineFragmentParams> params_list;
   2: optional Descriptors.TDescriptorTable desc_tbl;
@@ -819,4 +827,5 @@ struct TPipelineFragmentParamsList {
   11: optional Types.TUniqueId query_id
   12: optional list<i32> topn_filter_source_node_ids
   13: optional Types.TNetworkAddress runtime_filter_merge_addr
+  14: optional TRuntimeFilterInfo runtime_filter_info
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to