This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fce51e  KYLIN-4480 runtime non-equi join
1fce51e is described below

commit 1fce51edf5cfb687b77b2a3f41ca3ce382efd923
Author: Xian Li <li.x...@kyligence.io>
AuthorDate: Wed Apr 29 15:40:54 2020 +0800

    KYLIN-4480 runtime non-equi join
---
 .../org/apache/kylin/query/ITKylinQueryTest.java   |   5 +
 .../query/sql_non_equal_join/query_00.sql          |  11 +
 .../query/sql_non_equal_join/query_01.sql          |  12 ++
 .../query/sql_non_equal_join/query_02.sql          |  17 ++
 pom.xml                                            |   2 +-
 .../apache/kylin/query/optrule/OLAPJoinRule.java   |  10 +-
 .../kylin/query/relnode/OLAPNonEquiJoinRel.java    | 224 +++++++++++++++++++++
 7 files changed, 277 insertions(+), 4 deletions(-)

diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 07edde2..7ec3cff 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -240,6 +240,11 @@ public class ITKylinQueryTest extends KylinTestBase {
     }
 
     @Test
+    public void testNonEqualJoin() throws Exception {
+        // Each query in this folder LEFT JOINs on an ON clause that is not a pure equi-join.
+        execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_non_equal_join", null, true);
+    }
+
+    @Test
     public void testUnionQuery() throws Exception {
         execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_union", null, true);
     }
diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql
new file mode 100644
index 0000000..45b27e2
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_00.sql
@@ -0,0 +1,11 @@
+-- non-equal join
+-- The ON clause pairs an equality (FACT.CAL_DT = DT.DT_CAL_DT) with a constant
+-- filter on a right-side column (WEEK_BEG_DT), so it is not a pure equi-join.
+SELECT * FROM
+(
+SELECT CAL_DT, ORDER_ID FROM TEST_KYLIN_FACT GROUP BY CAL_DT, ORDER_ID
+) FACT
+LEFT JOIN
+(
+SELECT CAL_DT as DT_CAL_DT, WEEK_BEG_DT FROM EDW.TEST_CAL_DT
+) DT
+ON FACT.CAL_DT = DT.DT_CAL_DT AND WEEK_BEG_DT = DATE'2013-03-24'
+
diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql
new file mode 100644
index 0000000..c843695
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_01.sql
@@ -0,0 +1,12 @@
+-- non equal join with equal join
+-- The FACT subquery uses an inner equi-join; the outer LEFT JOIN adds a constant
+-- filter on WEEK_BEG_DT to its equality condition, which makes that join non-equi.
+SELECT ITEM_CNT, FACT.CAL_DT, ORDER_ID, WEEK_BEG_DT FROM
+(
+select sum(ITEM_COUNT) as ITEM_CNT, CAL_DT, TEST_ORDER.ORDER_ID
+FROM TEST_KYLIN_FACT as TEST_KYLIN_FACT
+INNER JOIN TEST_ORDER as TEST_ORDER
+ON TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID
+GROUP BY CAL_DT, TEST_ORDER.ORDER_ID
+) FACT
+LEFT JOIN EDW.TEST_CAL_DT DT
+ON FACT.CAL_DT = DT.CAL_DT AND WEEK_BEG_DT = DATE'2013-03-24'
+
diff --git a/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql b/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql
new file mode 100644
index 0000000..035459f
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_non_equal_join/query_02.sql
@@ -0,0 +1,17 @@
+-- non equal join with complex join conditions
+-- Note: AND binds tighter than OR, so the ON clause below reads as
+--   (FACT.CAL_DT = DT.CAL_DT AND WEEK_BEG_DT = DATE'2013-03-24') OR (...)
+SELECT ITEM_CNT, FACT.CAL_DT, ORDER_ID, WEEK_BEG_DT FROM
+(
+    SELECT SUM(ITEM_COUNT) AS ITEM_CNT, CAL_DT, TEST_ORDER.ORDER_ID
+    FROM TEST_KYLIN_FACT AS TEST_KYLIN_FACT
+    INNER JOIN TEST_ORDER AS TEST_ORDER
+    ON TEST_KYLIN_FACT.ORDER_ID = TEST_ORDER.ORDER_ID
+    GROUP BY CAL_DT, TEST_ORDER.ORDER_ID
+) FACT
+LEFT JOIN EDW.TEST_CAL_DT DT
+ON FACT.CAL_DT = DT.CAL_DT
+    AND WEEK_BEG_DT = DATE'2013-03-24'
+    OR (WEEK_BEG_DT < DATE'2013-03-24'
+    AND (CASE WHEN ITEM_CNT > 100 THEN ORDER_ID > 10000 ELSE DT.CAL_DT < DATE'2013-04-24' END)
+    AND SUBSTRING(CAST(ORDER_ID AS VARCHAR), 1, 2) <> '14')
+
+
diff --git a/pom.xml b/pom.xml
index bd916b0..94406e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,7 +95,7 @@
     <reflections.version>0.9.10</reflections.version>
 
     <!-- Calcite Version, the kylin fork is: https://github.com/Kyligence/calcite -->
-    <calcite.version>1.16.0-kylin-r3</calcite.version>
+    <calcite.version>1.16.0-kylin-r4</calcite.version>
     <avatica.version>1.12.0</avatica.version>
 
     <!-- Hadoop Common deps, keep compatible with hadoop2.version -->
diff --git a/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java b/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java
index d87a0c4..ff6351b 100644
--- a/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java
+++ b/query/src/main/java/org/apache/kylin/query/optrule/OLAPJoinRule.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.kylin.query.relnode.OLAPFilterRel;
 import org.apache.kylin.query.relnode.OLAPJoinRel;
+import org.apache.kylin.query.relnode.OLAPNonEquiJoinRel;
 import org.apache.kylin.query.relnode.OLAPRel;
 
 /**
@@ -53,9 +54,12 @@ public class OLAPJoinRule extends ConverterRule {
 
         final JoinInfo info = JoinInfo.of(left, right, join.getCondition());
         if (!info.isEqui() && join.getJoinType() != JoinRelType.INNER) {
-            // EnumerableJoinRel only supports equi-join. We can put a filter 
on top
-            // if it is an inner join.
-            return null;
+            try {
+                return new OLAPNonEquiJoinRel(join.getCluster(), traitSet, 
left, right,
+                        join.getCondition(), join.getVariablesSet(), 
join.getJoinType());
+            } catch (InvalidRelException e) {
+                throw new IllegalStateException(e);
+            }
         }
 
         RelOptCluster cluster = join.getCluster();
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java
new file mode 100644
index 0000000..cb25f12
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPNonEquiJoinRel.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.relnode;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableThetaJoin;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * OLAP relational node for a join whose condition is not a pure equi-join.
+ *
+ * <p>OLAPJoinRule creates this node for a non-INNER join whose JoinInfo is not equi. It takes part
+ * in OLAP context allocation like the other OLAP rels, and {@link #implementEnumerable} turns it
+ * back into a plain {@link EnumerableThetaJoin}, so the join condition is evaluated by Calcite's
+ * enumerable theta join.
+ */
+public class OLAPNonEquiJoinRel extends EnumerableThetaJoin implements OLAPRel {
+
+    /** Context this join belongs to; assigned in {@link #implementOLAP}. */
+    private OLAPContext context;
+    /** Left input columns followed by right input columns (plus dynamic fields after rewrite). */
+    private ColumnRowType columnRowType;
+    /** Set to true by {@link #implementOLAP}. */
+    private boolean hasSubQuery;
+
+    /** True when no join had been recorded on the context before this one. */
+    private boolean isTopJoin;
+
+    /**
+     * Creates the node; every argument is forwarded to {@link EnumerableThetaJoin}.
+     *
+     * @throws InvalidRelException if the superclass constructor rejects the join
+     */
+    public OLAPNonEquiJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                             Set<CorrelationId> variablesSet, JoinRelType joinType) throws InvalidRelException {
+        super(cluster, traits, left, right, condition, variablesSet, joinType);
+        // Make sure the rowType field is populated: buildColumnRowType() reads it directly.
+        rowType = getRowType();
+    }
+
+    @Override
+    public OLAPContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ColumnRowType getColumnRowType() {
+        return columnRowType;
+    }
+
+    @Override
+    public boolean hasSubQuery() {
+        return hasSubQuery;
+    }
+
+    /** Replaces one trait in this node's trait set and returns the previous trait set. */
+    @Override
+    public RelTraitSet replaceTraitSet(RelTrait trait) {
+        RelTraitSet oldTraitSet = this.traitSet;
+        this.traitSet = this.traitSet.replace(trait);
+        return oldTraitSet;
+    }
+
+    /** Returns true when the parent being implemented is an OLAPProjectRel that merely permutes columns. */
+    protected boolean isParentMerelyPermutation(OLAPImplementor implementor) {
+        if (implementor.getParentNode() instanceof OLAPProjectRel) {
+            return ((OLAPProjectRel) implementor.getParentNode()).isMerelyPermutation();
+        }
+        return false;
+    }
+
+    /**
+     * Assigns OLAP contexts for this join and both of its inputs.
+     *
+     * <p>A new context is allocated unless the parent is another join or a projection that merely
+     * permutes columns. Each input is then visited with a new context requested. Finally the
+     * columns referenced by the join condition are added to the context's sub-query join
+     * participants.
+     */
+    @Override
+    public void implementOLAP(OLAPImplementor implementor) {
+        // create context for root join
+        if (!(implementor.getParentNode() instanceof OLAPJoinRel)
+                && !(implementor.getParentNode() instanceof OLAPNonEquiJoinRel)
+                && !isParentMerelyPermutation(implementor)) {
+            implementor.allocateContext();
+        }
+
+        // parent context
+        this.context = implementor.getContext();
+        this.isTopJoin = !context.hasJoin;
+        this.context.hasJoin = true;
+        this.hasSubQuery = true;
+
+        // visit and allocate context for left input
+        implementor.fixSharedOlapTableScanOnTheLeft(this);
+        visitInputInNewContext(implementor, this.left);
+
+        // visit and allocate context for right input
+        implementor.fixSharedOlapTableScanOnTheRight(this);
+        visitInputInNewContext(implementor, this.right);
+
+        this.columnRowType = buildColumnRowType();
+
+        if (isTopJoin) {
+            this.context.afterJoin = true;
+        }
+
+        // must run after columnRowType is built: input refs in the condition are resolved against it
+        this.context.subqueryJoinParticipants.addAll(collectJoinColumns(condition));
+    }
+
+    /**
+     * Visits one input with a new OLAP context requested. Afterwards, frees the implementor's
+     * current context if it is no longer this join's context.
+     */
+    private void visitInputInNewContext(OLAPImplementor implementor, RelNode input) {
+        implementor.setNewOLAPContextRequired(true);
+        implementor.visitChild(input, this);
+        if (this.context != implementor.getContext()) {
+            implementor.freeContext();
+        }
+    }
+
+    /**
+     * Rewrites both inputs, re-derives the row type and, for the top join only, appends the
+     * context's dynamic fields to both the row type and the column row type.
+     */
+    @Override
+    public void implementRewrite(RewriteImplementor implementor) {
+        implementor.visitChild(this, this.left);
+        implementor.visitChild(this, this.right);
+
+        // inputs may have been rewritten, so derive the row type again
+        this.rowType = this.deriveRowType();
+        if (this.isTopJoin) {
+
+            // add dynamic field
+            Map<TblColRef, RelDataType> dynFields = this.context.dynamicFields;
+            if (!dynFields.isEmpty()) {
+                List<TblColRef> newCols = Lists.newArrayList(this.columnRowType.getAllColumns());
+                List<RelDataTypeField> newFieldList = Lists.newArrayList();
+                int paramIndex = this.rowType.getFieldList().size();
+                for (Map.Entry<TblColRef, RelDataType> dynField : dynFields.entrySet()) {
+                    TblColRef fieldCol = dynField.getKey();
+                    newFieldList.add(new RelDataTypeFieldImpl(fieldCol.getName(), paramIndex++, dynField.getValue()));
+                    newCols.add(fieldCol);
+                }
+
+                // rebuild row type
+                RelDataTypeFactory.FieldInfoBuilder fieldInfo = getCluster().getTypeFactory().builder();
+                fieldInfo.addAll(this.rowType.getFieldList());
+                fieldInfo.addAll(newFieldList);
+                this.rowType = getCluster().getTypeFactory().createStructType(fieldInfo);
+
+                this.columnRowType = new ColumnRowType(newCols);
+            }
+        }
+    }
+
+    /** Returns a plain EnumerableThetaJoin over the given inputs with this node's traits, condition and join type. */
+    @Override
+    public EnumerableRel implementEnumerable(List<EnumerableRel> inputs) {
+        return super.copy(traitSet, condition, inputs.get(0), inputs.get(1), joinType, isSemiJoinDone());
+    }
+
+    /**
+     * Concatenates the left and right inputs' column row types.
+     *
+     * @throws IllegalStateException if the column count differs from this node's row type field count
+     */
+    protected ColumnRowType buildColumnRowType() {
+        List<TblColRef> columns = new ArrayList<>();
+        columns.addAll(((OLAPRel) this.left).getColumnRowType().getAllColumns());
+        columns.addAll(((OLAPRel) this.right).getColumnRowType().getAllColumns());
+
+        if (columns.size() != this.rowType.getFieldCount()) {
+            throw new IllegalStateException(
+                    "RowType=" + this.rowType.getFieldCount() + ", ColumnRowType=" + columns.size());
+        }
+        return new ColumnRowType(columns);
+    }
+
+    /** Returns the superclass cost scaled by 0.05 (presumably so the planner favors this node — confirm). */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return super.computeSelfCost(planner, mq).multiplyBy(.05);
+    }
+
+    /** Returns the superclass row-count estimate scaled by 0.1. */
+    @Override
+    public double estimateRowCount(RelMetadataQuery mq) {
+        return super.estimateRowCount(mq) * 0.1;
+    }
+
+    /**
+     * Creates a new OLAPNonEquiJoinRel with the given traits, condition, inputs and join type.
+     * {@code semiJoinDone} is ignored, and OLAP state (context, column row type) is not carried over.
+     */
+    @Override
+    public EnumerableThetaJoin copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+        try {
+            return new OLAPNonEquiJoinRel(this.getCluster(), traitSet, left, right, condition, this.variablesSet, joinType);
+        } catch (InvalidRelException e) {
+            // the arguments come from an already-valid join, so a rejection here is an internal error
+            throw new AssertionError(e);
+        }
+    }
+
+    /** Returns the columns referenced by input refs anywhere inside {@code condition}. */
+    private Collection<TblColRef> collectJoinColumns(RexNode condition) {
+        Set<TblColRef> joinColumns = Sets.newHashSet();
+        doCollectJoinColumns(condition, joinColumns);
+        return joinColumns;
+    }
+
+    /** Recursively walks call operands, resolving each RexInputRef index against columnRowType. */
+    private void doCollectJoinColumns(RexNode node, Set<TblColRef> joinColumns) {
+        if (node instanceof RexCall) {
+            ((RexCall) node).getOperands().forEach(operand -> doCollectJoinColumns(operand, joinColumns));
+        } else if (node instanceof RexInputRef) {
+            joinColumns.add(columnRowType.getColumnByIndex(((RexInputRef) node).getIndex()));
+        }
+    }
+
+}

Reply via email to