rdblue commented on code in PR #7886:
URL: https://github.com/apache/iceberg/pull/7886#discussion_r1264216930
##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkScan.java:
##########
@@ -76,4 +109,706 @@ public void testEstimatedRowCount() throws NoSuchTableException {
Assert.assertEquals(10000L, stats.numRows().getAsLong());
}
+
+ @Test
+ public void testUnpartitionedYears() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+   YearsFunction.TimestampToYearsFunction function = new YearsFunction.TimestampToYearsFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           "=", expressions(udf, LiteralValue.apply(years("2017-11-22"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+
+ // NOT Equal
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+ }
+
+ @Test
+ public void testPartitionedYears() throws Exception {
+ createPartitionedTable(spark, tableName, "years(ts)");
+
+ SparkScanBuilder builder = scanBuilder();
+
+   YearsFunction.TimestampToYearsFunction function = new YearsFunction.TimestampToYearsFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           "=", expressions(udf, LiteralValue.apply(years("2017-11-22"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+
+ // NOT Equal
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+ }
+
+ @Test
+ public void testUnpartitionedMonths() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+ MonthsFunction.TimestampToMonthsFunction function =
+ new MonthsFunction.TimestampToMonthsFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           ">", expressions(udf, LiteralValue.apply(months("2017-11-22"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+
+ // NOT GT
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+ }
+
+ @Test
+ public void testPartitionedMonths() throws Exception {
+ createPartitionedTable(spark, tableName, "months(ts)");
+
+ SparkScanBuilder builder = scanBuilder();
+
+ MonthsFunction.TimestampToMonthsFunction function =
+ new MonthsFunction.TimestampToMonthsFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           ">", expressions(udf, LiteralValue.apply(months("2017-11-22"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+
+ // NOT GT
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+ }
+
+ @Test
+ public void testUnpartitionedDays() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+   DaysFunction.TimestampToDaysFunction function = new DaysFunction.TimestampToDaysFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           "<", expressions(udf, LiteralValue.apply(days("2018-11-20"), DataTypes.DateType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+
+ // NOT LT
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+ }
+
+ @Test
+ public void testPartitionedDays() throws Exception {
+ createPartitionedTable(spark, tableName, "days(ts)");
+
+ SparkScanBuilder builder = scanBuilder();
+
+   DaysFunction.TimestampToDaysFunction function = new DaysFunction.TimestampToDaysFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+           "<", expressions(udf, LiteralValue.apply(days("2018-11-20"), DataTypes.DateType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+
+ // NOT LT
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(5);
+ }
+
+ @Test
+ public void testUnpartitionedHours() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+   HoursFunction.TimestampToHoursFunction function = new HoursFunction.TimestampToHoursFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+ ">=",
+ expressions(
+ udf,
+ LiteralValue.apply(
+                 hours("2017-11-22T06:02:09.243857+00:00"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+
+ // NOT GTEQ
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+ }
+
+ @Test
+ public void testPartitionedHours() throws Exception {
+ createPartitionedTable(spark, tableName, "hours(ts)");
+
+ SparkScanBuilder builder = scanBuilder();
+
+   HoursFunction.TimestampToHoursFunction function = new HoursFunction.TimestampToHoursFunction();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+           function.name(), function.canonicalName(), expressions(FieldReference.apply("ts")));
+ Predicate predicate =
+ new Predicate(
+ ">=",
+ expressions(
+ udf,
+ LiteralValue.apply(
+                 hours("2017-11-22T06:02:09.243857+00:00"), DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(8);
+
+ // NOT GTEQ
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(2);
+ }
+
+ @Test
+ public void testUnpartitionedBucketLong() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+   BucketFunction.BucketLong function = new BucketFunction.BucketLong(DataTypes.LongType);
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+ function.name(),
+ function.canonicalName(),
+           expressions(LiteralValue.apply(5, DataTypes.IntegerType), FieldReference.apply("id")));
+ Predicate predicate =
+       new Predicate(">=", expressions(udf, LiteralValue.apply(2, DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+
+ // NOT GTEQ
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(10);
+ }
+
+ @Test
+ public void testPartitionedBucketLong() throws Exception {
+ createPartitionedTable(spark, tableName, "bucket(5, id)");
+
+ SparkScanBuilder builder = scanBuilder();
+
+   BucketFunction.BucketLong function = new BucketFunction.BucketLong(DataTypes.LongType);
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+ function.name(),
+ function.canonicalName(),
+           expressions(LiteralValue.apply(5, DataTypes.IntegerType), FieldReference.apply("id")));
+ Predicate predicate =
+       new Predicate(">=", expressions(udf, LiteralValue.apply(2, DataTypes.IntegerType)));
+ pushFilters(builder, predicate);
+ Batch scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(6);
+
+ // NOT GTEQ
+ builder = scanBuilder();
+
+ predicate = new Not(predicate);
+ pushFilters(builder, predicate);
+ scan = builder.build().toBatch();
+
+ Assertions.assertThat(scan.planInputPartitions().length).isEqualTo(4);
+ }
+
+ @Test
+ public void testUnpartitionedBucketString() throws Exception {
+ createUnpartitionedTable(spark, tableName);
+
+ SparkScanBuilder builder = scanBuilder();
+
+ BucketFunction.BucketString function = new BucketFunction.BucketString();
+ UserDefinedScalarFunc udf =
+ new UserDefinedScalarFunc(
+ function.name(),
+ function.canonicalName(),
+ expressions(
+             LiteralValue.apply(5, DataTypes.IntegerType), FieldReference.apply("data")));
Review Comment:
Some helpers would probably make this easier to read and construct, like `int_lit`, `ref`, or `to_udf`:
```java
UserDefinedScalarFunc udf = to_udf(function, expressions(int_lit(5), ref("data")));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]