This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 9b49bc4 [fix](source) fix projection and function pushdown not functioning correctly (#383)
9b49bc4 is described below

commit 9b49bc43827406135ba19342fb33e48daffda4b0
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Sat May 11 14:46:39 2024 +0800

    [fix](source) fix projection and function pushdown not functioning correctly (#383)
---
 .../doris/flink/table/DorisDynamicTableSource.java | 20 ++++++++++----------
 .../doris/flink/table/DorisExpressionVisitor.java | 11 +++++++++++
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 32851d4..e2b837c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -99,15 +99,6 @@ public final class DorisDynamicTableSource
             String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
             readOptions.setFilterQuery(filterQuery);
         }
-        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
-            String[] selectFields =
-                    DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
-            readOptions.setReadFields(
-                    Arrays.stream(selectFields)
-                            .map(item -> String.format("`%s`", item.trim().replace("`", "")))
-                            .collect(Collectors.joining(", ")));
-        }
-
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;
             try {
@@ -194,7 +185,8 @@ public final class DorisDynamicTableSource
         DorisExpressionVisitor expressionVisitor = new DorisExpressionVisitor();
         for (ResolvedExpression filter : filters) {
             String filterQuery = filter.accept(expressionVisitor);
-            if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+            if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
+                    && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
                 acceptedFilters.add(filter);
                 this.resolvedFilterQuery.add(filterQuery);
             } else {
@@ -212,5 +204,13 @@ public final class DorisDynamicTableSource
     @Override
     public void applyProjection(int[][] projectedFields, DataType producedDataType) {
         this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+            String[] selectFields =
+                    DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+            this.readOptions.setReadFields(
+                    Arrays.stream(selectFields)
+                            .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+                            .collect(Collectors.joining(", ")));
+        }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
index 3f327fe..66242e1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.StringUtils;
 
 import java.util.List;
 
@@ -66,12 +67,22 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> {
         if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
             return combineLeftExpression("IS NOT NULL", call.getResolvedChildren().get(0));
         }
+
+        if (BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) {
+            return call.getChildren().get(0).accept(this);
+        }
         return null;
     }
 
     private String combineExpression(String operator, List<ResolvedExpression> operand) {
         String left = operand.get(0).accept(this);
+        if (StringUtils.isNullOrWhitespaceOnly(left)) {
+            return null;
+        }
         String right = operand.get(1).accept(this);
+        if (StringUtils.isNullOrWhitespaceOnly(right)) {
+            return null;
+        }
         return String.format("(%s %s %s)", left, operator, right);
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org