This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f66b85f0d286ce6b71f100b47da90288f63ab5b0
Author: Jiale He <jialeheb...@gmail.com>
AuthorDate: Wed Feb 1 17:41:37 2023 +0800

    KYLIN-5521 fix JoinsGraph
---
 .../apache/kylin/metadata/model/JoinsGraph.java    | 607 ---------------------
 .../apache/kylin/metadata/model/NDataModel.java    |   1 +
 .../model/graph/DefaultJoinEdgeMatcher.java        |  69 +++
 .../apache/kylin/metadata/model/graph/Edge.java    | 148 +++++
 .../metadata/model/graph/IJoinEdgeMatcher.java     |  27 +
 .../kylin/metadata/model/graph/JoinsGraph.java     | 506 +++++++++++++++++
 .../metadata/model/util/ComputedColumnUtil.java    |   4 +-
 .../kylin/metadata/model/JoinsGraphTest.java       |  89 ++-
 .../kylin/metadata/model/MockJoinGraphBuilder.java |   1 +
 .../apache/kylin/query/relnode/OLAPContext.java    |   4 +-
 .../kylin/query/routing/RealizationChooser.java    |   2 +-
 .../apache/kylin/query/util/QueryAliasMatcher.java |   5 +-
 12 files changed, 819 insertions(+), 644 deletions(-)

diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsGraph.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsGraph.java
deleted file mode 100644
index 7df0ddbd6e..0000000000
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsGraph.java
+++ /dev/null
@@ -1,607 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.metadata.model;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.NonNull;
-import lombok.Setter;
-
-public class JoinsGraph implements Serializable {
-
-    public class Edge implements Serializable {
-
-        @Getter
-        private final JoinDesc join;
-        private final ColumnDesc[] leftCols;
-        private final ColumnDesc[] rightCols;
-        private final NonEquiJoinCondition nonEquiJoinCondition;
-
-        private Edge(JoinDesc join) {
-            this.join = join;
-
-            leftCols = new ColumnDesc[join.getForeignKeyColumns().length];
-            int i = 0;
-            for (TblColRef colRef : join.getForeignKeyColumns()) {
-                leftCols[i++] = colRef.getColumnDesc();
-            }
-
-            rightCols = new ColumnDesc[join.getPrimaryKeyColumns().length];
-            i = 0;
-            for (TblColRef colRef : join.getPrimaryKeyColumns()) {
-                rightCols[i++] = colRef.getColumnDesc();
-            }
-
-            nonEquiJoinCondition = join.getNonEquiJoinCondition();
-        }
-
-        public boolean isJoinMatched(JoinDesc other) {
-            return join.equals(other);
-        }
-
-        public boolean isNonEquiJoin() {
-            return nonEquiJoinCondition != null;
-        }
-
-        public boolean isLeftJoin() {
-            return !join.isLeftOrInnerJoin() && join.isLeftJoin();
-        }
-
-        public boolean isLeftOrInnerJoin() {
-            return join.isLeftOrInnerJoin();
-        }
-
-        public boolean isInnerJoin() {
-            return !join.isLeftOrInnerJoin() && join.isInnerJoin();
-        }
-
-        private TableRef left() {
-            return join.getFKSide();
-        }
-
-        private TableRef right() {
-            return join.getPKSide();
-        }
-
-        private boolean isFkSide(TableRef tableRef) {
-            return join.getFKSide().equals(tableRef);
-        }
-
-        private boolean isPkSide(TableRef tableRef) {
-            return join.getPKSide().equals(tableRef);
-        }
-
-        private TableRef other(TableRef tableRef) {
-            if (left().equals(tableRef)) {
-                return right();
-            } else if (right().equals(tableRef)) {
-                return left();
-            }
-            throw new IllegalArgumentException("table " + tableRef + " is not 
on the edge " + this);
-        }
-
-        @Override
-        public boolean equals(Object that) {
-            if (that == null)
-                return false;
-
-            if (this.getClass() != that.getClass())
-                return false;
-
-            return joinEdgeMatcher.matches(this, (Edge) that);
-        }
-
-        @Override
-        public int hashCode() {
-            if (this.isLeftJoin()) {
-                return Objects.hash(isLeftJoin(), leftCols, rightCols);
-            } else {
-                if (Arrays.hashCode(leftCols) < Arrays.hashCode(rightCols)) {
-                    return Objects.hash(isLeftJoin(), leftCols, rightCols);
-                } else {
-                    return Objects.hash(isLeftJoin(), rightCols, leftCols);
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder("Edge: ");
-            sb.append(left())
-                    .append(isLeftJoin() ? " LEFT JOIN "
-                            : isLeftOrInnerJoin() ? " LEFT OR INNER JOIN " : " 
INNER JOIN ")
-                    .append(right()).append(" ON ")
-                    
.append(Arrays.toString(Arrays.stream(leftCols).map(ColumnDesc::getName).toArray())).append("
 = ")
-                    
.append(Arrays.toString(Arrays.stream(rightCols).map(ColumnDesc::getName).toArray()));
-            return sb.toString();
-        }
-    }
-
-    private Edge edgeOf(JoinDesc join) {
-        return new Edge(join);
-    }
-
-    private static final IJoinEdgeMatcher DEFAULT_JOIN_EDGE_MATCHER = new 
DefaultJoinEdgeMatcher();
-    @Setter
-    private IJoinEdgeMatcher joinEdgeMatcher = DEFAULT_JOIN_EDGE_MATCHER;
-
-    /**
-     * compare:
-     * 1. JoinType
-     * 2. Columns on both sides
-     */
-    public interface IJoinEdgeMatcher extends Serializable {
-        boolean matches(@NonNull Edge join1, @NonNull Edge join2);
-    }
-
-    public static class DefaultJoinEdgeMatcher implements IJoinEdgeMatcher {
-        @Override
-        public boolean matches(@NonNull Edge join1, @NonNull Edge join2) {
-            if (join1.isLeftJoin() != join2.isLeftJoin() && 
!join1.isLeftOrInnerJoin() && !join2.isLeftOrInnerJoin()) {
-                return false;
-            }
-
-            if (!Objects.equals(join1.nonEquiJoinCondition, 
join2.nonEquiJoinCondition)) {
-                return false;
-            }
-
-            if (join1.isLeftJoin()) {
-                return columnDescEquals(join1.leftCols, join2.leftCols)
-                        && columnDescEquals(join1.rightCols, join2.rightCols);
-            } else {
-                return (columnDescEquals(join1.leftCols, join2.leftCols)
-                        && columnDescEquals(join1.rightCols, join2.rightCols))
-                        || (columnDescEquals(join1.leftCols, join2.rightCols)
-                                && columnDescEquals(join1.rightCols, 
join2.leftCols));
-            }
-        }
-
-        private boolean columnDescEquals(ColumnDesc[] a, ColumnDesc[] b) {
-            if (a.length != b.length)
-                return false;
-
-            for (int i = 0; i < a.length; i++) {
-                if (!columnDescEquals(a[i], b[i]))
-                    return false;
-            }
-            return true;
-        }
-
-        protected boolean columnDescEquals(ColumnDesc a, ColumnDesc b) {
-            return Objects.equals(a, b);
-        }
-    }
-
-    @Getter
-    private final TableRef center;
-    private final Map<String, TableRef> nodes = new HashMap<>();
-    private final Set<Edge> edges = new HashSet<>();
-    private final Map<TableRef, List<Edge>> edgesFromNode = new HashMap<>();
-    private final Map<TableRef, List<Edge>> edgesToNode = new HashMap<>();
-
-    /**
-     * For model there's always a center, if there's only one tableScan it's 
the center.
-     * Otherwise the center is not determined, it's a linked graph, hard to 
tell the center.
-     */
-    public JoinsGraph(TableRef root, List<JoinDesc> joins) {
-        this.center = root;
-        addNode(root);
-
-        for (JoinDesc join : joins) {
-            
Preconditions.checkState(Arrays.stream(join.getForeignKeyColumns()).allMatch(TblColRef::isQualified));
-            
Preconditions.checkState(Arrays.stream(join.getPrimaryKeyColumns()).allMatch(TblColRef::isQualified));
-            addAsEdge(join);
-        }
-
-        validate(joins);
-    }
-
-    private void addNode(TableRef table) {
-        Preconditions.checkNotNull(table);
-        String alias = table.getAlias();
-        TableRef node = nodes.get(alias);
-        if (node != null) {
-            Preconditions.checkArgument(node.equals(table), "[%s]'s Alias 
\"%s\" has conflict with [%s].", table, alias,
-                    node);
-        } else {
-            nodes.put(alias, table);
-        }
-    }
-
-    private void addAsEdge(JoinDesc join) {
-        TableRef fkTable = join.getFKSide();
-        TableRef pkTable = join.getPKSide();
-        addNode(pkTable);
-
-        Edge edge = edgeOf(join);
-        edgesFromNode.computeIfAbsent(fkTable, fk -> Lists.newArrayList());
-        edgesFromNode.get(fkTable).add(edge);
-        edgesToNode.computeIfAbsent(pkTable, pk -> Lists.newArrayList());
-        edgesToNode.get(pkTable).add(edge);
-        if (!edge.isLeftJoin()) {
-            // inner join is reversible
-            edgesFromNode.computeIfAbsent(pkTable, pk -> Lists.newArrayList());
-            edgesFromNode.get(pkTable).add(edge);
-            edgesToNode.computeIfAbsent(fkTable, fk -> Lists.newArrayList());
-            edgesToNode.get(fkTable).add(edge);
-        }
-        edges.add(edge);
-    }
-
-    public void setJoinToLeftOrInner(JoinDesc join) {
-        if (!join.isLeftJoin()) {
-            join.setLeftOrInner(true);
-            return;
-        }
-
-        join.setLeftOrInner(true);
-        TableRef fkTable = join.getFKSide();
-        TableRef pkTable = join.getPKSide();
-        Edge edge = edges.stream().filter(e -> 
e.isJoinMatched(join)).findFirst().orElse(null);
-        if (edge == null) {
-            return;
-        }
-        edgesFromNode.computeIfAbsent(pkTable, pk -> Lists.newArrayList());
-        edgesFromNode.get(pkTable).add(edge);
-        edgesToNode.computeIfAbsent(fkTable, fk -> Lists.newArrayList());
-        edgesToNode.get(fkTable).add(edge);
-    }
-
-    private void validate(List<JoinDesc> joins) {
-        for (JoinDesc join : joins) {
-            TableRef fkTable = join.getFKSide();
-            Preconditions.checkNotNull(nodes.get(fkTable.getAlias()));
-            
Preconditions.checkState(nodes.get(fkTable.getAlias()).equals(fkTable));
-        }
-        Preconditions.checkState(nodes.size() == joins.size() + 1);
-    }
-
-    public boolean match(JoinsGraph pattern, Map<String, String> matchAlias) {
-        return match(pattern, matchAlias, false);
-    }
-
-    public boolean match(JoinsGraph pattern, Map<String, String> matchAlias, 
boolean matchPatial) {
-        return match(pattern, matchAlias, matchPatial, false);
-    }
-
-    public boolean match(JoinsGraph pattern, Map<String, String> matchAlias, 
boolean matchPatial,
-            boolean matchPartialNonEquiJoin) {
-        if (pattern == null || pattern.center == null) {
-            throw new IllegalArgumentException("pattern(model) should have a 
center: " + pattern);
-        }
-
-        List<TableRef> candidatesOfQCenter = 
searchCenterByIdentity(pattern.center);
-        if (CollectionUtils.isEmpty(candidatesOfQCenter)) {
-            return false;
-        }
-
-        for (TableRef queryCenter : candidatesOfQCenter) {
-            // query <-> pattern
-            Map<TableRef, TableRef> trialMatch = Maps.newHashMap();
-            trialMatch.put(queryCenter, pattern.center);
-
-            if (!checkInnerJoinNum(pattern, queryCenter, pattern.center, 
matchPatial)) {
-                continue;
-            }
-
-            AtomicReference<Map<TableRef, TableRef>> finalMatchRef = new 
AtomicReference<>();
-            innerMatch(pattern, trialMatch, matchPatial, finalMatchRef);
-            if (finalMatchRef.get() != null
-                    && (matchPartialNonEquiJoin || 
checkNonEquiJoinMatches(finalMatchRef.get(), pattern))) {
-                matchAlias.clear();
-                matchAlias.putAll(finalMatchRef.get().entrySet().stream()
-                        .collect(Collectors.toMap(e -> e.getKey().getAlias(), 
e -> e.getValue().getAlias())));
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static JoinsGraph normalizeJoinGraph(JoinsGraph joinsGraph) {
-        for (Edge edge : joinsGraph.edges) {
-            if (!edge.isLeftJoin() || edge.isLeftOrInnerJoin()) {
-                TableRef leftTable = edge.left();
-                List<Edge> edgeList = joinsGraph.edgesToNode.get(leftTable);
-                if (CollectionUtils.isEmpty(edgeList)) {
-                    continue;
-                }
-                for (Edge targetEdge : edgeList) {
-                    if (!edge.equals(targetEdge) && 
leftTable.equals(targetEdge.right())
-                            && !targetEdge.isLeftOrInnerJoin()) {
-                        joinsGraph.setJoinToLeftOrInner(targetEdge.join);
-                        normalizeJoinGraph(joinsGraph);
-                    }
-                }
-            }
-        }
-        return joinsGraph;
-    }
-
-    public List<TableRef> getAllTblRefNodes() {
-        return Lists.newArrayList(nodes.values());
-    }
-
-    /**
-     * check if any non-equi join is missed in the pattern
-     * if so, we cannot match the current graph with the the pattern graph.
-     * set `kylin.query.match-partial-non-equi-join-model` to skip this 
checking
-     * @param matches
-     * @return
-     */
-    private boolean checkNonEquiJoinMatches(Map<TableRef, TableRef> matches, 
JoinsGraph pattern) {
-        HashSet<TableRef> patternGraphTables = new 
HashSet<>(pattern.nodes.values());
-
-        for (TableRef patternTable : patternGraphTables) {
-            List<Edge> outgoingEdges = pattern.getEdgesByFKSide(patternTable);
-            // for all outgoing non-equi join edges
-            // if there is no match found for the right side table in the 
current graph
-            // return false
-            for (Edge outgoingEdge : outgoingEdges) {
-                if (outgoingEdge.isNonEquiJoin()) {
-                    if (!matches.containsValue(patternTable) || 
!matches.containsValue(outgoingEdge.right())) {
-                        return false;
-                    }
-                }
-            }
-        }
-        return true;
-    }
-
-    private boolean isAllJoinInner(JoinsGraph joinsGraph, TableRef tableRef) {
-        List<Edge> edgesFromNode = joinsGraph.edgesFromNode.get(tableRef);
-        List<Edge> edgesToNode = joinsGraph.edgesToNode.get(tableRef);
-
-        if (edgesFromNode == null) {
-            return false;
-        }
-
-        if (edgesToNode == null) {
-            return false;
-        }
-
-        if (edgesToNode.size() != edgesFromNode.size()) {
-            return false;
-        }
-
-        for (Edge edge : edgesFromNode) {
-            if (edge.join.isLeftJoin()) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    private boolean checkInnerJoinNum(JoinsGraph pattern, TableRef 
queryTableRef, TableRef patternTableRef,
-            boolean matchPartial) {
-        if (matchPartial) {
-            return true;
-        }
-        // fully match: unmatched if extra inner join edge on either graph
-        //  matched if:   query graph join count:   model graph join count:
-        //  1)                        inner join <= inner join
-        //  2)   inner join + left or inner join >= inner join
-        List<Edge> innerQueryEdges = 
this.edgesFrom(queryTableRef).stream().filter(Edge::isInnerJoin)
-                .collect(Collectors.toList());
-        List<Edge> notLeftQueryEdges = 
this.edgesFrom(queryTableRef).stream().filter(e -> !e.isLeftJoin())
-                .collect(Collectors.toList());
-        List<Edge> innerPatternEdges = 
pattern.edgesFrom(patternTableRef).stream().filter(Edge::isInnerJoin)
-                .collect(Collectors.toList());
-
-        // if all joins are inner joins, compare sum of both sides
-        if (isAllJoinInner(this, queryTableRef) && isAllJoinInner(pattern, 
patternTableRef)) {
-            int cntInnerQueryEdges = innerQueryEdges.size();
-            int cntNotLeftQueryEdges = notLeftQueryEdges.size();
-            int cntInnerPatternEdges = innerPatternEdges.size();
-            return cntInnerQueryEdges <= cntInnerPatternEdges && 
cntNotLeftQueryEdges >= cntInnerPatternEdges;
-        }
-
-        // if not all joins are inner, compare left side and right side 
separately
-        //  Calculate join count in query graph
-        int cntLeftSideInnerQueryEdges = (int) innerQueryEdges.stream()
-                .filter(edge -> edge.right().equals(queryTableRef)).count();
-        int cntRightSideInnerQueryEdges = (int) innerQueryEdges.stream()
-                .filter(edge -> edge.left().equals(queryTableRef)).count();
-        int cntLeftSideNotLeftQueryEdges = (int) notLeftQueryEdges.stream()
-                .filter(edge -> edge.right().equals(queryTableRef)).count();
-        int cntRightSideNotLeftQueryEdges = (int) notLeftQueryEdges.stream()
-                .filter(edge -> edge.left().equals(queryTableRef)).count();
-        // Calculate join count in model graph
-        int cntLeftSideInnerPatternEdges = (int) innerPatternEdges.stream()
-                .filter(edge -> edge.right().equals(patternTableRef)).count();
-        int cntRightSideInnerPatternEdges = (int) innerPatternEdges.stream()
-                .filter(edge -> edge.left().equals(patternTableRef)).count();
-
-        boolean isLeftEqual = cntLeftSideInnerQueryEdges <= 
cntLeftSideInnerPatternEdges
-                && cntLeftSideNotLeftQueryEdges >= 
cntLeftSideInnerPatternEdges;
-        boolean isRightEqual = cntRightSideInnerQueryEdges <= 
cntRightSideInnerPatternEdges
-                && cntRightSideNotLeftQueryEdges >= 
cntRightSideInnerPatternEdges;
-        return isLeftEqual && isRightEqual;
-    }
-
-    private void innerMatch(JoinsGraph pattern, Map<TableRef, TableRef> 
trialMatches, boolean matchPartial,
-            AtomicReference<Map<TableRef, TableRef>> finalMatch) {
-        if (trialMatches.size() == nodes.size()) {
-            //match is found
-            finalMatch.set(trialMatches);
-            return;
-        }
-
-        Preconditions.checkState(nodes.size() > trialMatches.size());
-        Optional<Pair<Edge, TableRef>> toMatch = trialMatches.keySet().stream()
-                .map(t -> edgesFrom(t).stream().filter(e -> 
!trialMatches.containsKey(e.other(t))).findFirst()
-                        .map(edge -> new Pair<>(edge, 
edge.other(t))).orElse(null))
-                .filter(Objects::nonNull).findFirst();
-
-        Preconditions.checkState(toMatch.isPresent());
-        Edge toMatchQueryEdge = toMatch.get().getFirst();
-        TableRef toMatchQueryNode = toMatch.get().getSecond();
-        TableRef matchedQueryNode = toMatchQueryEdge.other(toMatchQueryNode);
-        TableRef matchedPatternNode = trialMatches.get(matchedQueryNode);
-
-        List<TableRef> toMatchPatternNodeCandidates = Lists.newArrayList();
-        for (Edge patternEdge : pattern.edgesFrom(matchedPatternNode)) {
-            TableRef toMatchPatternNode = 
patternEdge.other(matchedPatternNode);
-            if 
(!toMatchQueryNode.getTableIdentity().equals(toMatchPatternNode.getTableIdentity())
-                    || !toMatchQueryEdge.equals(patternEdge) || 
trialMatches.containsValue(toMatchPatternNode)
-                    || !checkInnerJoinNum(pattern, toMatchQueryNode, 
toMatchPatternNode, matchPartial)) {
-                continue;
-            }
-            toMatchPatternNodeCandidates.add(toMatchPatternNode);
-        }
-
-        for (TableRef toMatchPatternNode : toMatchPatternNodeCandidates) {
-            Map<TableRef, TableRef> newTrialMatches = Maps.newHashMap();
-            newTrialMatches.putAll(trialMatches);
-            newTrialMatches.put(toMatchQueryNode, toMatchPatternNode);
-            innerMatch(pattern, newTrialMatches, matchPartial, finalMatch);
-            if (finalMatch.get() != null) {
-                //get out of recursive invoke chain straightly
-                return;
-            }
-        }
-    }
-
-    public List<Edge> unmatched(JoinsGraph pattern) {
-        List<Edge> unmatched = Lists.newArrayList();
-        Set<Edge> all = 
edgesFromNode.values().stream().flatMap(List::stream).collect(Collectors.toSet());
-        for (Edge edge : all) {
-            List<JoinDesc> joins = getJoinsPathByPKSide(edge.right());
-            JoinsGraph subGraph = new JoinsGraph(center, joins);
-            if (subGraph.match(pattern, Maps.newHashMap())) {
-                continue;
-            }
-            unmatched.add(edge);
-        }
-        return unmatched;
-    }
-
-    private List<TableRef> searchCenterByIdentity(final TableRef table) {
-        // special case: several same nodes in a JoinGraph
-        return nodes.values().stream().filter(node -> 
node.getTableIdentity().equals(table.getTableIdentity()))
-                .filter(node -> {
-                    List<JoinDesc> path2Center = getJoinsPathByPKSide(node);
-                    return 
path2Center.stream().noneMatch(JoinDesc::isLeftJoin);
-                }).collect(Collectors.toList());
-    }
-
-    private List<Edge> edgesFrom(TableRef thisSide) {
-        return edgesFromNode.getOrDefault(thisSide, Lists.newArrayList());
-    }
-
-    public Map<String, String> matchAlias(JoinsGraph joinsGraph, KylinConfig 
kylinConfig) {
-        Map<String, String> matchAlias = Maps.newHashMap();
-        match(joinsGraph, matchAlias, 
kylinConfig.isQueryMatchPartialInnerJoinModel(),
-                kylinConfig.partialMatchNonEquiJoins());
-        return matchAlias;
-    }
-
-    public Map<String, String> matchAlias(JoinsGraph joinsGraph, boolean 
matchPartial) {
-        Map<String, String> matchAlias = Maps.newHashMap();
-        match(joinsGraph, matchAlias, matchPartial);
-        return matchAlias;
-    }
-
-    public List<Edge> getEdgesByFKSide(TableRef table) {
-        if (!edgesFromNode.containsKey(table)) {
-            return Lists.newArrayList();
-        }
-        return edgesFromNode.get(table).stream().filter(e -> 
e.isFkSide(table)).collect(Collectors.toList());
-    }
-
-    private Edge getEdgeByPKSide(TableRef table) {
-        if (!edgesToNode.containsKey(table)) {
-            return null;
-        }
-        List<Edge> edgesByPkSide = edgesToNode.get(table).stream().filter(e -> 
e.isPkSide(table))
-                .collect(Collectors.toList());
-        if (edgesByPkSide.isEmpty()) {
-            return null;
-        }
-        Preconditions.checkState(edgesByPkSide.size() == 1, "%s is allowed to 
be Join PK side once", table);
-        return edgesByPkSide.get(0);
-    }
-
-    public JoinDesc getJoinByPKSide(TableRef table) {
-        Edge edge = getEdgeByPKSide(table);
-        return edge != null ? edge.join : null;
-    }
-
-    private List<JoinDesc> getJoinsPathByPKSide(TableRef table) {
-        List<JoinDesc> pathToRoot = Lists.newArrayList();
-        TableRef pkSide = table; // start from leaf
-        while (pkSide != null) {
-            JoinDesc subJoin = getJoinByPKSide(pkSide);
-            if (subJoin != null) {
-                pathToRoot.add(subJoin);
-                pkSide = subJoin.getFKSide();
-            } else {
-                pkSide = null;
-            }
-        }
-        return Lists.reverse(pathToRoot);
-    }
-
-    public JoinsGraph getSubgraphByAlias(Set<String> aliasSets) {
-        TableRef subGraphRoot = this.center;
-        Set<JoinDesc> subGraphJoin = Sets.newHashSet();
-        for (String alias : aliasSets) {
-            subGraphJoin.addAll(getJoinsPathByPKSide(nodes.get(alias)));
-        }
-        return new JoinsGraph(subGraphRoot, Lists.newArrayList(subGraphJoin));
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder graphStrBuilder = new StringBuilder();
-        graphStrBuilder.append("Root: ").append(center);
-        List<Edge> nextEdges = getEdgesByFKSide(center);
-        nextEdges.forEach(e -> buildGraphStr(graphStrBuilder, e, 1));
-        return graphStrBuilder.toString();
-    }
-
-    private void buildGraphStr(StringBuilder sb, @NonNull Edge edge, int 
indent) {
-        sb.append('\n');
-        for (int i = 0; i < indent; i++) {
-            sb.append("  ");
-        }
-        sb.append(edge);
-        List<Edge> nextEdges = getEdgesByFKSide(edge.right());
-        nextEdges.forEach(e -> buildGraphStr(sb, e, indent + 1));
-    }
-}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/NDataModel.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/NDataModel.java
index 9c6209ec59..3d7cffbd17 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/NDataModel.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/NDataModel.java
@@ -55,6 +55,7 @@ import 
org.apache.kylin.common.scheduler.SchedulerEventNotifier;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.model.util.ComputedColumnUtil;
 import org.apache.kylin.metadata.project.NProjectManager;
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/DefaultJoinEdgeMatcher.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/DefaultJoinEdgeMatcher.java
new file mode 100644
index 0000000000..a1e0402e45
--- /dev/null
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/DefaultJoinEdgeMatcher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model.graph;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.metadata.model.ColumnDesc;
+
+import lombok.NonNull;
+
+public class DefaultJoinEdgeMatcher implements IJoinEdgeMatcher {
+
+    @Override
+    public boolean matches(@NonNull Edge join1, @NonNull Edge join2) {
+        if (join1.isLeftJoin() != join2.isLeftJoin() && 
!join1.isLeftOrInnerJoin() && !join2.isLeftOrInnerJoin()) {
+            return false;
+        }
+
+        if (!Objects.equals(join1.nonEquiJoinCondition, 
join2.nonEquiJoinCondition)) {
+            return false;
+        }
+
+        if (join1.isLeftJoin()) {
+            return columnDescEquals(join1.leftCols, join2.leftCols)
+                    && columnDescEquals(join1.rightCols, join2.rightCols);
+        } else {
+            return (columnDescEquals(join1.leftCols, join2.leftCols)
+                    && columnDescEquals(join1.rightCols, join2.rightCols))
+                    || (columnDescEquals(join1.leftCols, join2.rightCols)
+                            && columnDescEquals(join1.rightCols, 
join2.leftCols));
+        }
+    }
+
+    private boolean columnDescEquals(ColumnDesc[] a, ColumnDesc[] b) {
+        if (a.length != b.length) {
+            return false;
+        }
+
+        List<ColumnDesc> oneList = Lists.newArrayList(a);
+        List<ColumnDesc> anotherList = Lists.newArrayList(b);
+
+        for (ColumnDesc obj : oneList) {
+            anotherList.removeIf(dual -> columnDescEquals(obj, dual));
+        }
+        return anotherList.isEmpty();
+    }
+
+    protected boolean columnDescEquals(ColumnDesc a, ColumnDesc b) {
+        return Objects.equals(a, b);
+    }
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/Edge.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/Edge.java
new file mode 100644
index 0000000000..58fcdafe51
--- /dev/null
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/Edge.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model.graph;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.NonEquiJoinCondition;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import lombok.Getter;
+import lombok.Setter;
+
+public class Edge implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public final JoinDesc join;
+    public final ColumnDesc[] leftCols;
+    public final ColumnDesc[] rightCols;
+    public final NonEquiJoinCondition nonEquiJoinCondition;
+    @Setter
+    private IJoinEdgeMatcher joinEdgeMatcher = new DefaultJoinEdgeMatcher();
+    @Setter
+    @Getter
+    private boolean swapJoin;
+
+    public Edge(JoinDesc join) {
+        this(join, false);
+    }
+
+    public Edge(JoinDesc join, boolean swapJoin) {
+        this.join = join;
+
+        int i = 0;
+        leftCols = new ColumnDesc[join.getForeignKeyColumns().length];
+        for (TblColRef colRef : join.getForeignKeyColumns()) {
+            leftCols[i++] = colRef.getColumnDesc();
+        }
+
+        i = 0;
+        rightCols = new ColumnDesc[join.getPrimaryKeyColumns().length];
+        for (TblColRef colRef : join.getPrimaryKeyColumns()) {
+            rightCols[i++] = colRef.getColumnDesc();
+        }
+
+        nonEquiJoinCondition = join.getNonEquiJoinCondition();
+        this.swapJoin = swapJoin;
+    }
+
+    public boolean isJoinMatched(JoinDesc other) {
+        return join.equals(other);
+    }
+
+    public boolean isNonEquiJoin() {
+        return nonEquiJoinCondition != null;
+    }
+
+    public boolean isLeftJoin() {
+        return !join.isLeftOrInnerJoin() && join.isLeftJoin();
+    }
+
+    public boolean isLeftOrInnerJoin() {
+        return join.isLeftOrInnerJoin();
+    }
+
+    public TableRef pkSide() {
+        return join.getPKSide();
+    }
+
+    public TableRef fkSide() {
+        return join.getFKSide();
+    }
+
+    public boolean isFkSide(TableRef tableRef) {
+        return fkSide().equals(tableRef);
+    }
+
+    public boolean isPkSide(TableRef tableRef) {
+        return pkSide().equals(tableRef);
+    }
+
+    public TableRef otherSide(TableRef tableRef) {
+        if (isFkSide(tableRef)) {
+            return pkSide();
+        } else if (isPkSide(tableRef)) {
+            return fkSide();
+        }
+        throw new IllegalArgumentException("table " + tableRef + " is not on 
the edge: " + this);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == null) {
+            return false;
+        }
+
+        if (this.getClass() != other.getClass()) {
+            return false;
+        }
+        return joinEdgeMatcher.matches(this, (Edge) other);
+    }
+
+    @Override
+    public int hashCode() {
+        if (this.isLeftJoin()) {
+            return Objects.hash(isLeftJoin(), leftCols, rightCols);
+        }
+        if (Arrays.hashCode(leftCols) < Arrays.hashCode(rightCols)) {
+            return Objects.hash(isLeftJoin(), leftCols, rightCols);
+        }
+        return Objects.hash(isLeftJoin(), rightCols, leftCols);
+    }
+
+    @Override
+    public String toString() {
+        // Edge: TableRef[TEST_KYLIN_FACT] LEFT JOIN TableRef[TEST_ORDER] ON 
[ORDER_ID] = [ORDER_ID]
+        return "Edge: " + join.getFKSide() + getJoinTypeStr() + 
join.getPKSide() + " ON "
+                + 
Arrays.toString(Arrays.stream(leftCols).map(ColumnDesc::getName).toArray()) + " 
= "
+                + 
Arrays.toString(Arrays.stream(rightCols).map(ColumnDesc::getName).toArray());
+    }
+
+    private String getJoinTypeStr() {
+        if (isLeftJoin()) {
+            return " LEFT JOIN ";
+        }
+        return isLeftOrInnerJoin() ? " LEFT OR INNER JOIN " : " INNER JOIN ";
+    }
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/IJoinEdgeMatcher.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/IJoinEdgeMatcher.java
new file mode 100644
index 0000000000..be5376b90a
--- /dev/null
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/IJoinEdgeMatcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model.graph;
+
+import java.io.Serializable;
+
+import lombok.NonNull;
+
+public interface IJoinEdgeMatcher extends Serializable {
+    boolean matches(@NonNull Edge join1, @NonNull Edge join2);
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/JoinsGraph.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/JoinsGraph.java
new file mode 100644
index 0000000000..b6342fd476
--- /dev/null
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/graph/JoinsGraph.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model.graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.val;
+
+public class JoinsGraph implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @Getter
+    private final TableRef center;
+    @Getter
+    private final Map<String, TableRef> vertexMap = Maps.newLinkedHashMap();
+    private final Map<TableRef, VertexInfo<Edge>> vertexInfoMap = 
Maps.newHashMap();
+    private final Set<Edge> edges = Sets.newHashSet();
+
+    /**
+     * Creates a graph
+     */
+    public JoinsGraph(TableRef root, List<JoinDesc> joins) {
+        this(root, joins, true);
+    }
+
+    public JoinsGraph(TableRef root, List<JoinDesc> joins, boolean 
needSwapJoin) {
+        this.center = root;
+        addVertex(root);
+
+        List<Pair<JoinDesc, Boolean>> newJoinsPair = swapJoinDescs(joins, 
needSwapJoin);
+        for (Pair<JoinDesc, Boolean> pair : newJoinsPair) {
+            JoinDesc join = pair.getFirst();
+            Boolean isSwap = pair.getSecond();
+
+            
Preconditions.checkState(Arrays.stream(join.getForeignKeyColumns()).allMatch(TblColRef::isQualified));
+            
Preconditions.checkState(Arrays.stream(join.getPrimaryKeyColumns()).allMatch(TblColRef::isQualified));
+            addVertex(join.getPKSide());
+            addVertex(join.getFKSide());
+            addEdge(join, isSwap);
+        }
+        validate(joins);
+    }
+
+    static class VertexInfo<E> {
+        final List<E> outEdges = new ArrayList<>();
+        final List<E> inEdges = new ArrayList<>();
+    }
+
+    public void addVertex(TableRef table) {
+        if (vertexMap.containsKey(table.getAlias())) {
+            return;
+        }
+        vertexMap.put(table.getAlias(), table);
+        vertexInfoMap.computeIfAbsent(table, f -> new VertexInfo<>());
+    }
+
+    public void addEdge(JoinDesc join, boolean swapJoin) {
+        Edge edge = new Edge(join, swapJoin);
+        vertexInfoMap.get(join.getPKSide()).inEdges.add(edge);
+        vertexInfoMap.get(join.getFKSide()).outEdges.add(edge);
+        edges.add(edge);
+    }
+
+    private void validate(List<JoinDesc> joins) {
+        for (JoinDesc join : joins) {
+            TableRef fkSide = join.getFKSide();
+            Preconditions.checkNotNull(vertexMap.get(fkSide.getAlias()));
+            
Preconditions.checkState(vertexMap.get(fkSide.getAlias()).equals(fkSide));
+        }
+        Preconditions.checkState(vertexMap.size() == joins.size() + 1);
+    }
+
+    private List<Pair<JoinDesc, Boolean>> swapJoinDescs(List<JoinDesc> joins, 
boolean needSwapJoin) {
+        List<Pair<JoinDesc, Boolean>> newJoins = Lists.newArrayList();
+        for (JoinDesc join : joins) {
+            // add origin joinDesc
+            newJoins.add(Pair.newPair(join, false));
+            // inner / leftOrInner => swap joinDesc
+            if ((join.isInnerJoin() || join.isLeftOrInnerJoin()) && 
needSwapJoin) {
+                newJoins.add(Pair.newPair(swapJoinDesc(join), true));
+            }
+        }
+        return newJoins;
+    }
+
+    private JoinDesc swapJoinDesc(JoinDesc originJoinDesc) {
+        JoinDesc swapedJoinDesc = new JoinDesc();
+        swapedJoinDesc.setType(originJoinDesc.getType());
+        swapedJoinDesc.setPrimaryKey(originJoinDesc.getForeignKey());
+        swapedJoinDesc.setForeignKey(originJoinDesc.getPrimaryKey());
+        
swapedJoinDesc.setNonEquiJoinCondition(originJoinDesc.getNonEquiJoinCondition());
+        swapedJoinDesc.setPrimaryTable(originJoinDesc.getForeignTable());
+        swapedJoinDesc.setForeignTable(originJoinDesc.getPrimaryTable());
+        
swapedJoinDesc.setPrimaryKeyColumns(originJoinDesc.getForeignKeyColumns());
+        
swapedJoinDesc.setForeignKeyColumns(originJoinDesc.getPrimaryKeyColumns());
+        swapedJoinDesc.setPrimaryTableRef(originJoinDesc.getForeignTableRef());
+        swapedJoinDesc.setForeignTableRef(originJoinDesc.getPrimaryTableRef());
+        // swap left join, the current join must be convertible to LeftOrInner
+        swapedJoinDesc.setLeftOrInner(originJoinDesc.isLeftOrInnerJoin());
+        return swapedJoinDesc;
+    }
+
+    public void setJoinEdgeMatcher(IJoinEdgeMatcher joinEdgeMatcher) {
+        edges.forEach(edge -> edge.setJoinEdgeMatcher(joinEdgeMatcher));
+    }
+
+    public List<Edge> outwardEdges(TableRef table) {
+        return Lists.newArrayList(vertexInfoMap.get(table).outEdges);
+    }
+
+    public List<Edge> inwardEdges(TableRef table) {
+        return Lists.newArrayList(vertexInfoMap.get(table).inEdges);
+    }
+
+    public String getCenterTableIdentity() {
+        return center.getTableIdentity();
+    }
+
+    public boolean match(JoinsGraph pattern, Map<String, String> 
matchAliasMap) {
+        return match(pattern, matchAliasMap, false);
+    }
+
+    public boolean match(JoinsGraph pattern, Map<String, String> 
matchAliasMap, boolean matchPartial) {
+        return match(pattern, matchAliasMap, matchPartial, false);
+    }
+
+    public boolean match(JoinsGraph pattern, Map<String, String> 
matchAliasMap, boolean matchPartial,
+            boolean matchPartialNonEquiJoin) {
+        if (Objects.isNull(pattern) || Objects.isNull(pattern.center)) {
+            throw new IllegalArgumentException("pattern(model) should have a 
center: " + pattern);
+        }
+
+        List<TableRef> candidatesOfQCenter = 
searchCenter(pattern.getCenterTableIdentity());
+        for (TableRef candidateCenter : candidatesOfQCenter) {
+            List<Edge> unmatchedPatternOutEdges = Lists.newArrayList();
+            Map<TableRef, TableRef> allMatchedTableMap = Maps.newHashMap();
+            List<Pair<TableRef, TableRef>> toMatchTableList = 
Lists.newArrayList();
+            toMatchTableList.add(Pair.newPair(candidateCenter, 
pattern.center));
+
+            match0(pattern, toMatchTableList, unmatchedPatternOutEdges, 
allMatchedTableMap);
+
+            if (!toMatchTableList.isEmpty() || allMatchedTableMap.size() != 
this.vertexMap.size()) {
+                continue;
+            }
+
+            // There are three cases of match, the first two cases are exactly 
match.
+            // 1. pattern out edges is matched.
+            // 2. unmatched out edges of pattern is not empty, but all of them 
are left join.
+            // 3. unmatched out edges of pattern is not empty, but match 
partial.
+            if ((unmatchedPatternOutEdges.isEmpty() || 
unmatchedPatternOutEdges.stream().allMatch(Edge::isLeftJoin)
+                    || matchPartial) && !allMatchedTableMap.isEmpty()
+                    && (matchPartialNonEquiJoin || 
checkNonEquiJoinMatches(allMatchedTableMap, pattern))) {
+                matchAliasMap.clear();
+                matchAliasMap.putAll(allMatchedTableMap.entrySet().stream()
+                        .collect(Collectors.toMap(e -> e.getKey().getAlias(), 
e -> e.getValue().getAlias())));
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Match this joinGraph with a pattern's joinGraph. All matched tables 
will be put
+     * into the parameter {@code  matchedTableMap}. The pattern's unmatched 
out edges
+     * will be put into the parameter {@code unmatchedOutEdgesOfP}.
+     *
+     * @param pattern                    the joinGraph to compare, usually the 
model's joinGraph.
+     * @param toMatchTableList           tables to match, the keys from this 
graph, the values from the joinGraph of pattern.
+     * @param unmatchedPatternOutEdges unmatched out edges of the pattern.
+     * @param matchedTableMap            All matched tables come from the 
toMatchMap.
+     */
+    private void match0(JoinsGraph pattern, List<Pair<TableRef, TableRef>> 
toMatchTableList,
+            List<Edge> unmatchedPatternOutEdges, Map<TableRef, TableRef> 
matchedTableMap) {
+
+        // The toMatchTableList will add nodes to be matched in the loop.
+        // Traverse toMatchTableList to match each table
+        for (int i = 0; i < toMatchTableList.size(); i++) {
+
+            Pair<TableRef, TableRef> pair = toMatchTableList.get(i);
+            TableRef queryFKSide = pair.getFirst();
+            TableRef patternFKSide = pair.getSecond();
+            List<Edge> queryOutEdges = this.outwardEdges(queryFKSide);
+            Set<Edge> patternOutEdges = 
Sets.newHashSet(pattern.outwardEdges(patternFKSide));
+
+            // Traverse queryOutEdgesIter to match each edge from 
patternOutEdges
+            // Edges that match successfully will be deleted from 
queryOutEdges and patternOutEdges
+            // When all outgoing edges of the query table can be matched in 
the outgoing edges of the pattern table
+            Iterator<Edge> queryOutEdgesIter = queryOutEdges.iterator();
+            while (queryOutEdgesIter.hasNext()) {
+
+                Edge queryOutEdge = queryOutEdgesIter.next();
+                TableRef queryPKSide = queryOutEdge.otherSide(queryFKSide);
+                Edge matchedPatternEdge = findOutEdgeFromDualTable(pattern, 
patternOutEdges, queryPKSide, queryOutEdge);
+                boolean patternEdgeNotMatch = 
Objects.isNull(matchedPatternEdge);
+
+                // query:   A leftOrInner B (left or inner -> leftOrInner)
+                // pattern: A left B
+                // edgeBA is not found in patternOutEdges, but this case can 
be matched
+                if (queryOutEdge.isLeftOrInnerJoin() && 
(patternOutEdges.isEmpty() || patternEdgeNotMatch)) {
+                    queryOutEdgesIter.remove();
+                } else {
+                    // can't find the same edge
+                    if (patternEdgeNotMatch) {
+                        break;
+                    }
+                    queryOutEdgesIter.remove();
+                    patternOutEdges.remove(matchedPatternEdge);
+                    // both out edge pk side tables add to toMatchTableList 
for the next round of the loop
+                    TableRef patternPKSide = 
matchedPatternEdge.otherSide(patternFKSide);
+                    addIfAbsent(toMatchTableList, Pair.newPair(queryPKSide, 
patternPKSide));
+                }
+            }
+
+            // All queryOutEdges are not matched in patternOutEdges
+            if (CollectionUtils.isNotEmpty(queryOutEdges)) {
+                break;
+            }
+            // All queryOutEdges are successfully matched in patternOutEdges.
+            // put queryFKSide and patternFKSide to matchedTableMap
+            matchedTableMap.put(queryFKSide, patternFKSide);
+            // The remaining edges in patternOutEdges are all put into 
unmatchedPatternOutEdges
+            unmatchedPatternOutEdges.addAll(patternOutEdges);
+        }
+        // intersect the table to be matched with the table already matched
+        val matchedList = matchedTableMap.entrySet().stream().map(e -> 
Pair.newPair(e.getKey(), e.getValue()))
+                .collect(Collectors.toList());
+        toMatchTableList.removeAll(matchedList);
+    }
+
+    /**
+     * Find the out edge from the dual {@code TableRef}.
+     *
+     * @return the dual table's out edge in joinGraph of pattern.
+     */
+    private Edge findOutEdgeFromDualTable(JoinsGraph pattern, Set<Edge> 
patternOutEdges, TableRef queryPKSide,
+            Edge queryOutEdge) {
+        Set<Edge> matchedEdges = patternOutEdges.stream()
+                .filter(outPatternEdge -> 
StringUtils.equals(queryPKSide.getTableIdentity(),
+                        outPatternEdge.pkSide().getTableIdentity()) && 
queryOutEdge.equals(outPatternEdge))
+                .collect(Collectors.toSet());
+        if (matchedEdges.size() == 1) {
+            return matchedEdges.iterator().next();
+        }
+        for (Edge matchedEdge : matchedEdges) {
+            TableRef patternPKSide = matchedEdge.pkSide();
+            int queryOutEdgeSize = 
this.vertexInfoMap.get(queryPKSide).outEdges.size();
+            int patternOutEdgeSize = 
pattern.vertexInfoMap.get(patternPKSide).outEdges.size();
+            if (queryOutEdgeSize != patternOutEdgeSize) {
+                continue;
+            }
+            return matchedEdge;
+        }
+        return null;
+    }
+
+    /**
+     * Sometimes one table may be joined more than one time.
+     *
+     * @param tableIdentity table identity(db.table_name) to search
+     * @return desired references of the input table identity
+     */
+    private List<TableRef> searchCenter(String tableIdentity) {
+        return vertexInfoMap.keySet().stream()
+                .filter(table -> StringUtils.equals(table.getTableIdentity(), 
tableIdentity))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Check if any non-equi join is missed in the pattern
+     * if so, we cannot match the current graph with the pattern graph.
+     * set `kylin.query.match-partial-non-equi-join-model` to skip this 
checking
+     */
+    public boolean checkNonEquiJoinMatches(Map<TableRef, TableRef> matches, 
JoinsGraph pattern) {
+        for (Map.Entry<TableRef, VertexInfo<Edge>> entry : 
pattern.vertexInfoMap.entrySet()) {
+            TableRef table = entry.getKey();
+            List<Edge> outEdges = entry.getValue().outEdges;
+            // for all outgoing non-equi join edges
+            // if there is no match found for the right side table in the 
current graph
+            // return false
+            for (Edge outgoingEdge : outEdges) {
+                if (outgoingEdge.isNonEquiJoin()
+                        && (!matches.containsValue(table) || 
!matches.containsValue(outgoingEdge.pkSide()))) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    public List<TableRef> getAllTblRefNodes() {
+        return Lists.newArrayList(vertexMap.values());
+    }
+
+    /**
+     * Normalize joinsGraph. <p>
+     * 1. Find a path to the Inner or LeftOrInner edge. <p>
+     * 2. Recursively change all Left edges on the path to LeftOrInner edges.
+     * <p>
+     * Example:<p>
+     * query1: A Left B, B Left C, C Inner D        =>  A LeftOrInner B, B 
LeftOrInner C, C Inner D <p>
+     * query2: A Left B, B Left C, C LeftOrInner D  =>  A LeftOrInner B, B 
LeftOrInner C, C LeftOrInner D
+     * <p>
+     * If the PK table of Left Join has a non-null filter condition, then this 
Left Join has the same semantics as Inner Join <p>
+     * query: A Left B ON A.a = B.b Where B.c1 IS NOT NULL   => A LeftOrInner B
+     */
+    public void normalize() {
+        Set<Edge> edgeSet = edges.stream().filter(e -> 
!e.isSwapJoin()).collect(Collectors.toSet());
+        for (Edge edge : edgeSet) {
+            if (!edge.isLeftJoin() || edge.isLeftOrInnerJoin()) {
+                TableRef fkSide = edge.fkSide();
+                List<Edge> edgeList = inwardEdges(fkSide);
+                if (CollectionUtils.isEmpty(edgeList)) {
+                    continue;
+                }
+                for (Edge targetEdge : edgeList) {
+                    if (!edge.equals(targetEdge) && 
fkSide.equals(targetEdge.pkSide())
+                            && !targetEdge.isLeftOrInnerJoin() && 
targetEdge.isLeftJoin()) {
+                        setJoinToLeftOrInner(targetEdge.join);
+                        normalize();
+                    }
+                }
+            }
+        }
+    }
+
+    public void setJoinToLeftOrInner(JoinDesc join) {
+        if (!join.isLeftJoin()) {
+            join.setLeftOrInner(true);
+            // inner -> leftOrInner, need swap another join
+            Edge swapEdge = edges.stream().filter(e -> 
e.isJoinMatched(swapJoinDesc(join))).findFirst().orElse(null);
+            if (swapEdge == null) {
+                return;
+            }
+            swapEdge.join.setLeftOrInner(true);
+            return;
+        }
+
+        Edge edge = edges.stream().filter(e -> 
e.isJoinMatched(join)).findFirst().orElse(null);
+        if (edge == null) {
+            return;
+        }
+
+        // change current join from left join to leftOrInner join
+        join.setLeftOrInner(true);
+        // left -> leftOrInner need swap join to create new edge
+        JoinDesc swapJoin = swapJoinDesc(join);
+        Edge swapEdge = new Edge(swapJoin, true);
+        vertexInfoMap.computeIfAbsent(swapEdge.pkSide(), f -> new 
VertexInfo<>());
+        vertexInfoMap.computeIfAbsent(swapEdge.fkSide(), f -> new 
VertexInfo<>());
+        addIfAbsent(vertexInfoMap.get(swapEdge.fkSide()).outEdges, swapEdge);
+        addIfAbsent(vertexInfoMap.get(swapEdge.pkSide()).inEdges, swapEdge);
+        if (edges.stream().noneMatch(e -> e.isJoinMatched(swapJoin))) {
+            edges.add(swapEdge);
+        }
+    }
+
+    public Map<String, String> matchAlias(JoinsGraph joinsGraph, KylinConfig 
kylinConfig) {
+        Map<String, String> matchAliasMap = Maps.newHashMap();
+        match(joinsGraph, matchAliasMap, 
kylinConfig.isQueryMatchPartialInnerJoinModel(),
+                kylinConfig.partialMatchNonEquiJoins());
+        return matchAliasMap;
+    }
+
+    public Map<String, String> matchAlias(JoinsGraph joinsGraph, boolean 
matchPartial) {
+        Map<String, String> matchAliasMap = Maps.newHashMap();
+        match(joinsGraph, matchAliasMap, matchPartial);
+        return matchAliasMap;
+    }
+
+    public JoinDesc getJoinByPKSide(TableRef pkTable) {
+        Edge edge = getEdgeByPKSide(pkTable);
+        return Objects.nonNull(edge) ? edge.join : null;
+    }
+
+    public List<Edge> getEdgesByFKSide(TableRef table) {
+        if (!vertexInfoMap.containsKey(table)) {
+            return Collections.emptyList();
+        }
+        return outwardEdges(table);
+    }
+
+    /**
+     * Get edges by primary key side at most get one edge.
+     *
+     * @param pkTable the pkSide table
+     * @return the obtained edge
+     */
+    private Edge getEdgeByPKSide(TableRef pkTable) {
+        if (!vertexInfoMap.containsKey(pkTable)) {
+            return null;
+        }
+        List<Edge> inEdges = inwardEdges(pkTable).stream().filter(edge -> 
!edge.isSwapJoin())
+                .collect(Collectors.toList());
+        if (inEdges.size() != 1) {
+            return null;
+        }
+        return inEdges.get(0);
+    }
+
+    public List<JoinDesc> getJoinsPathByPKSide(TableRef table) {
+        List<JoinDesc> pathToRoot = Lists.newArrayList();
+        TableRef pkSide = table;
+        while (pkSide != null) {
+            JoinDesc subJoin = getJoinByPKSide(pkSide);
+            if (Objects.isNull(subJoin)) {
+                pkSide = null;
+            } else {
+                pathToRoot.add(subJoin);
+                pkSide = subJoin.getFKSide();
+            }
+        }
+        return Lists.reverse(pathToRoot);
+    }
+
+    public JoinsGraph getSubGraphByAlias(Set<String> aliasSets) {
+        Set<JoinDesc> subJoins = Sets.newHashSet();
+        for (String alias : aliasSets) {
+            TableRef table = vertexMap.get(alias);
+            subJoins.addAll(getJoinsPathByPKSide(table));
+        }
+        return new JoinsGraph(this.center, Lists.newArrayList(subJoins));
+    }
+
+    public List<Edge> unmatched(JoinsGraph pattern) {
+        List<Edge> unmatched = Lists.newArrayList();
+        Set<Edge> all = vertexInfoMap.values().stream().map(m -> 
m.outEdges).flatMap(List::stream)
+                .collect(Collectors.toSet());
+        for (Edge edge : all) {
+            JoinsGraph subGraph = 
getSubGraphByAlias(Sets.newHashSet(edge.pkSide().getAlias()));
+            if (!subGraph.match(pattern, Maps.newHashMap())) {
+                unmatched.add(edge);
+            }
+        }
+        return unmatched;
+    }
+
+    /**
+     * Root: TableRef[TEST_KYLIN_FACT]
+     *   Edge: TableRef[TEST_KYLIN_FACT] LEFT JOIN TableRef[TEST_ORDER] ON 
[ORDER_ID] = [ORDER_ID]
+     *     Edge: TableRef[TEST_ORDER] LEFT JOIN 
TableRef[BUYER_ACCOUNT:TEST_ACCOUNT] ON [BUYER_ID] = [ACCOUNT_ID]
+     *   Edge: TableRef[TEST_KYLIN_FACT] LEFT JOIN 
TableRef[TEST_SELLER_TYPE_DIM] ON [SLR_SEGMENT_CD] = [SELLER_TYPE_CD]
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Root: ").append(center);
+        // build next edges
+        getEdgesByFKSide(center).forEach(e -> buildGraphStr(sb, e, 1));
+        return sb.toString();
+    }
+
+    private void buildGraphStr(StringBuilder sb, @NonNull Edge edge, int 
indent) {
+        sb.append(IntStream.range(0, indent).mapToObj(i -> "  ")
+                .collect(Collectors.joining("", "\n", String.valueOf(edge))));
+        // build next edges
+        getEdgesByFKSide(edge.pkSide()).forEach(e -> buildGraphStr(sb, e, 
indent + 1));
+    }
+
+    private <T> void addIfAbsent(List<T> edges, T edge) {
+        if (edges.contains(edge)) {
+            return;
+        }
+        edges.add(edge);
+    }
+}
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/util/ComputedColumnUtil.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/util/ComputedColumnUtil.java
index 492a05723e..773afd993d 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/util/ComputedColumnUtil.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/util/ComputedColumnUtil.java
@@ -51,12 +51,12 @@ import org.apache.kylin.metadata.model.BadModelException;
 import org.apache.kylin.metadata.model.BadModelException.CauseType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ComputedColumnDesc;
-import org.apache.kylin.metadata.model.JoinsGraph;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.alias.AliasDeduce;
 import org.apache.kylin.metadata.model.alias.AliasMapping;
 import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -329,7 +329,7 @@ public class ComputedColumnUtil {
         if (cc.getTableAlias() != null) {
             aliasSets.add(cc.getTableAlias());
         }
-        return model.getJoinsGraph().getSubgraphByAlias(aliasSets);
+        return model.getJoinsGraph().getSubGraphByAlias(aliasSets);
     }
 
     public static boolean isSameName(ComputedColumnDesc col1, 
ComputedColumnDesc col2) {
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsGraphTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsGraphTest.java
index 4ec5d9d145..65a359e63f 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsGraphTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/JoinsGraphTest.java
@@ -18,13 +18,19 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.function.Supplier;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.metadata.model.graph.DefaultJoinEdgeMatcher;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,13 +60,13 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                 .innerJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new 
String[] { "TEST_KYLIN_FACT.ORDER_ID" }).build();
         JoinsGraph factIJOrderGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                 .innerJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" }).build();
-        Assert.assertTrue(orderIJFactGraph.match(factIJOrderGraph, new 
HashMap<String, String>()));
+        Assert.assertTrue(orderIJFactGraph.match(factIJOrderGraph, new 
HashMap<>()));
 
         JoinsGraph orderLJfactGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_ORDER")
                 .leftJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new String[] 
{ "TEST_KYLIN_FACT.ORDER_ID" }).build();
         JoinsGraph factLJorderGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                 .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" }).build();
-        Assert.assertFalse(orderLJfactGraph.match(factLJorderGraph, new 
HashMap<String, String>()));
+        Assert.assertFalse(orderLJfactGraph.match(factLJorderGraph, new 
HashMap<>()));
     }
 
     @Test
@@ -74,10 +80,10 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                 .innerJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" })
                 .innerJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_COUNTRY" }, 
new String[] { "BUYER_COUNTRY.COUNTRY" })
                 .build();
-        Assert.assertFalse(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<String, String>(),
+        Assert.assertFalse(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<>(),
                 
KylinConfig.getInstanceFromEnv().isQueryMatchPartialInnerJoinModel()));
         overwriteSystemProp("kylin.query.match-partial-inner-join-model", 
"true");
-        Assert.assertTrue(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<String, String>(),
+        Assert.assertTrue(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<>(),
                 
KylinConfig.getInstanceFromEnv().isQueryMatchPartialInnerJoinModel()));
     }
 
@@ -92,7 +98,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                 .innerJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" })
                 .innerJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_COUNTRY" }, 
new String[] { "BUYER_COUNTRY.COUNTRY" })
                 .build();
-        Assert.assertFalse(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<String, String>(),
+        Assert.assertFalse(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<>(),
                 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject("default").getConfig()
                         .isQueryMatchPartialInnerJoinModel()));
 
@@ -102,7 +108,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
             }
         });
 
-        Assert.assertTrue(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<String, String>(),
+        Assert.assertTrue(innerJoinGraph.match(innerAndInnerJoinGraph, new 
HashMap<>(),
                 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject("default").getConfig()
                         .isQueryMatchPartialInnerJoinModel()));
 
@@ -116,7 +122,8 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
 
     private void overrideProjectConfig(Map<String, String> overrideKylinProps) 
{
         
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).updateProject("default",
 copyForWrite -> {
-            
copyForWrite.getOverrideKylinProps().putAll(KylinConfig.trimKVFromMap(overrideKylinProps));
+            LinkedHashMap<String, String> map = 
KylinConfig.trimKVFromMap(overrideKylinProps);
+            copyForWrite.getOverrideKylinProps().putAll(map);
         });
     }
 
@@ -135,8 +142,8 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                 .innerJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" })
                 .innerJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_COUNTRY" }, 
new String[] { "BUYER_COUNTRY.COUNTRY" })
                 .build();
-        Assert.assertTrue(graph1.match(graph2, new HashMap<String, String>()));
-        Assert.assertTrue(graph2.match(graph1, new HashMap<String, String>()));
+        Assert.assertTrue(graph1.match(graph2, new HashMap<>()));
+        Assert.assertTrue(graph2.match(graph1, new HashMap<>()));
 
         JoinsGraph graph3 = new MockJoinGraphBuilder(modelDesc, "TEST_ORDER")
                 .leftJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new String[] 
{ "BUYER_ACCOUNT.ACCOUNT_ID" })
@@ -147,8 +154,8 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                 .leftJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new String[] 
{ "BUYER_ACCOUNT.ACCOUNT_ID" })
                 .leftJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_COUNTRY" }, 
new String[] { "BUYER_COUNTRY.COUNTRY" })
                 .build();
-        Assert.assertTrue(graph3.match(graph4, new HashMap<String, String>()));
-        Assert.assertTrue(graph4.match(graph3, new HashMap<String, String>()));
+        Assert.assertTrue(graph3.match(graph4, new HashMap<>()));
+        Assert.assertTrue(graph4.match(graph3, new HashMap<>()));
     }
 
     @Test
@@ -158,24 +165,24 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
         JoinsGraph modelJoinsGraph = modelDesc.getJoinsGraph();
 
         JoinsGraph singleTblGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT").build();
-        Assert.assertTrue(modelJoinsGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
-        Assert.assertTrue(singleTblGraph.match(singleTblGraph, new 
HashMap<String, String>()));
-        Assert.assertTrue(singleTblGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
-        Assert.assertFalse(modelJoinsGraph.match(singleTblGraph, new 
HashMap<String, String>()));
+        Assert.assertTrue(modelJoinsGraph.match(modelJoinsGraph, new 
HashMap<>()));
+        Assert.assertTrue(singleTblGraph.match(singleTblGraph, new 
HashMap<>()));
+        Assert.assertTrue(singleTblGraph.match(modelJoinsGraph, new 
HashMap<>()));
+        Assert.assertFalse(modelJoinsGraph.match(singleTblGraph, new 
HashMap<>()));
 
         JoinsGraph noFactGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_ORDER")
                 .leftJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new String[] 
{ "BUYER_ACCOUNT.ACCOUNT_ID" }).build();
-        Assert.assertFalse(noFactGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
+        Assert.assertFalse(noFactGraph.match(modelJoinsGraph, new 
HashMap<>()));
 
         JoinsGraph factJoinGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                 .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                 .leftJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new String[] 
{ "BUYER_ACCOUNT.ACCOUNT_ID" }).build();
-        Assert.assertTrue(factJoinGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
+        Assert.assertTrue(factJoinGraph.match(modelJoinsGraph, new 
HashMap<>()));
 
         JoinsGraph joinedFactGraph = new MockJoinGraphBuilder(modelDesc, 
"BUYER_ACCOUNT")
                 .leftJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_ID" }, new 
String[] { "TEST_ORDER.BUYER_ID" })
                 .leftJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new String[] 
{ "TEST_KYLIN_FACT.ORDER_ID" }).build();
-        Assert.assertFalse(joinedFactGraph.match(factJoinGraph, new 
HashMap<String, String>()));
+        Assert.assertFalse(joinedFactGraph.match(factJoinGraph, new 
HashMap<>()));
     }
 
     @Test
@@ -185,24 +192,24 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
         JoinsGraph modelJoinsGraph = modelDesc.getJoinsGraph();
 
         JoinsGraph singleTblGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT").build();
-        Assert.assertTrue(modelJoinsGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
-        Assert.assertTrue(singleTblGraph.match(singleTblGraph, new 
HashMap<String, String>()));
-        Assert.assertFalse(singleTblGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
-        Assert.assertFalse(modelJoinsGraph.match(singleTblGraph, new 
HashMap<String, String>()));
+        Assert.assertTrue(modelJoinsGraph.match(modelJoinsGraph, new 
HashMap<>()));
+        Assert.assertTrue(singleTblGraph.match(singleTblGraph, new 
HashMap<>()));
+        Assert.assertFalse(singleTblGraph.match(modelJoinsGraph, new 
HashMap<>()));
+        Assert.assertFalse(modelJoinsGraph.match(singleTblGraph, new 
HashMap<>()));
 
         JoinsGraph noFactGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_ORDER")
                 .innerJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" }).build();
-        Assert.assertFalse(noFactGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
+        Assert.assertFalse(noFactGraph.match(modelJoinsGraph, new 
HashMap<>()));
 
         JoinsGraph factJoinGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                 .innerJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                 .innerJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" }).build();
-        Assert.assertFalse(factJoinGraph.match(modelJoinsGraph, new 
HashMap<String, String>()));
+        Assert.assertFalse(factJoinGraph.match(modelJoinsGraph, new 
HashMap<>()));
 
         JoinsGraph joinedFactGraph = new MockJoinGraphBuilder(modelDesc, 
"BUYER_ACCOUNT")
                 .innerJoin(new String[] { "BUYER_ACCOUNT.ACCOUNT_ID" }, new 
String[] { "TEST_ORDER.BUYER_ID" })
                 .innerJoin(new String[] { "TEST_ORDER.ORDER_ID" }, new 
String[] { "TEST_KYLIN_FACT.ORDER_ID" }).build();
-        Assert.assertTrue(joinedFactGraph.match(factJoinGraph, new 
HashMap<String, String>()));
+        Assert.assertTrue(joinedFactGraph.match(factJoinGraph, new 
HashMap<>()));
     }
 
     @Test
@@ -214,7 +221,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
         JoinsGraph factJoinGraph = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                 .innerJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                 .innerJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new 
String[] { "BUYER_ACCOUNT.ACCOUNT_ID" }).build();
-        Assert.assertTrue(factJoinGraph.match(modelJoinsGraph, new 
HashMap<String, String>(), true));
+        Assert.assertTrue(factJoinGraph.match(modelJoinsGraph, new 
HashMap<>(), true));
     }
 
     @Test
@@ -229,7 +236,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
             JoinsGraph graph2 = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                     .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                     .nonEquiLeftJoin("BUYER_ACCOUNT", "TEST_ORDER", 
"TEST_ORDER.BUYER_ID").build();
-            Assert.assertTrue(graph1.match(graph2, new HashMap<String, 
String>()));
+            Assert.assertTrue(graph1.match(graph2, new HashMap<>()));
         }
 
         {
@@ -240,7 +247,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
                     .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                     .leftJoin(new String[] { "TEST_ORDER.BUYER_ID" }, new 
String[] { "SELLER_ACCOUNT.ACCOUNT_ID" })
                     .nonEquiLeftJoin("BUYER_ACCOUNT", "TEST_ORDER", 
"TEST_ORDER.BUYER_ID").build();
-            Assert.assertTrue(graph1.match(graph2, new HashMap<String, 
String>()));
+            Assert.assertTrue(graph1.match(graph2, new HashMap<>()));
         }
 
         {
@@ -250,7 +257,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
             JoinsGraph graph2 = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                     .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                     .nonEquiLeftJoin("BUYER_ACCOUNT", "TEST_ORDER", 
"TEST_ORDER.BUYER_ID").build();
-            Assert.assertFalse(graph1.match(graph2, new HashMap<String, 
String>()));
+            Assert.assertFalse(graph1.match(graph2, new HashMap<>()));
         }
 
         {
@@ -258,7 +265,7 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
             JoinsGraph graph2 = new MockJoinGraphBuilder(modelDesc, 
"TEST_KYLIN_FACT")
                     .leftJoin(new String[] { "TEST_KYLIN_FACT.ORDER_ID" }, new 
String[] { "TEST_ORDER.ORDER_ID" })
                     .nonEquiLeftJoin("BUYER_ACCOUNT", "TEST_ORDER", 
"TEST_ORDER.BUYER_ID").build();
-            Assert.assertFalse(graph1.match(graph2, new HashMap<String, 
String>()));
+            Assert.assertFalse(graph1.match(graph2, new HashMap<>()));
         }
 
         {
@@ -339,4 +346,26 @@ public class JoinsGraphTest extends 
NLocalFileMetadataTestCase {
             Assert.assertTrue(graph1.match(graph2, matchesMapSupplier.get(), 
true, true));
         }
     }
+
+    @Test
+    public void testColumnDescEquals() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+        NTableMetadataManager manager = 
NTableMetadataManager.getInstance(getTestConfig(), "default");
+        TableDesc tableDesc = manager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        DefaultJoinEdgeMatcher matcher = new DefaultJoinEdgeMatcher();
+        ColumnDesc one = new ColumnDesc();
+        one.setTable(tableDesc);
+        one.setName("one");
+        ColumnDesc two = new ColumnDesc();
+        two.setTable(tableDesc);
+        two.setName("two");
+        ColumnDesc copyOfOne = new ColumnDesc(one);
+        ColumnDesc copyOfTwo = new ColumnDesc(two);
+        ColumnDesc[] a = new ColumnDesc[] { one, two };
+        ColumnDesc[] b = new ColumnDesc[] { copyOfTwo, copyOfOne };
+        Method method = 
matcher.getClass().getDeclaredMethod("columnDescEquals", ColumnDesc[].class,
+                ColumnDesc[].class);
+        Unsafe.changeAccessibleObject(method, true);
+        Object invoke = method.invoke(matcher, a, b);
+        Assert.assertEquals(true, invoke);
+    }
 }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/MockJoinGraphBuilder.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/MockJoinGraphBuilder.java
index 927bc5408c..e47dc65f3a 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/MockJoinGraphBuilder.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/MockJoinGraphBuilder.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.calcite.sql.JoinType;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.junit.Assert;
 
 import com.google.common.collect.Lists;
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 9713500115..9473509173 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -43,11 +43,11 @@ import 
org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.realization.HybridRealization;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.JoinsGraph;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.apache.kylin.metadata.query.QueryMetrics;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -535,7 +535,7 @@ public class OLAPContext {
     }
 
     public void matchJoinWithEnhancementTransformation() {
-        this.setJoinsGraph(JoinsGraph.normalizeJoinGraph(joinsGraph));
+        joinsGraph.normalize();
     }
 
     public RexInputRef createUniqueInputRefContextTables(OLAPTableScan table, 
int columnIdx) {
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
index 54f89276bb..21908533a2 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java
@@ -79,7 +79,6 @@ import org.apache.kylin.metadata.model.FusionModelManager;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
-import org.apache.kylin.metadata.model.JoinsGraph;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
@@ -88,6 +87,7 @@ import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.project.NProjectLoader;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
index 5e7bc10c46..8cc71a523c 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryAliasMatcher.java
@@ -50,12 +50,13 @@ import 
org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.ColExcludedChecker;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.JoinsGraph;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.alias.ExpressionComparator;
+import org.apache.kylin.metadata.model.graph.DefaultJoinEdgeMatcher;
+import org.apache.kylin.metadata.model.graph.JoinsGraph;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.relnode.ColumnRowType;
@@ -239,7 +240,7 @@ public class QueryAliasMatcher {
         return null;
     }
 
-    private static class CCJoinEdgeMatcher extends 
JoinsGraph.DefaultJoinEdgeMatcher {
+    private static class CCJoinEdgeMatcher extends DefaultJoinEdgeMatcher {
         transient QueryAliasMatchInfo matchInfo;
         boolean compareCCExpr;
 

Reply via email to