wypoon commented on code in PR #9192:
URL: https://github.com/apache/iceberg/pull/9192#discussion_r1431856870
##########
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala:
##########
@@ -56,7 +69,6 @@ object ReplaceStaticInvoke extends Rule[LogicalPlan] {
          filter.copy(condition = newCondition)
        }
    }
-

Review Comment:
   Unnecessary whitespace change.



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java:
##########
@@ -213,17 +216,75 @@ private void testBucketLongFunction(boolean partitioned) {
     String query =
         String.format(
             "SELECT * FROM %s WHERE system.bucket(5, id) <= %s ORDER BY id", tableName, target);
+    checkQueryExecution(query, partitioned, lessThanOrEqual(bucket("id", 5), target));
+  }
+
+  @Test
+  public void testBucketLongFunctionInClauseOnUnpartitionedTable() {
+    createUnpartitionedTable(spark, tableName);
+    testBucketLongFunctionInClause(false);
+  }
+
+  @Test
+  public void testBucketLongFunctionInClauseOnPartitionedTable() {
+    createPartitionedTable(spark, tableName, "bucket(5, id)");
+    testBucketLongFunctionInClause(true);
+  }
+
+  private void testBucketLongFunctionInClause(boolean partitioned) {
+    List<Integer> inValues = IntStream.range(0, 3).boxed().collect(Collectors.toList());
+    String inValuesAsSql =
+        inValues.stream().map(x -> Integer.toString(x)).collect(Collectors.joining(", "));
+    String query =
+        String.format(
+            "SELECT * FROM %s WHERE system.bucket(5, id) IN (%s) ORDER BY id",
+            tableName, inValuesAsSql);
+    checkQueryExecution(query, partitioned, in(bucket("id", 5), inValues.toArray()));
+  }
+
+  private void checkQueryExecution(
+      String query, boolean partitioned, org.apache.iceberg.expressions.Expression expression) {
     Dataset<Row> df = spark.sql(query);
     LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
     checkExpressions(optimizedPlan, partitioned, "bucket");
-    checkPushedFilters(optimizedPlan, lessThanOrEqual(bucket("id", 5), target));
+    checkPushedFilters(optimizedPlan, expression);
     List<Object[]> actual = rowsToJava(df.collectAsList());
     Assertions.assertThat(actual.size()).isEqualTo(5);
   }
+  @Test
+  public void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiteralsOnPartitionedTable() {
+    createPartitionedTable(spark, tableName, "bucket(5, id)");
+    testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals();
+  }
+
+  @Test
+  public void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiteralsOnUnpartitionedTable() {
+    createUnpartitionedTable(spark, tableName);
+    testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals();
+  }
+
+  private void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals() {
+    List<Integer> range = IntStream.range(0, 3).boxed().collect(Collectors.toList());

Review Comment:
   Not used.
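For context, the comment above refers to the `range` local in `testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals`, which is assigned but never read. The sketch below is purely hypothetical, not code from the PR: it only illustrates the shape of the cleanup being requested, and the query it builds is a stand-in because the rest of the test body is not visible in this hunk.

// Hypothetical illustration only (not code from the PR): `range` is assigned but never
// read, which is exactly what the review comment flags. The fix is simply deleting that
// declaration (and any imports that become unused as a result).
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class UnusedLocalSketch {
  static String buildQuery(String tableName) {
    // unused local: this is the line the reviewer is asking to remove
    List<Integer> range = IntStream.range(0, 3).boxed().collect(Collectors.toList());
    // placeholder query, not the PR's actual test query
    return String.format(
        "SELECT * FROM %s WHERE system.bucket(5, id) <= id ORDER BY id", tableName);
  }

  public static void main(String[] args) {
    System.out.println(buildQuery("db.table"));
  }
}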
##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSystemFunctionPushDownDQL.java:
##########
@@ -213,17 +216,75 @@ private void testBucketLongFunction(boolean partitioned) {
     String query =
         String.format(
             "SELECT * FROM %s WHERE system.bucket(5, id) <= %s ORDER BY id", tableName, target);
+    checkQueryExecution(query, partitioned, lessThanOrEqual(bucket("id", 5), target));
+  }
+
+  @Test
+  public void testBucketLongFunctionInClauseOnUnpartitionedTable() {
+    createUnpartitionedTable(spark, tableName);
+    testBucketLongFunctionInClause(false);
+  }
+
+  @Test
+  public void testBucketLongFunctionInClauseOnPartitionedTable() {
+    createPartitionedTable(spark, tableName, "bucket(5, id)");
+    testBucketLongFunctionInClause(true);
+  }
+
+  private void testBucketLongFunctionInClause(boolean partitioned) {
+    List<Integer> inValues = IntStream.range(0, 3).boxed().collect(Collectors.toList());
+    String inValuesAsSql =
+        inValues.stream().map(x -> Integer.toString(x)).collect(Collectors.joining(", "));
+    String query =
+        String.format(
+            "SELECT * FROM %s WHERE system.bucket(5, id) IN (%s) ORDER BY id",
+            tableName, inValuesAsSql);
+    checkQueryExecution(query, partitioned, in(bucket("id", 5), inValues.toArray()));
+  }
+
+  private void checkQueryExecution(
+      String query, boolean partitioned, org.apache.iceberg.expressions.Expression expression) {
     Dataset<Row> df = spark.sql(query);
     LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
     checkExpressions(optimizedPlan, partitioned, "bucket");
-    checkPushedFilters(optimizedPlan, lessThanOrEqual(bucket("id", 5), target));
+    checkPushedFilters(optimizedPlan, expression);
     List<Object[]> actual = rowsToJava(df.collectAsList());
     Assertions.assertThat(actual.size()).isEqualTo(5);
   }
+  @Test
+  public void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiteralsOnPartitionedTable() {
+    createPartitionedTable(spark, tableName, "bucket(5, id)");
+    testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals();
+  }
+
+  @Test
+  public void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiteralsOnUnpartitionedTable() {
+    createUnpartitionedTable(spark, tableName);
+    testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals();
+  }
+
+  private void testBucketLongFunctionIsNotReplacedWhenArgumentsAreNotLiterals() {
+    List<Integer> range = IntStream.range(0, 3).boxed().collect(Collectors.toList());

Review Comment:
   `range` is not used. (I pointed this out before.)



##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceStaticInvoke.scala:
##########
@@ -56,7 +69,6 @@ object ReplaceStaticInvoke extends Rule[LogicalPlan] {
          filter.copy(condition = newCondition)
        }
    }
-

Review Comment:
   Unnecessary (and undesirable) whitespace change.
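Both test hunks funnel through the refactored `checkQueryExecution` helper, which compares the scan's pushed filters against an `org.apache.iceberg.expressions.Expression`. The following is a minimal sketch of how such expected expressions can be built with the factory methods in `org.apache.iceberg.expressions.Expressions`, which appear to be the source of the test's statically imported `bucket`, `in`, and `lessThanOrEqual` calls. The literal values (5 buckets, threshold 2, IN-list 0..2) are illustrative only, not the test's actual data.

// Minimal sketch, assuming the Expressions factory methods that the test's
// bucket(...), in(...), and lessThanOrEqual(...) calls appear to rely on.
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ExpectedPushedFilterSketch {
  public static void main(String[] args) {
    // Expected pushdown of: WHERE system.bucket(5, id) <= 2
    Expression lte = Expressions.lessThanOrEqual(Expressions.bucket("id", 5), 2);

    // Expected pushdown of: WHERE system.bucket(5, id) IN (0, 1, 2)
    Expression in = Expressions.in(Expressions.bucket("id", 5), 0, 1, 2);

    System.out.println(lte);
    System.out.println(in);
  }
}

Building the expected expression up front and passing it into a shared helper, as the PR's `checkQueryExecution(query, partitioned, expression)` does, keeps each new predicate-shape test (comparison, IN clause, and so on) down to a query string plus one expected expression.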