eldenmoon commented on code in PR #39468: URL: https://github.com/apache/doris/pull/39468#discussion_r1758108723
########## fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java: ########## @@ -159,40 +219,258 @@ public static void directExecuteShortCircuitQuery(StmtExecutor executor, .getMysqlChannel(), null, null); } - private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { - for (int i = 0; i < conjunctVals.size(); ++i) { - BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i); - if (binaryPredicate.getChild(0) instanceof LiteralExpr) { - binaryPredicate.setChild(0, conjunctVals.get(i)); - } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) { - binaryPredicate.setChild(1, conjunctVals.get(i)); + /* + * Interface for handling different conjunct types + */ + private interface ConjunctHandler { + int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException; + } + + private static class InPredicateHandler implements ConjunctHandler { + public static final InPredicateHandler INSTANCE = new InPredicateHandler(); + + private InPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.isNotIn()) { + throw new AnalysisException("Not support NOT IN predicate in point query"); + } + for (int j = 1; j < inPredicate.getChildren().size(); ++j) { + if (inPredicate.getChild(j) instanceof LiteralExpr) { + inPredicate.setChild(j, conjunctVals.get(handledConjunctVals++)); + } else { + Preconditions.checkState(false, "Should contains literal in " + inPredicate.toSqlImpl()); + } + } + return handledConjunctVals; + } + } + + private static class BinaryPredicateHandler implements ConjunctHandler { + public static final BinaryPredicateHandler INSTANCE = new BinaryPredicateHandler(); + + private BinaryPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + Expr left = binaryPredicate.getChild(0); + Expr right = binaryPredicate.getChild(1); + + if (isDeleteSign(left) || isDeleteSign(right)) { + return handledConjunctVals; + } + + if (isLiteralExpr(left)) { + binaryPredicate.setChild(0, conjunctVals.get(handledConjunctVals++)); + } else if (isLiteralExpr(right)) { + binaryPredicate.setChild(1, conjunctVals.get(handledConjunctVals++)); } else { Preconditions.checkState(false, "Should contains literal in " + binaryPredicate.toSqlImpl()); } + return handledConjunctVals; + } + + private boolean isLiteralExpr(Expr expr) { + return expr instanceof LiteralExpr; + } + + private boolean isDeleteSign(Expr expr) { + return expr instanceof SlotRef && ((SlotRef) expr).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN); + } + } + + private static void initHandler() { + handlers.put(InPredicate.class, InPredicateHandler.INSTANCE); + handlers.put(BinaryPredicate.class, BinaryPredicateHandler.INSTANCE); + } + + private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { + List<Expr> conjuncts = scanNode.getConjuncts(); + if (handlers.isEmpty()) { + initHandler(); + } + int handledConjunctVals = 0; + for (Expr expr : conjuncts) { + ConjunctHandler handler = handlers.get(expr.getClass()); + if (handler == null) { + throw new AnalysisException("Not support conjunct type " + expr.getClass().getName()); + } + handledConjunctVals = handler.handle(expr, conjunctVals, handledConjunctVals); } + + Preconditions.checkState(handledConjunctVals == conjunctVals.size()); } public void setTimeout(long timeoutMs) { this.timeoutMs = timeoutMs; } - void addKeyTuples( - InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { - // TODO handle IN predicates - Map<String, Expr> columnExpr = Maps.newHashMap(); - KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); - for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) { + /* + * According to the current ordered key tuple, based on the leftmost partition principle, + * get the partition ID to which it belongs + */ + private Set<Long> getLeftMostPartitionIDs(List<String> orderedKeyTuple, + List<Column> keyColumns) { + Set<Long> leftMostPartitionIDs = Sets.newHashSet(); + for (int i = 0; i < partitionKeyColumns.size(); ++i) { + int colIdx = partitionKeyColumns.get(i); + String partitionColName = keyColumns.get(colIdx).getName(); + try { + ColumnBound partitionKey = ColumnBound.of(LiteralExpr.create(orderedKeyTuple.get(colIdx), + keyColumns.get(colIdx).getType())); + List<Long> partitionIDs = Lists.newArrayList( + Objects.requireNonNullElse( + partitionCol2PartitionID.get(partitionColName).get(partitionKey), + Collections.emptyList() + ) + ); + // Add the first partition column directly + if (i == 0) { + leftMostPartitionIDs.addAll(partitionIDs); + continue; + } + if (leftMostPartitionIDs.isEmpty() || partitionIDs == null) { + break; + } + partitionIDs.retainAll(leftMostPartitionIDs); + if (partitionIDs.isEmpty()) { + break; + } + } catch (Exception e) { + throw new AnalysisException("Failed to create partition key for key tuple: " + orderedKeyTuple); + } + } + return leftMostPartitionIDs; + } + + private void pickCandidateBackends() { + for (Long tabletID : scanTabletIDs) { + roundRobinPickReplica(tabletID); + } + } + + private void roundRobinPickReplica(Long tabletID) { + while (true) { + Backend backend = roundRobinscheduler.next(); + if (backend == null) { + break; + } + Map<Long, Replica> beWithReplica = replicaMetaTable.row(tabletID); + if (!beWithReplica.containsKey(backend.getId())) { + continue; + } + pickedCandidateReplicas + .putIfAbsent(tabletID, + Pair.of(backend, beWithReplica.get(backend.getId()).getId())); + break; + } + } + + // Use the leftmost matching partition to prune the tablet + private void addTabletIDsForKeyTuple(List<String> orderedKeyTuple, List<Column> keyColumns, + OlapTable olapTable, Set<Long> leftMostPartitionIDs) { + List<String> keyTupleForDistributionPrune = Lists.newArrayList(); + for (Integer idx : distributionKeyColumns) { + keyTupleForDistributionPrune.add(orderedKeyTuple.get(idx)); + } + Set<Long> tabletIDs = Sets.newHashSet(); + for (PartitionKey key : distributionKeys2TabletID.keySet()) { + List<String> distributionKeys = Lists.newArrayList(); + for (LiteralExpr expr : key.getKeys()) { + distributionKeys.add(expr.getStringValue()); + } + if (distributionKeys.equals(keyTupleForDistributionPrune)) { Review Comment: add comment to this if ########## fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java: ########## @@ -159,40 +219,258 @@ public static void directExecuteShortCircuitQuery(StmtExecutor executor, .getMysqlChannel(), null, null); } - private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { - for (int i = 0; i < conjunctVals.size(); ++i) { - BinaryPredicate binaryPredicate = (BinaryPredicate) scanNode.getConjuncts().get(i); - if (binaryPredicate.getChild(0) instanceof LiteralExpr) { - binaryPredicate.setChild(0, conjunctVals.get(i)); - } else if (binaryPredicate.getChild(1) instanceof LiteralExpr) { - binaryPredicate.setChild(1, conjunctVals.get(i)); + /* + * Interface for handling different conjunct types + */ + private interface ConjunctHandler { + int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException; + } + + private static class InPredicateHandler implements ConjunctHandler { + public static final InPredicateHandler INSTANCE = new InPredicateHandler(); + + private InPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.isNotIn()) { + throw new AnalysisException("Not support NOT IN predicate in point query"); + } + for (int j = 1; j < inPredicate.getChildren().size(); ++j) { + if (inPredicate.getChild(j) instanceof LiteralExpr) { + inPredicate.setChild(j, conjunctVals.get(handledConjunctVals++)); + } else { + Preconditions.checkState(false, "Should contains literal in " + inPredicate.toSqlImpl()); + } + } + return handledConjunctVals; + } + } + + private static class BinaryPredicateHandler implements ConjunctHandler { + public static final BinaryPredicateHandler INSTANCE = new BinaryPredicateHandler(); + + private BinaryPredicateHandler() { + } + + @Override + public int handle(Expr expr, List<Expr> conjunctVals, int handledConjunctVals) throws AnalysisException { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + Expr left = binaryPredicate.getChild(0); + Expr right = binaryPredicate.getChild(1); + + if (isDeleteSign(left) || isDeleteSign(right)) { + return handledConjunctVals; + } + + if (isLiteralExpr(left)) { + binaryPredicate.setChild(0, conjunctVals.get(handledConjunctVals++)); + } else if (isLiteralExpr(right)) { + binaryPredicate.setChild(1, conjunctVals.get(handledConjunctVals++)); } else { Preconditions.checkState(false, "Should contains literal in " + binaryPredicate.toSqlImpl()); } + return handledConjunctVals; + } + + private boolean isLiteralExpr(Expr expr) { + return expr instanceof LiteralExpr; + } + + private boolean isDeleteSign(Expr expr) { + return expr instanceof SlotRef && ((SlotRef) expr).getColumnName().equalsIgnoreCase(Column.DELETE_SIGN); + } + } + + private static void initHandler() { + handlers.put(InPredicate.class, InPredicateHandler.INSTANCE); + handlers.put(BinaryPredicate.class, BinaryPredicateHandler.INSTANCE); + } + + private static void updateScanNodeConjuncts(OlapScanNode scanNode, List<Expr> conjunctVals) { + List<Expr> conjuncts = scanNode.getConjuncts(); + if (handlers.isEmpty()) { + initHandler(); + } + int handledConjunctVals = 0; + for (Expr expr : conjuncts) { + ConjunctHandler handler = handlers.get(expr.getClass()); + if (handler == null) { + throw new AnalysisException("Not support conjunct type " + expr.getClass().getName()); + } + handledConjunctVals = handler.handle(expr, conjunctVals, handledConjunctVals); } + + Preconditions.checkState(handledConjunctVals == conjunctVals.size()); } public void setTimeout(long timeoutMs) { this.timeoutMs = timeoutMs; } - void addKeyTuples( - InternalService.PTabletKeyLookupRequest.Builder requestBuilder) { - // TODO handle IN predicates - Map<String, Expr> columnExpr = Maps.newHashMap(); - KeyTuple.Builder kBuilder = KeyTuple.newBuilder(); - for (Expr expr : shortCircuitQueryContext.scanNode.getConjuncts()) { + /* + * According to the current ordered key tuple, based on the leftmost partition principle, + * get the partition ID to which it belongs + */ + private Set<Long> getLeftMostPartitionIDs(List<String> orderedKeyTuple, + List<Column> keyColumns) { + Set<Long> leftMostPartitionIDs = Sets.newHashSet(); + for (int i = 0; i < partitionKeyColumns.size(); ++i) { + int colIdx = partitionKeyColumns.get(i); + String partitionColName = keyColumns.get(colIdx).getName(); + try { + ColumnBound partitionKey = ColumnBound.of(LiteralExpr.create(orderedKeyTuple.get(colIdx), + keyColumns.get(colIdx).getType())); + List<Long> partitionIDs = Lists.newArrayList( + Objects.requireNonNullElse( + partitionCol2PartitionID.get(partitionColName).get(partitionKey), + Collections.emptyList() + ) + ); + // Add the first partition column directly + if (i == 0) { + leftMostPartitionIDs.addAll(partitionIDs); + continue; + } + if (leftMostPartitionIDs.isEmpty() || partitionIDs == null) { + break; + } + partitionIDs.retainAll(leftMostPartitionIDs); + if (partitionIDs.isEmpty()) { + break; + } + } catch (Exception e) { + throw new AnalysisException("Failed to create partition key for key tuple: " + orderedKeyTuple); + } + } + return leftMostPartitionIDs; + } + + private void pickCandidateBackends() { + for (Long tabletID : scanTabletIDs) { + roundRobinPickReplica(tabletID); + } + } + + private void roundRobinPickReplica(Long tabletID) { + while (true) { + Backend backend = roundRobinscheduler.next(); + if (backend == null) { + break; + } + Map<Long, Replica> beWithReplica = replicaMetaTable.row(tabletID); + if (!beWithReplica.containsKey(backend.getId())) { + continue; + } + pickedCandidateReplicas + .putIfAbsent(tabletID, + Pair.of(backend, beWithReplica.get(backend.getId()).getId())); + break; + } + } + + // Use the leftmost matching partition to prune the tablet + private void addTabletIDsForKeyTuple(List<String> orderedKeyTuple, List<Column> keyColumns, + OlapTable olapTable, Set<Long> leftMostPartitionIDs) { + List<String> keyTupleForDistributionPrune = Lists.newArrayList(); + for (Integer idx : distributionKeyColumns) { + keyTupleForDistributionPrune.add(orderedKeyTuple.get(idx)); + } + Set<Long> tabletIDs = Sets.newHashSet(); + for (PartitionKey key : distributionKeys2TabletID.keySet()) { + List<String> distributionKeys = Lists.newArrayList(); + for (LiteralExpr expr : key.getKeys()) { + distributionKeys.add(expr.getStringValue()); + } + if (distributionKeys.equals(keyTupleForDistributionPrune)) { + Set<Long> originTabletIDs = Sets.newHashSet(distributionKeys2TabletID.get(key)); + if (leftMostPartitionIDs.isEmpty()) { Review Comment: add comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org