This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 65cb91e60e [Chore](agg-state) add sessionvariable enable_agg_state (#21373) 65cb91e60e is described below commit 65cb91e60e0496454649f250bac2dcfc8a301af1 Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jul 4 14:25:21 2023 +0800 [Chore](agg-state) add sessionvariable enable_agg_state (#21373) add sessionvariable enable_agg_state --- .../java/org/apache/doris/analysis/Analyzer.java | 14 ---------- .../java/org/apache/doris/analysis/ColumnDef.java | 6 ++++ .../apache/doris/common/util/VectorizedUtil.java | 30 -------------------- .../glue/translator/RuntimeFilterTranslator.java | 5 ++-- .../processor/post/RuntimeFilterGenerator.java | 6 ++-- .../rewrite/BuildCTEAnchorAndCTEProducer.java | 2 +- .../doris/nereids/rules/rewrite/InlineCTE.java | 2 +- .../apache/doris/planner/DistributedPlanner.java | 3 +- .../org/apache/doris/planner/OlapScanNode.java | 4 +-- .../main/java/org/apache/doris/qe/Coordinator.java | 5 ++-- .../java/org/apache/doris/qe/SessionVariable.java | 32 ++++++++++++++++++---- .../java/org/apache/doris/qe/StmtExecutor.java | 6 ++-- .../test_agg_state_group_concat.groovy | 1 + .../agg_state/max/test_agg_state_max.groovy | 1 + .../nereids/test_agg_state_nereids.groovy | 1 + .../datatype_p0/agg_state/test_agg_state.groovy | 1 + 16 files changed, 53 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index d229d4b246..0faa84d2e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -2294,20 +2294,6 @@ public class Analyzer { return globalState.context.getSessionVariable().isEnableFoldConstantByBe(); } - /** - * Returns true if predicate 'e' can be correctly evaluated by a tree materializing - * 'tupleIds', otherwise false: - * - the predicate needs to be bound by tupleIds - * - a Where clause predicate can only be correctly evaluated if for all outer-joined - * referenced tids the last join to outer-join this tid has been materialized - * - an On clause predicate against the non-nullable side of an Outer Join clause - * can only be correctly evaluated by the join node that materializes the - * Outer Join clause - */ - private boolean canEvalPredicate(PlanNode node, Expr e) { - return canEvalPredicate(node.getTblRefIds(), e); - } - /** * Returns true if predicate 'e' can be correctly evaluated by a tree materializing * 'tupleIds', otherwise false: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java index 9833236a89..ab1a1bed6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java @@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.qe.SessionVariable; import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; @@ -303,6 +304,11 @@ public class ColumnDef { throw new AnalysisException(String.format("Aggregate type %s is not compatible with primitive type %s", toString(), type.toSql())); } + if (aggregateType == AggregateType.GENERIC_AGGREGATION) { + if (!SessionVariable.enableAggState()) { + throw new AnalysisException("agg state not enable, need set enable_agg_state=true"); + } + } } if (type.getPrimitiveType() == PrimitiveType.FLOAT || type.getPrimitiveType() == PrimitiveType.DOUBLE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java deleted file mode 100644 index f45bffcfa5..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import org.apache.doris.qe.ConnectContext; - -public class VectorizedUtil { - public static boolean isPipeline() { - ConnectContext connectContext = ConnectContext.get(); - if (connectContext == null) { - return false; - } - return connectContext.getSessionVariable().enablePipelineEngine(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 3308ed3580..bbb9997f2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -36,7 +36,7 @@ import org.apache.doris.planner.HashJoinNode.DistributionMode; import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; import org.apache.doris.planner.ScanNode; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; @@ -175,8 +175,7 @@ public class RuntimeFilterTranslator { origFilter.extractTargetsPosition(); // Number of parallel instances are large for pipeline engine, so we prefer bloom filter. if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM - && ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().enablePipelineEngine()) { + && SessionVariable.enablePipelineEngine()) { origFilter.setType(TRuntimeFilterType.BLOOM); } return origFilter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 524ba17ba0..5b4a1b6ade 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -313,7 +313,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } else { // in-filter is not friendly to pipeline if (type == TRuntimeFilterType.IN_OR_BLOOM - && ctx.getSessionVariable().enablePipelineEngine() + && ctx.getSessionVariable().getEnablePipelineEngine() && hasRemoteTarget(join, scan)) { type = TRuntimeFilterType.BLOOM; } @@ -363,7 +363,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { } PhysicalRelation scan = aliasTransferMap.get(origSlot).first; if (type == TRuntimeFilterType.IN_OR_BLOOM - && ctx.getSessionVariable().enablePipelineEngine() + && ctx.getSessionVariable().getEnablePipelineEngine() && hasRemoteTarget(join, scan)) { type = TRuntimeFilterType.BLOOM; } @@ -563,7 +563,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { PhysicalHashJoin join = innerEntry.getValue(); Preconditions.checkState(join != null); TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM; - if (ctx.getSessionVariable().enablePipelineEngine()) { + if (ctx.getSessionVariable().getEnablePipelineEngine()) { type = TRuntimeFilterType.BLOOM; } EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java index b171d44931..dab88e686e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/BuildCTEAnchorAndCTEProducer.java @@ -52,7 +52,7 @@ public class BuildCTEAnchorAndCTEProducer extends OneRewriteRuleFactory { CTEId id = logicalCTE.findCTEId(s.getAlias()); if (cascadesContext.cteReferencedCount(id) <= ConnectContext.get().getSessionVariable().inlineCTEReferencedThreshold - || !ConnectContext.get().getSessionVariable().enablePipelineEngine) { + || !ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) { continue; } LogicalCTEProducer logicalCTEProducer = new LogicalCTEProducer( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java index 4c9e8d904d..a50f42a2c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/InlineCTE.java @@ -47,7 +47,7 @@ public class InlineCTE extends OneRewriteRuleFactory { * Current we only implement CTE Materialize on pipeline engine and only materialize those CTE whose * refCount > NereidsRewriter.INLINE_CTE_REFERENCED_THRESHOLD. */ - if (ConnectContext.get().getSessionVariable().enablePipelineEngine + if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine() && ConnectContext.get().getSessionVariable().enableCTEMaterialize && refCount > INLINE_CTE_REFERENCED_THRESHOLD) { return cteConsumer; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 561a771e3c..6a73952e20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; @@ -935,7 +936,7 @@ public class DistributedPlanner { childFragment.addPlanRoot(node); childFragment.setHasColocatePlanNode(true); return childFragment; - } else if (ConnectContext.get().getSessionVariable().enablePipelineEngine() + } else if (SessionVariable.enablePipelineEngine() && childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo()) && childFragment.getPlanRoot() instanceof OlapScanNode) { childFragment.getPlanRoot().setShouldColoScan(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6fa4955a31..b520e058e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1229,7 +1229,7 @@ public class OlapScanNode extends ScanNode { public int getNumInstances() { // In pipeline exec engine, the instance num equals be_num * parallel instance. // so here we need count distinct be_num to do the work. make sure get right instance - if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) { + if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) { int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream()) .map(loc -> loc.backend_id).distinct().count(); @@ -1241,7 +1241,7 @@ public class OlapScanNode extends ScanNode { @Override public boolean shouldColoAgg(AggregateInfo aggregateInfo) { distributionColumnIds.clear(); - if (ConnectContext.get().getSessionVariable().enablePipelineEngine() + if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine() && ConnectContext.get().getSessionVariable().enableColocateScan()) { List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs(); List<SlotDescriptor> slots = desc.getSlots(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d4f78b0d91..a18248a017 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -35,7 +35,6 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListUtil; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; import org.apache.doris.nereids.stats.StatsErrorEstimator; @@ -311,7 +310,7 @@ public class Coordinator { this.returnedAllResults = false; this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin; - this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine; + this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine(); initQueryOptions(context); setFromUserProperty(context); @@ -379,7 +378,7 @@ public class Coordinator { private void initQueryOptions(ConnectContext context) { this.queryOptions = context.getSessionVariable().toThrift(); - this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline()); + this.queryOptions.setEnablePipelineEngine(SessionVariable.enablePipelineEngine()); this.queryOptions.setBeExecVersion(Config.be_exec_version); this.queryOptions.setQueryTimeout(context.getExecTimeout()); this.queryOptions.setExecutionTimeout(context.getExecTimeout()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 44788aa2e4..3cc2adf5b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -191,6 +191,9 @@ public class SessionVariable implements Serializable, Writable { public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000; public static final String ENABLE_PIPELINE_ENGINE = "enable_pipeline_engine"; + + public static final String ENABLE_AGG_STATE = "enable_agg_state"; + public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline"; public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt"; @@ -613,7 +616,10 @@ public class SessionVariable implements Serializable, Writable { public boolean enableVectorizedEngine = true; @VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, expType = ExperimentalType.EXPERIMENTAL) - public boolean enablePipelineEngine = true; + private boolean enablePipelineEngine = true; + + @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, expType = ExperimentalType.EXPERIMENTAL) + public boolean enableAggState = false; @VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE) public boolean enableParallelOutfile = false; @@ -1636,10 +1642,6 @@ public class SessionVariable implements Serializable, Writable { this.runtimeFilterMaxInNum = runtimeFilterMaxInNum; } - public boolean enablePipelineEngine() { - return enablePipelineEngine; - } - public void setEnablePipelineEngine(boolean enablePipelineEngine) { this.enablePipelineEngine = enablePipelineEngine; } @@ -2338,4 +2340,24 @@ public class SessionVariable implements Serializable, Writable { public boolean isEnableUnifiedLoad() { return enableUnifiedLoad; } + + public boolean getEnablePipelineEngine() { + return enablePipelineEngine; + } + + public static boolean enablePipelineEngine() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return false; + } + return connectContext.getSessionVariable().enablePipelineEngine; + } + + public static boolean enableAggState() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return true; + } + return connectContext.getSessionVariable().enableAggState; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index aaaf105904..9cb5a7fe2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -316,7 +316,7 @@ public class StmtExecutor { builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum())); builder.traceId(context.getSessionVariable().getTraceId()); builder.isNereids(context.getState().isNereids ? "Yes" : "No"); - builder.isPipeline(context.getSessionVariable().enablePipelineEngine ? "Yes" : "No"); + builder.isPipeline(context.getSessionVariable().getEnablePipelineEngine() ? "Yes" : "No"); return builder.build(); } @@ -564,7 +564,7 @@ public class StmtExecutor { private void handleQueryWithRetry(TUniqueId queryId) throws Exception { // queue query here if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue - && context.getSessionVariable().enablePipelineEngine()) { + && context.getSessionVariable().getEnablePipelineEngine()) { this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); try { this.offerRet = queryQueue.offer(); @@ -1358,7 +1358,7 @@ public class StmtExecutor { // 2. If this is a query, send the result expr fields first, and send result data back to client. RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); - if (Config.enable_workload_group && context.sessionVariable.enablePipelineEngine()) { + if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) { coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context)); } QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), diff --git a/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy b/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy index 6926a16aba..9dc1e86009 100644 --- a/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy +++ b/regression-test/suites/datatype_p0/agg_state/group_concat/test_agg_state_group_concat.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_agg_state_group_concat") { + sql "set enable_agg_state=true" sql """ DROP TABLE IF EXISTS a_table; """ sql """ create table a_table( diff --git a/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy b/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy index bce225a7cd..3397ccf3bd 100644 --- a/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy +++ b/regression-test/suites/datatype_p0/agg_state/max/test_agg_state_max.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_agg_state_max") { + sql "set enable_agg_state=true" sql """ DROP TABLE IF EXISTS a_table; """ sql """ create table a_table( diff --git a/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy b/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy index 001b890647..1cadc48e2b 100644 --- a/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy +++ b/regression-test/suites/datatype_p0/agg_state/nereids/test_agg_state_nereids.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_agg_state_nereids") { + sql "set enable_agg_state=true" sql "set enable_nereids_planner=true;" sql "set enable_fallback_to_original_planner=false;" diff --git a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy index 4c4797d9d1..d507a8cacf 100644 --- a/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy +++ b/regression-test/suites/datatype_p0/agg_state/test_agg_state.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_agg_state") { + sql "set enable_agg_state=true" sql """ DROP TABLE IF EXISTS d_table; """ sql """ create table d_table( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org