rdblue commented on code in PR #6402: URL: https://github.com/apache/iceberg/pull/6402#discussion_r1050277049
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkFilters.java: ########## @@ -246,18 +248,70 @@ private static Optional<Expression> convertFieldAndLiteral( org.apache.flink.table.expressions.Expression left = args.get(0); org.apache.flink.table.expressions.Expression right = args.get(1); - if (left instanceof FieldReferenceExpression && right instanceof ValueLiteralExpression) { - String name = ((FieldReferenceExpression) left).getName(); - Optional<Object> lit = convertLiteral((ValueLiteralExpression) right); + Optional<Object> lit; + if (left instanceof FieldReferenceExpression) { + lit = convertExpression(right); if (lit.isPresent()) { - return Optional.of(convertLR.apply(name, lit.get())); + return Optional.of(convertLR.apply(((FieldReferenceExpression) left).getName(), lit.get())); } - } else if (left instanceof ValueLiteralExpression - && right instanceof FieldReferenceExpression) { - Optional<Object> lit = convertLiteral((ValueLiteralExpression) left); - String name = ((FieldReferenceExpression) right).getName(); + } else if (right instanceof FieldReferenceExpression) { + lit = convertExpression(left); if (lit.isPresent()) { - return Optional.of(convertRL.apply(name, lit.get())); + return Optional.of( + convertRL.apply(((FieldReferenceExpression) right).getName(), lit.get())); + } + } + + return Optional.empty(); + } + + private static Optional<Object> convertExpression( + org.apache.flink.table.expressions.Expression expression) { + if (expression instanceof ValueLiteralExpression) { + return convertLiteral((ValueLiteralExpression) expression); + } else if (expression instanceof CallExpression) { + return convertCallExpression((CallExpression) expression); + } + return Optional.empty(); + } + + private static Optional<Object> convertCallExpression(CallExpression call) { + if (!BuiltInFunctionDefinitions.CAST.equals(call.getFunctionDefinition())) { + return Optional.empty(); + } + + List<ResolvedExpression> args = 
call.getResolvedChildren(); + if (args.size() != 2) { + return Optional.empty(); + } + + org.apache.flink.table.expressions.Expression left = args.get(0); + org.apache.flink.table.expressions.Expression right = args.get(1); + + if (left instanceof ValueLiteralExpression && right instanceof TypeLiteralExpression) { + ValueLiteralExpression value = (ValueLiteralExpression) left; + TypeLiteralExpression type = (TypeLiteralExpression) right; + + LogicalType logicalType = type.getOutputDataType().getLogicalType(); + + Optional<?> result = value.getValueAs(logicalType.getDefaultConversion()); + if (result.isPresent()) { + return Optional.of(result.get()); + } + + switch (logicalType.getTypeRoot()) { + case DOUBLE: + Optional<String> strValue = value.getValueAs(String.class); + if (strValue.isPresent()) { + return Optional.of(Double.valueOf(strValue.get())); Review Comment: I think that this needs to handle `NumberFormatException` in case someone uses an expression like `cast("fail" as double)`. You could put a try/catch around the switch and return `Optional.empty()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org