RussellSpitzer commented on code in PR #12893:
URL: https://github.com/apache/iceberg/pull/12893#discussion_r2258304533
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java:
##########
@@ -349,6 +373,141 @@ private void checkMerge(RowLevelOperationMode mode)
throws Exception {
sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
}
+ @TestTemplate
+ public void testCopyOnWriteDeleteWithDeleteFileCacheCacheDisabled() throws
Exception {
+ checkDeleteWithDeleteFilesCacheDisabled(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadDeleteWithDeleteFileCacheDisabled() throws
Exception {
+ checkDeleteWithDeleteFilesCacheDisabled(MERGE_ON_READ);
+ }
+
+ private void checkDeleteWithDeleteFilesCacheDisabled(RowLevelOperationMode
mode)
+ throws Exception {
+ withSQLConf(
+ ImmutableMap.of(
+ SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
+ SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"),
+ () -> {
+ try {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+ sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+ // When cache is disabled, delete files should be opened multiple times
+ // The cached CoW test has a maximum of 3 scans, so we expect more than that when
+ // disabled.
+ // The cached MoR test has a maximum of 1 scan, so we expect more than that when
+ // disabled.
+ int expectedMinStreamCount = mode == COPY_ON_WRITE ? 4 : 2;
+ assertThat(deleteFiles)
+ .allMatch(deleteFile -> streamCount(deleteFile) >=
expectedMinStreamCount);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteUpdateWithDeleteFilesCacheDisabled() throws
Exception {
+ checkUpdateWithDeleteFilesCacheDisabled(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadUpdateWithDeleteFilesCacheDisabled() throws
Exception {
+ checkUpdateWithDeleteFilesCacheDisabled(MERGE_ON_READ);
+ }
+
+ private void checkUpdateWithDeleteFilesCacheDisabled(RowLevelOperationMode
mode)
+ throws Exception {
+ withSQLConf(
+ ImmutableMap.of(
+ SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
+ SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"),
+ () -> {
+ try {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.UPDATE_MODE, mode);
+
+ Dataset<Integer> updateDS =
spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql(
+ "UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)",
+ targetTableName, UPDATES_VIEW_NAME);
+
+ // When cache is disabled, delete files should be opened multiple times
+ // Both CoW and MoR should open delete files at least 2 times without caching
+ int expectedMinStreamCount = 2;
+ assertThat(deleteFiles)
+ .allMatch(deleteFile -> streamCount(deleteFile) >=
expectedMinStreamCount);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteMergeWithDeleteFilesCacheDisabled() throws
Exception {
+ checkMergeWithDeleteFilesCacheDisabled(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadMergeWithDeleteFilesCacheDisabled() throws
Exception {
+ checkMergeWithDeleteFilesCacheDisabled(MERGE_ON_READ);
+ }
+
+ private void checkMergeWithDeleteFilesCacheDisabled(RowLevelOperationMode
mode) throws Exception {
+ withSQLConf(
+ ImmutableMap.of(
+ SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
+ SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"),
+ () -> {
+ try {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.MERGE_MODE, mode);
+
+ Dataset<Integer> updateDS =
spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql(
+ "MERGE INTO %s t USING %s s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET id = 100 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (-1, 'unknown')",
+ targetTableName, UPDATES_VIEW_NAME);
+
+ // When the cache is disabled, delete files are opened more often because each Spark
+ // task reads them.
+ // The cached CoW MERGE test allows up to 3 scans, so we require at least 4 to confirm
+ // the cache is disabled.
+ // For MoR MERGE, the cached test allows 1 scan, so we require at least 2 to confirm the
+ // cache is disabled.
+ int expectedMinStreamCount = mode == COPY_ON_WRITE ? 4 : 2;
+ assertThat(deleteFiles)
+ .allMatch(deleteFile -> streamCount(deleteFile) >=
expectedMinStreamCount);
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(100, "hr"), row(100, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ } catch (Exception e) {
Review Comment:
Here too
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]