This is an automated email from the ASF dual-hosted git repository. englefly pushed a commit to branch rf-thrift-poc in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/rf-thrift-poc by this push: new dcc240db083 move rf info from instance level to be level (#49474) dcc240db083 is described below commit dcc240db083b246ae541afeeb9fd71b33c72b1d2 Author: minghong <zhoumingh...@selectdb.com> AuthorDate: Tue Mar 25 18:25:55 2025 +0800 move rf info from instance level to be level (#49474) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- .../rules/analysis/EliminateLogicalSelectHint.java | 3 +- .../nereids/trees/plans/algebra/SetOperation.java | 4 +++ .../doris/qe/runtime/ThriftPlansBuilder.java | 39 ++++++++++++---------- gensrc/thrift/PaloInternalService.thrift | 13 ++++++-- 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java index f7327100006..43ef5755559 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java @@ -66,7 +66,8 @@ public class EliminateLogicalSelectHint extends OneRewriteRuleFactory { } else if (hintName.equalsIgnoreCase("LEADING")) { extractLeading((SelectHintLeading) hint, ctx.cascadesContext, ctx.statementContext, selectHintPlan); - } else if (hintName.equalsIgnoreCase("USE_CBO_RULE")) { + } else if (hintName.equalsIgnoreCase("USE_CBO_RULE") + || hintName.equalsIgnoreCase("NO_USE_CBO_RULE")) { extractRule((SelectHintUseCboRule) hint, ctx.statementContext); } else if (hintName.equalsIgnoreCase("USE_MV")) { extractMv((SelectHintUseMv) hint, ConnectContext.get().getStatementContext()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java index dedd877eead..0407348721f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java @@ -43,4 +43,8 @@ public interface SetOperation { List<List<SlotReference>> getRegularChildrenOutputs(); int getArity(); + + default boolean isDistinct() { + return getQualifier() == Qualifier.DISTINCT; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 9b114d85ea4..475891ec0f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -52,6 +52,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TPlanFragmentDestination; import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TRuntimeFilterInfo; import org.apache.doris.thrift.TRuntimeFilterParams; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TTopnFilterDesc; @@ -112,9 +113,7 @@ public class ThriftPlansBuilder { workerProcessInstanceNum, coordinatorContext); TPipelineInstanceParams instanceParam = instanceToThrift( - currentFragmentParam, instanceJob, runtimeFiltersThriftBuilder, - topNFilterThriftSupplier, currentInstanceIndex++ - ); + currentFragmentParam, instanceJob, currentInstanceIndex++); currentFragmentParam.getLocalParams().add(instanceParam); } @@ -122,7 +121,8 @@ public class ThriftPlansBuilder { // so we can merge and send multiple fragment to a backend use one rpc for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : workerToCurrentFragment.entrySet()) { TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent( - kv.getKey(), w -> new TPipelineFragmentParamsList()); + kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder, + topNFilterThriftSupplier)); fragments.addToParamsList(kv.getValue()); } } @@ -293,6 +293,22 @@ public class ThriftPlansBuilder { return destination; } + private static TPipelineFragmentParamsList beToThrift( + RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder, + Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) { + TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList(); + TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo(); + runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get()); + + // set for runtime filter + TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams(); + runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress); + runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams); + runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams); + beParam.setRuntimeFilterInfo(runtimeFilterInfo); + return beParam; + } + private static TPipelineFragmentParams fragmentToThriftIfAbsent( PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob, Map<DistributedPlanWorker, TPipelineFragmentParams> workerToFragmentParams, @@ -401,26 +417,13 @@ public class ThriftPlansBuilder { } private static TPipelineInstanceParams instanceToThrift( - TPipelineFragmentParams currentFragmentParam, AssignedJob instance, - RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder, - Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int currentInstanceNum) { + TPipelineFragmentParams currentFragmentParam, AssignedJob instance, int currentInstanceNum) { TPipelineInstanceParams instanceParam = new TPipelineInstanceParams(); instanceParam.setFragmentInstanceId(instance.instanceId()); setScanSourceParam(currentFragmentParam, instance, instanceParam); instanceParam.setSenderId(instance.indexInUnassignedJob()); instanceParam.setBackendNum(currentInstanceNum); - instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams()); - - instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get()); - - // set for runtime filter - TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams(); - runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress); - instanceParam.setRuntimeFilterParams(runtimeFilterParams); - if (runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) { - runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams); - } boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob; if (isLocalShuffle) { // a fragment in a backend only enable local shuffle once for the first local shuffle instance, diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d2805ba83cb..17b531d1248 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -470,9 +470,9 @@ struct TPlanFragmentExecParams { 10: optional i32 num_senders 11: optional bool send_query_statistics_with_every_batch // Used to merge and send runtime filter - 12: optional TRuntimeFilterParams runtime_filter_params + 12: optional TRuntimeFilterParams runtime_filter_params //deprecated 13: optional bool group_commit // deprecated - 14: optional list<i32> topn_filter_source_node_ids + 14: optional list<i32> topn_filter_source_node_ids //deprecated } // Global query parameters assigned by the coordinator. @@ -803,6 +803,14 @@ struct TPipelineFragmentParams { 1000: optional bool is_mow_table; } +// pull up runtime filter info from instance level to BE level +struct TRuntimeFilterInfo { + // for join runtime filter and setop runtime filter + 1: optional TRuntimeFilterParams runtime_filter_params + // for topn runtime filter + 2: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs +} + struct TPipelineFragmentParamsList { 1: optional list<TPipelineFragmentParams> params_list; 2: optional Descriptors.TDescriptorTable desc_tbl; @@ -819,4 +827,5 @@ struct TPipelineFragmentParamsList { 11: optional Types.TUniqueId query_id 12: optional list<i32> topn_filter_source_node_ids 13: optional Types.TNetworkAddress runtime_filter_merge_addr + 14: optional TRuntimeFilterInfo runtime_filter_info } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org