morrySnow commented on code in PR #40225:
URL: https://github.com/apache/doris/pull/40225#discussion_r1751559686


##########
fe/fe-core/pom.xml:
##########
@@ -453,6 +454,11 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-table-api</artifactId>
+            <version>0.48.8-public</version>

Review Comment:
   better to use a property here to ensure odps deps always with same version



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java:
##########
@@ -75,11 +99,290 @@ private void setScanParams(TFileRangeDesc rangeDesc, 
MaxComputeSplit maxComputeS
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
         TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
-        if (maxComputeSplit.getPartitionSpec().isPresent()) {
-            
fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get());
-        }
+        fileDesc.setPartitionSpec("deprecated");
+        fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
+        fileDesc.setSessionId(maxComputeSplit.getSessionId());
         tableFormatFileDesc.setMaxComputeParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
+        rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + 
maxComputeSplit.getLength() + " ]");
+        rangeDesc.setStartOffset(maxComputeSplit.getStart());
+        rangeDesc.setSize(maxComputeSplit.getLength());
+    }
+
+    void createTableBatchReadSession() throws UserException {
+        Predicate filterPredicate = convertPredicate();
+
+
+        List<String> requiredPartitionColumns = new ArrayList<>();
+        List<String> orderedRequiredDataColumns = new ArrayList<>();
+
+        Set<String> requiredSlots =
+                desc.getSlots().stream().map(e -> 
e.getColumn().getName()).collect(Collectors.toSet());
+
+        Set<String> partitionColumns =
+                
table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());
+
+        for (Column column : table.getColumns()) {
+            String columnName =  column.getName();
+            if (!requiredSlots.contains(columnName)) {
+                continue;
+            }
+            if (partitionColumns.contains(columnName)) {
+                requiredPartitionColumns.add(columnName);
+            } else {
+                orderedRequiredDataColumns.add(columnName);
+            }
+        }
+
+
+
+        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) 
table.getCatalog();
+
+        try {
+            TableReadSessionBuilder scanBuilder = new 
TableReadSessionBuilder();
+            tableBatchReadSession =
+                    
scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
+                            .withSettings(mcCatalog.getSettings())
+                            .withSplitOptions(mcCatalog.getSplitOption())
+                            .requiredPartitionColumns(requiredPartitionColumns)
+                            .requiredDataColumns(orderedRequiredDataColumns)
+                            .withArrowOptions(
+                                    ArrowOptions.newBuilder()
+                                            
.withDatetimeUnit(TimestampUnit.MILLI)
+                                            
.withTimestampUnit(TimestampUnit.NANO)
+                                            .build()
+                            )
+                            .withFilterPredicate(filterPredicate)
+                            .buildBatchReadSession();
+        } catch (java.io.IOException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    protected Predicate convertPredicate() {
+        if (conjuncts.isEmpty()) {
+            return Predicate.NO_PREDICATE;
+        }
+
+        if (conjuncts.size() == 1) {
+            try {
+                return convertExprToOdpsPredicate(conjuncts.get(0));
+            } catch (AnalysisException e) {
+                Log.info("Failed to convert predicate " + conjuncts.get(0) + " 
to odps predicate");
+                Log.info("Reason: " + e.getMessage());
+                return Predicate.NO_PREDICATE;
+            }
+        }
+
+        com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
+                filterPredicate = new 
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
+                        
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND
+        );
+
+        for (Expr predicate : conjuncts) {
+            try {
+                
filterPredicate.addPredicate(convertExprToOdpsPredicate(predicate));
+            } catch (AnalysisException e) {
+                Log.info("Failed to convert predicate " + predicate);
+                Log.info("Reason: " + e.getMessage());
+                return Predicate.NO_PREDICATE;
+            }
+        }
+        return filterPredicate;
+    }
+
+    private Predicate convertExprToOdpsPredicate(Expr expr) throws 
AnalysisException {
+        Predicate odpsPredicate = null;
+        if (expr instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
+
+            
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator odpsOp;
+            switch (compoundPredicate.getOp()) {
+                case AND:
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND;
+                    break;
+                case OR:
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.OR;
+                    break;
+                case NOT:
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.NOT;
+                    break;
+                default:
+                    throw new AnalysisException("Unknown operator: " + 
compoundPredicate.getOp());
+            }
+
+            List<Predicate> odpsPredicates = new ArrayList<>();
+
+            odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(0)));
+
+            if (compoundPredicate.getOp() != Operator.NOT) {
+                
odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(1)));
+            }
+            odpsPredicate = new 
com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(odpsOp, 
odpsPredicates);
+
+        } else if (expr instanceof InPredicate) {
+
+            InPredicate inPredicate = (InPredicate) expr;
+            if (inPredicate.getChildren().size() > 2) {
+                return Predicate.NO_PREDICATE;
+            }
+            com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator 
odpsOp =
+                    inPredicate.isNotIn()
+                            ? 
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN
+                            : 
com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.NOT_IN;
+
+            String columnName = convertSlotRefToColumnName(expr.getChild(0));
+            com.aliyun.odps.OdpsType odpsType  =  
table.getColumnNameToOdpsColumn().get(columnName).getType();
+
+            StringBuilder stringBuilder = new StringBuilder();
+
+
+            stringBuilder.append(columnName);
+            stringBuilder.append(" ");
+            stringBuilder.append(odpsOp.getDescription());
+            stringBuilder.append(" (");
+
+            for (int i = 1; i < inPredicate.getChildren().size(); i++) {
+                stringBuilder.append(convertLiteralToOdpsValues(odpsType, 
expr.getChild(i)));
+                if (i < inPredicate.getChildren().size() - 1) {
+                    stringBuilder.append(", ");
+                }
+            }
+            stringBuilder.append(" )");
+
+            odpsPredicate = new 
com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
+
+        } else if (expr instanceof BinaryPredicate) {
+            BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
+
+
+            com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator 
odpsOp;
+            switch (binaryPredicate.getOp()) {
+                case EQ: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.EQUALS;
+                    break;
+                }
+                case NE: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.NOT_EQUALS;
+                    break;
+                }
+                case GE: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN_OR_EQUAL;
+                    break;
+                }
+                case LE: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN_OR_EQUAL;
+                    break;
+                }
+                case LT: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN;
+                    break;
+                }
+                case GT: {
+                    odpsOp = 
com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN;
+                    break;
+                }
+                default: {
+                    odpsOp = null;
+                    break;
+                }
+            }
+
+            if (odpsOp != null) {
+                String columnName = 
convertSlotRefToColumnName(expr.getChild(0));
+                com.aliyun.odps.OdpsType odpsType  =  
table.getColumnNameToOdpsColumn().get(columnName).getType();
+                StringBuilder stringBuilder = new StringBuilder();
+                stringBuilder.append(columnName);
+                stringBuilder.append(" ");
+                stringBuilder.append(odpsOp.getDescription());
+                stringBuilder.append(" ");
+                stringBuilder.append(convertLiteralToOdpsValues(odpsType, 
expr.getChild(1)));
+
+                odpsPredicate = new 
com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
+            }
+        } else if (expr instanceof IsNullPredicate) {
+            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
+            com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator 
odpsOp =
+                    isNullPredicate.isNotNull()
+                            ? 
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.NOT_NULL
+                            : 
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.IS_NULL;
+
+            odpsPredicate =  new 
com.aliyun.odps.table.optimizer.predicate.UnaryPredicate(odpsOp,
+                    new com.aliyun.odps.table.optimizer.predicate.Attribute(
+                        convertSlotRefToColumnName(expr.getChild(0))
+                    )
+            );
+        }
+
+
+        if (odpsPredicate == null) {
+            throw new AnalysisException("Do not support convert ["
+                    + expr.getExprName() + "] in convertExprToOdpsPredicate.");
+        }
+        return odpsPredicate;
+    }
+
+    private String convertSlotRefToColumnName(Expr expr) throws 
AnalysisException {
+        if (expr instanceof SlotRef) {
+            return ((SlotRef) expr).getColumnName();
+        } else if (expr instanceof CastExpr) {
+            if (expr.getChild(0) instanceof SlotRef) {
+                return ((SlotRef) expr.getChild(0)).getColumnName();
+            }
+        }
+
+        throw new AnalysisException("Do not support convert ["
+                + expr.getExprName() + "] in convertSlotRefToAttribute.");
+
+
+    }
+
+    private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) 
throws AnalysisException {
+        if (!(expr instanceof LiteralExpr)) {
+            throw new AnalysisException("Do not support convert ["
+                    + expr.getExprName() + "] in convertSlotRefToAttribute.");
+        }
+        LiteralExpr literalExpr = (LiteralExpr) expr;
+
+        literalExpr.toString();

Review Comment:
   why call this here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to