This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b49bc4  [fix](source) fix  projection and function pushdown not 
functioning correctly (#383)
9b49bc4 is described below

commit 9b49bc43827406135ba19342fb33e48daffda4b0
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Sat May 11 14:46:39 2024 +0800

    [fix](source) fix  projection and function pushdown not functioning 
correctly (#383)
---
 .../doris/flink/table/DorisDynamicTableSource.java   | 20 ++++++++++----------
 .../doris/flink/table/DorisExpressionVisitor.java    | 11 +++++++++++
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 32851d4..e2b837c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -99,15 +99,6 @@ public final class DorisDynamicTableSource
             String filterQuery = 
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
             readOptions.setFilterQuery(filterQuery);
         }
-        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
-            String[] selectFields =
-                    DataType.getFieldNames(physicalRowDataType).toArray(new 
String[0]);
-            readOptions.setReadFields(
-                    Arrays.stream(selectFields)
-                            .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
-                            .collect(Collectors.joining(", ")));
-        }
-
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;
             try {
@@ -194,7 +185,8 @@ public final class DorisDynamicTableSource
         DorisExpressionVisitor expressionVisitor = new 
DorisExpressionVisitor();
         for (ResolvedExpression filter : filters) {
             String filterQuery = filter.accept(expressionVisitor);
-            if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+            if 
(StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
+                    && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
                 acceptedFilters.add(filter);
                 this.resolvedFilterQuery.add(filterQuery);
             } else {
@@ -212,5 +204,13 @@ public final class DorisDynamicTableSource
     @Override
     public void applyProjection(int[][] projectedFields, DataType 
producedDataType) {
         this.physicalRowDataType = 
Projection.of(projectedFields).project(physicalRowDataType);
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+            String[] selectFields =
+                    DataType.getFieldNames(physicalRowDataType).toArray(new 
String[0]);
+            this.readOptions.setReadFields(
+                    Arrays.stream(selectFields)
+                            .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
+                            .collect(Collectors.joining(", ")));
+        }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
index 3f327fe..66242e1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.StringUtils;
 
 import java.util.List;
 
@@ -66,12 +67,22 @@ public class DorisExpressionVisitor implements 
ExpressionVisitor<String> {
         if 
(BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
             return combineLeftExpression("IS NOT NULL", 
call.getResolvedChildren().get(0));
         }
+
+        if 
(BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) {
+            return call.getChildren().get(0).accept(this);
+        }
         return null;
     }
 
     private String combineExpression(String operator, List<ResolvedExpression> 
operand) {
         String left = operand.get(0).accept(this);
+        if (StringUtils.isNullOrWhitespaceOnly(left)) {
+            return null;
+        }
         String right = operand.get(1).accept(this);
+        if (StringUtils.isNullOrWhitespaceOnly(right)) {
+            return null;
+        }
         return String.format("(%s %s %s)", left, operator, right);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to