szehon-ho commented on code in PR #6760:
URL: https://github.com/apache/iceberg/pull/6760#discussion_r1185695239
##########
spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala:
##########
@@ -37,13 +39,17 @@ object SparkExpressionConverter {
}
@throws[AnalysisException]
- def collectResolvedSparkExpression(session: SparkSession, tableName: String,
where: String): Expression = {
+ def collectResolvedIcebergExpression(session: SparkSession,
+ tableName: String,
+ where: String):
org.apache.iceberg.expressions.Expression = {
val tableAttrs = session.table(tableName).queryExecution.analyzed.output
val unresolvedExpression =
session.sessionState.sqlParser.parseExpression(where)
val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
val optimizedLogicalPlan =
session.sessionState.executePlan(filter).optimizedPlan
optimizedLogicalPlan.collectFirst {
- case filter: Filter => filter.condition
+ case filter: Filter => convertToIcebergExpression(filter.condition)
+ case dummyRelation: DummyRelation => Expressions.alwaysTrue()
Review Comment:
For cleaner code, can we return Spark's Expression.TRUE and Expression.FALSE
here, and apply convertToIcebergExpression outside of the collectFirst?
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -245,6 +245,51 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithDeterministicFalseFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select only 0 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 0 data files and add 0 data files",
+ row(0, 0),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
Review Comment:
Seems no need for this comment, as we don't assert for bytes.
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -245,6 +245,51 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithDeterministicFalseFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select only 0 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 0 data files and add 0 data files",
+ row(0, 0),
+ Arrays.copyOf(output.get(0), 2));
Review Comment:
Do we get output = (0, 0, 0)? Can we just assert all 3 values instead of the
first two in this case?
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -245,6 +245,51 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
Review Comment:
Optional: I think the test is more understandable if we just put row(10, 1,
Long.valueOf(snapshotSummary().get(...))). I do realize it's that way in other
tests.
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java:
##########
@@ -245,6 +245,51 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithDeterministicTrueFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select all 10 files for compaction
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
+ catalogName, tableIdent);
+ assertEquals(
+ "Action should rewrite 10 data files and add 1 data files",
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
+ }
+
+ @Test
+ public void testRewriteDataFilesWithDeterministicFalseFilter() {
+ createTable();
+ // create 10 files under non-partitioned table
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+ // select only 0 files for compaction
Review Comment:
Minor: "select no files for compaction" reads better than "select only 0 files".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]