This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4f915129a90920c5936b1b874c7e6710e57f2970
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Sat Jan 27 10:24:36 2024 +0800

    [pipelineX](localexchange) Add local exchange before TabletFunction (#30446)

    * [pipelineX](localexchange) Add local exchange before TabletFunction

    * update
---
 be/src/pipeline/exec/table_function_operator.h         |  4 ++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java | 12 ++++++++----
 .../main/java/org/apache/doris/qe/SessionVariable.java | 16 ++++++++++++++++
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h
index a6c90e3dec4..c8f64a447a5 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -96,6 +96,10 @@ public:
                local_state._child_source_state != SourceState::FINISHED;
     }
 
+    DataDistribution required_data_distribution() const override {
+        return {ExchangeType::PASSTHROUGH};
+    }
+
     Status push(RuntimeState* state, vectorized::Block* input_block,
                 SourceState source_state) const override {
         auto& local_state = get_local_state(state);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 099aedd7e17..cca9014a715 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2042,10 +2042,12 @@ public class Coordinator implements CoordInterface {
                 boolean sharedScan = true;
                 int expectedInstanceNum = Math.min(parallelExecInstanceNum,
                         leftMostNode.getNumInstances());
-                boolean ignoreStorageDataDistribution = scanNodes.stream()
+                boolean forceToLocalShuffle = context != null
+                        && context.getSessionVariable().isForceToLocalShuffle();
+                boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
                         .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
                                 fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
-                                        .scanRangeAssignment.size())) && useNereids;
+                                        .scanRangeAssignment.size())) && useNereids);
                 if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
                         || ignoreStorageDataDistribution)) {
                     expectedInstanceNum = Math.max(expectedInstanceNum, 1);
@@ -2917,13 +2919,15 @@ public class Coordinator implements CoordInterface {
          * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
          * 2. Use Nereids planner.
          */
-        boolean ignoreStorageDataDistribution = scanNodes.stream()
+        boolean forceToLocalShuffle = context != null
+                && context.getSessionVariable().isForceToLocalShuffle();
+        boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
                         fragmentExecParamsMap.get(node.getFragment().getFragmentId())
                                 .scanRangeAssignment.size()))
                 && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < parallelExecInstanceNum;
-                }) && useNereids;
+                }) && useNereids);
 
         FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
         for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 84809f31b85..cef6babc805 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -235,6 +235,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
 
+    public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle";
+
     public static final String ENABLE_AGG_STATE = "enable_agg_state";
 
     public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
@@ 
-845,6 +847,12 @@ public class SessionVariable implements Serializable, Writable {
                     "Whether to enable local shuffle on pipelineX engine."})
     private boolean enableLocalShuffle = true;
 
+    @VariableMgr.VarAttr(
+            name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"是否在pipelineX引擎上强制开启local shuffle优化",
+                    "Whether to force to local shuffle on pipelineX engine."})
+    private boolean forceToLocalShuffle = false;
+
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
             needForward = true)
     public boolean enableAggState = false;
@@ -3303,4 +3311,12 @@ public class SessionVariable implements Serializable, Writable {
     public void setForceJniScanner(boolean force) {
         forceJniScanner = force;
     }
+
+    public boolean isForceToLocalShuffle() {
+        return getEnablePipelineXEngine() && enableLocalShuffle && enableNereidsPlanner && forceToLocalShuffle;
+    }
+
+    public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
+        this.forceToLocalShuffle = forceToLocalShuffle;
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org