aokolnychyi commented on code in PR #7520:
URL: https://github.com/apache/iceberg/pull/7520#discussion_r1185282407


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -103,6 +108,70 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, 
tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", 
commitTarget());
+          
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    Map<String, String> summary = currentSnapshot.summary();
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      int addedFiles = 
Integer.parseInt(summary.get(SnapshotSummary.ADDED_FILES_PROP));
+      Assert.assertEquals("Must produce 4 files", 4, addedFiles);

Review Comment:
   Let me add something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to