morrySnow commented on code in PR #40225: URL: https://github.com/apache/doris/pull/40225#discussion_r1751559686
########## fe/fe-core/pom.xml: ########## @@ -453,6 +454,11 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.aliyun.odps</groupId> + <artifactId>odps-sdk-table-api</artifactId> + <version>0.48.8-public</version> Review Comment: better to use a property here to ensure odps deps always with same version ########## fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java: ########## @@ -75,11 +99,290 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value()); TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc(); - if (maxComputeSplit.getPartitionSpec().isPresent()) { - fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get()); - } + fileDesc.setPartitionSpec("deprecated"); + fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize); + fileDesc.setSessionId(maxComputeSplit.getSessionId()); tableFormatFileDesc.setMaxComputeParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); + rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]"); + rangeDesc.setStartOffset(maxComputeSplit.getStart()); + rangeDesc.setSize(maxComputeSplit.getLength()); + } + + void createTableBatchReadSession() throws UserException { + Predicate filterPredicate = convertPredicate(); + + + List<String> requiredPartitionColumns = new ArrayList<>(); + List<String> orderedRequiredDataColumns = new ArrayList<>(); + + Set<String> requiredSlots = + desc.getSlots().stream().map(e -> e.getColumn().getName()).collect(Collectors.toSet()); + + Set<String> partitionColumns = + table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet()); + + for (Column column : table.getColumns()) { + String columnName = column.getName(); + if 
(!requiredSlots.contains(columnName)) { + continue; + } + if (partitionColumns.contains(columnName)) { + requiredPartitionColumns.add(columnName); + } else { + orderedRequiredDataColumns.add(columnName); + } + } + + + + MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog(); + + try { + TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder(); + tableBatchReadSession = + scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName())) + .withSettings(mcCatalog.getSettings()) + .withSplitOptions(mcCatalog.getSplitOption()) + .requiredPartitionColumns(requiredPartitionColumns) + .requiredDataColumns(orderedRequiredDataColumns) + .withArrowOptions( + ArrowOptions.newBuilder() + .withDatetimeUnit(TimestampUnit.MILLI) + .withTimestampUnit(TimestampUnit.NANO) + .build() + ) + .withFilterPredicate(filterPredicate) + .buildBatchReadSession(); + } catch (java.io.IOException e) { + throw new RuntimeException(e); + } + + } + + protected Predicate convertPredicate() { + if (conjuncts.isEmpty()) { + return Predicate.NO_PREDICATE; + } + + if (conjuncts.size() == 1) { + try { + return convertExprToOdpsPredicate(conjuncts.get(0)); + } catch (AnalysisException e) { + Log.info("Failed to convert predicate " + conjuncts.get(0) + " to odps predicate"); + Log.info("Reason: " + e.getMessage()); + return Predicate.NO_PREDICATE; + } + } + + com.aliyun.odps.table.optimizer.predicate.CompoundPredicate + filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate( + com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND + ); + + for (Expr predicate : conjuncts) { + try { + filterPredicate.addPredicate(convertExprToOdpsPredicate(predicate)); + } catch (AnalysisException e) { + Log.info("Failed to convert predicate " + predicate); + Log.info("Reason: " + e.getMessage()); + return Predicate.NO_PREDICATE; + } + } + return filterPredicate; + } + + private Predicate convertExprToOdpsPredicate(Expr expr) 
throws AnalysisException { + Predicate odpsPredicate = null; + if (expr instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + + com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator odpsOp; + switch (compoundPredicate.getOp()) { + case AND: + odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND; + break; + case OR: + odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.OR; + break; + case NOT: + odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.NOT; + break; + default: + throw new AnalysisException("Unknown operator: " + compoundPredicate.getOp()); + } + + List<Predicate> odpsPredicates = new ArrayList<>(); + + odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(0))); + + if (compoundPredicate.getOp() != Operator.NOT) { + odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(1))); + } + odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(odpsOp, odpsPredicates); + + } else if (expr instanceof InPredicate) { + + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.getChildren().size() > 2) { + return Predicate.NO_PREDICATE; + } + com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp = + inPredicate.isNotIn() + ? 
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN + : com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.NOT_IN; + + String columnName = convertSlotRefToColumnName(expr.getChild(0)); + com.aliyun.odps.OdpsType odpsType = table.getColumnNameToOdpsColumn().get(columnName).getType(); + + StringBuilder stringBuilder = new StringBuilder(); + + + stringBuilder.append(columnName); + stringBuilder.append(" "); + stringBuilder.append(odpsOp.getDescription()); + stringBuilder.append(" ("); + + for (int i = 1; i < inPredicate.getChildren().size(); i++) { + stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(i))); + if (i < inPredicate.getChildren().size() - 1) { + stringBuilder.append(", "); + } + } + stringBuilder.append(" )"); + + odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString()); + + } else if (expr instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + + + com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator odpsOp; + switch (binaryPredicate.getOp()) { + case EQ: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.EQUALS; + break; + } + case NE: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.NOT_EQUALS; + break; + } + case GE: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN_OR_EQUAL; + break; + } + case LE: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN_OR_EQUAL; + break; + } + case LT: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN; + break; + } + case GT: { + odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN; + break; + } + default: { + odpsOp = null; + break; + } + } + + if (odpsOp != null) { + String columnName = convertSlotRefToColumnName(expr.getChild(0)); + com.aliyun.odps.OdpsType odpsType 
= table.getColumnNameToOdpsColumn().get(columnName).getType(); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(columnName); + stringBuilder.append(" "); + stringBuilder.append(odpsOp.getDescription()); + stringBuilder.append(" "); + stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(1))); + + odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString()); + } + } else if (expr instanceof IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) expr; + com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator odpsOp = + isNullPredicate.isNotNull() + ? com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.NOT_NULL + : com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.IS_NULL; + + odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.UnaryPredicate(odpsOp, + new com.aliyun.odps.table.optimizer.predicate.Attribute( + convertSlotRefToColumnName(expr.getChild(0)) + ) + ); + } + + + if (odpsPredicate == null) { + throw new AnalysisException("Do not support convert [" + + expr.getExprName() + "] in convertExprToOdpsPredicate."); + } + return odpsPredicate; + } + + private String convertSlotRefToColumnName(Expr expr) throws AnalysisException { + if (expr instanceof SlotRef) { + return ((SlotRef) expr).getColumnName(); + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof SlotRef) { + return ((SlotRef) expr.getChild(0)).getColumnName(); + } + } + + throw new AnalysisException("Do not support convert [" + + expr.getExprName() + "] in convertSlotRefToAttribute."); + + + } + + private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws AnalysisException { + if (!(expr instanceof LiteralExpr)) { + throw new AnalysisException("Do not support convert [" + + expr.getExprName() + "] in convertSlotRefToAttribute."); + } + LiteralExpr literalExpr = (LiteralExpr) expr; + + literalExpr.toString(); 
Review Comment: Why call `literalExpr.toString()` here? Its return value is discarded, so the call appears to have no effect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org