This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 1fce51e KYLIN-4480 runtime non-equi join 1fce51e is described below commit 1fce51edf5cfb687b77b2a3f41ca3ce382efd923 Author: Xian Li <li.x...@kyligence.io> AuthorDate: Wed Apr 29 15:40:54 2020 +0800 KYLIN-4480 runtime non-equi join --- .../org/apache/kylin/query/ITKylinQueryTest.java | 5 + .../query/sql_non_equal_join/query_00.sql | 11 + .../query/sql_non_equal_join/query_01.sql | 12 ++ .../query/sql_non_equal_join/query_02.sql | 17 ++ pom.xml | 2 +- .../apache/kylin/query/optrule/OLAPJoinRule.java | 10 +- .../kylin/query/relnode/OLAPNonEquiJoinRel.java | 224 +++++++++++++++++++++ 7 files changed, 277 insertions(+), 4 deletions(-) diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 07edde2..7ec3cff 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -240,6 +240,11 @@ public class ITKylinQueryTest extends KylinTestBase { } @Test + public void testNonEqualJoin() throws Exception { + execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_non_equal_join", null, true); + } + + @Test public void testUnionQuery() throws Exception { execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_union", null, true); } diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql new file mode 100644 index 0000000..45b27e2 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql @@ -0,0 +1,11 @@ +-- non-equal join +SELECT * FROM +( +SELECT CAL_DT, ORDER_ID FROM TEST_KYLIN_FACT GROUP BY CAL_DT, ORDER_ID +) FACT +LEFT JOIN +( +SELECT CAL_DT as DT_CAL_DT, WEEK_BEG_DT FROM EDW.TEST_CAL_DT +) DT +ON FACT.CAL_DT = DT.DT_CAL_DT AND WEEK_BEG_DT = DATE'2013-03-24' + 
diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql new file mode 100644 index 0000000..c843695 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql @@ -0,0 +1,12 @@ +-- non equal join with equal join +SELECT ITEM_CNT, FACT.CAL_DT, ORDER_ID, WEEK_BEG_DT FROM +( +select sum(ITEM_COUNT) as ITEM_CNT, CAL_DT, TEST_ORDER.ORDER_ID +FROM TEST_KYLIN_FACT as TEST_KYLIN_FACT +INNER JOIN TEST_ORDER as TEST_ORDER +ON TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID +GROUP BY CAL_DT, TEST_ORDER.ORDER_ID +) FACT +LEFT JOIN EDW.TEST_CAL_DT DT +ON FACT.CAL_DT = DT.CAL_DT AND WEEK_BEG_DT = DATE'2013-03-24' + diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql new file mode 100644 index 0000000..035459f --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql @@ -0,0 +1,17 @@ +-- non equal join with complex join conditions +SELECT ITEM_CNT, FACT.CAL_DT, ORDER_ID, WEEK_BEG_DT FROM +( + SELECT SUM(ITEM_COUNT) AS ITEM_CNT, CAL_DT, TEST_ORDER.ORDER_ID + FROM TEST_KYLIN_FACT AS TEST_KYLIN_FACT + INNER JOIN TEST_ORDER AS TEST_ORDER + ON TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID + GROUP BY CAL_DT, TEST_ORDER.ORDER_ID +) FACT +LEFT JOIN EDW.TEST_CAL_DT DT +ON FACT.CAL_DT = DT.CAL_DT + AND WEEK_BEG_DT = DATE'2013-03-24' + OR (WEEK_BEG_DT < DATE'2013-03-24' + AND (CASE WHEN ITEM_CNT > 100 THEN ORDER_ID > 10000 ELSE DT.CAL_DT < DATE'2013-04-24' END) + AND SUBSTRING(CAST(ORDER_ID AS VARCHAR), 1, 2) <> '14') + + diff --git a/pom.xml b/pom.xml index bd916b0..94406e5 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ <reflections.version>0.9.10</reflections.version> <!-- Calcite Version, the kylin fork is: https://github.com/Kyligence/calcite --> - <calcite.version>1.16.0-kylin-r3</calcite.version> + 
<calcite.version>1.16.0-kylin-r4</calcite.version> <avatica.version>1.12.0</avatica.version> <!-- Hadoop Common deps, keep compatible with hadoop2.version --> diff --git a/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java b/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java index d87a0c4..ff6351b 100644 --- a/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java +++ b/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java @@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.kylin.query.relnode.OLAPFilterRel; import org.apache.kylin.query.relnode.OLAPJoinRel; +import org.apache.kylin.query.relnode.OLAPNonEquiJoinRel; import org.apache.kylin.query.relnode.OLAPRel; /** @@ -53,9 +54,12 @@ public class OLAPJoinRule extends ConverterRule { final JoinInfo info = JoinInfo.of(left, right, join.getCondition()); if (!info.isEqui() && join.getJoinType() != JoinRelType.INNER) { - // EnumerableJoinRel only supports equi-join. We can put a filter on top - // if it is an inner join. - return null; + try { + return new OLAPNonEquiJoinRel(join.getCluster(), traitSet, left, right, + join.getCondition(), join.getVariablesSet(), join.getJoinType()); + } catch (InvalidRelException e) { + throw new IllegalStateException(e); + } } RelOptCluster cluster = join.getCluster(); diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java new file mode 100644 index 0000000..cb25f12 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.relnode; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableThetaJoin; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTrait; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.InvalidRelException; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.kylin.metadata.model.TblColRef; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class OLAPNonEquiJoinRel extends EnumerableThetaJoin implements OLAPRel { + + private OLAPContext context; + private ColumnRowType 
columnRowType; + private boolean hasSubQuery; + + private boolean isTopJoin; + + public OLAPNonEquiJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + Set<CorrelationId> variablesSet, JoinRelType joinType) throws InvalidRelException { + super(cluster, traits, left, right, condition, variablesSet, joinType); + rowType = getRowType(); + } + + @Override + public OLAPContext getContext() { + return context; + } + + @Override + public ColumnRowType getColumnRowType() { + return columnRowType; + } + + @Override + public boolean hasSubQuery() { + return hasSubQuery; + } + + @Override + public RelTraitSet replaceTraitSet(RelTrait trait) { + RelTraitSet oldTraitSet = this.traitSet; + this.traitSet = this.traitSet.replace(trait); + return oldTraitSet; + } + + protected boolean isParentMerelyPermutation(OLAPImplementor implementor) { + if (implementor.getParentNode() instanceof OLAPProjectRel) { + return ((OLAPProjectRel) implementor.getParentNode()).isMerelyPermutation(); + } + return false; + } + + @Override + public void implementOLAP(OLAPImplementor implementor) { + // create context for root join + if (!(implementor.getParentNode() instanceof OLAPJoinRel) + && !(implementor.getParentNode() instanceof OLAPNonEquiJoinRel) + && !isParentMerelyPermutation(implementor)) { + implementor.allocateContext(); + } + + //parent context + this.context = implementor.getContext(); + this.isTopJoin = !context.hasJoin; + this.context.hasJoin = true; + this.hasSubQuery = true; + + // visit and allocate context for left input + implementor.fixSharedOlapTableScanOnTheLeft(this); + implementor.setNewOLAPContextRequired(true); + implementor.visitChild(this.left, this); + if (this.context != implementor.getContext()) { + implementor.freeContext(); + } + + // visit and allocate context for right input + implementor.fixSharedOlapTableScanOnTheRight(this); + implementor.setNewOLAPContextRequired(true); + implementor.visitChild(this.right, this); + 
if (this.context != implementor.getContext()) { + implementor.freeContext(); + } + + this.columnRowType = buildColumnRowType(); + + if (isTopJoin) { + this.context.afterJoin = true; + } + + this.context.subqueryJoinParticipants.addAll(collectJoinColumns(condition)); + } + + @Override + public void implementRewrite(RewriteImplementor implementor) { + implementor.visitChild(this, this.left); + implementor.visitChild(this, this.right); + + this.rowType = this.deriveRowType(); + if (this.isTopJoin) { + + // add dynamic field + Map<TblColRef, RelDataType> dynFields = this.context.dynamicFields; + if (!dynFields.isEmpty()) { + List<TblColRef> newCols = Lists.newArrayList(this.columnRowType.getAllColumns()); + List<RelDataTypeField> newFieldList = Lists.newArrayList(); + int paramIndex = this.rowType.getFieldList().size(); + for (TblColRef fieldCol : dynFields.keySet()) { + RelDataType fieldType = dynFields.get(fieldCol); + + RelDataTypeField newField = new RelDataTypeFieldImpl(fieldCol.getName(), paramIndex++, fieldType); + newFieldList.add(newField); + + newCols.add(fieldCol); + } + + // rebuild row type + RelDataTypeFactory.FieldInfoBuilder fieldInfo = getCluster().getTypeFactory().builder(); + fieldInfo.addAll(this.rowType.getFieldList()); + fieldInfo.addAll(newFieldList); + this.rowType = getCluster().getTypeFactory().createStructType(fieldInfo); + + this.columnRowType = new ColumnRowType(newCols); + } + } + } + + @Override + public EnumerableRel implementEnumerable(List<EnumerableRel> inputs) { + return super.copy(traitSet, condition, inputs.get(0), inputs.get(1), joinType, isSemiJoinDone()); + } + + protected ColumnRowType buildColumnRowType() { + List<TblColRef> columns = new ArrayList<TblColRef>(); + + OLAPRel olapLeft = (OLAPRel) this.left; + ColumnRowType leftColumnRowType = olapLeft.getColumnRowType(); + columns.addAll(leftColumnRowType.getAllColumns()); + + OLAPRel olapRight = (OLAPRel) this.right; + ColumnRowType rightColumnRowType = 
olapRight.getColumnRowType(); + columns.addAll(rightColumnRowType.getAllColumns()); + + if (columns.size() != this.rowType.getFieldCount()) { + throw new IllegalStateException( + "RowType=" + this.rowType.getFieldCount() + ", ColumnRowType=" + columns.size()); + } + return new ColumnRowType(columns); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(.05); + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + return super.estimateRowCount(mq) * 0.1; + } + + @Override + public EnumerableThetaJoin copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + try { + return new OLAPNonEquiJoinRel(this.getCluster(), traitSet, left, right, condition, this.variablesSet, joinType); + } catch (InvalidRelException var8) { + throw new AssertionError(var8); + } + } + + private Collection<TblColRef> collectJoinColumns(RexNode condition) { + Set<TblColRef> joinColumns = Sets.newHashSet(); + doCollectJoinColumns(condition, joinColumns); + return joinColumns; + } + + private void doCollectJoinColumns(RexNode node, Set<TblColRef> joinColumns) { + if (node instanceof RexCall) { + ((RexCall) node).getOperands().forEach(operand -> doCollectJoinColumns(operand, joinColumns)); + } else if (node instanceof RexInputRef) { + joinColumns.add(columnRowType.getColumnByIndex(((RexInputRef) node).getIndex())); + } + } + +}