nastra commented on code in PR #9731:
URL: https://github.com/apache/iceberg/pull/9731#discussion_r1500422229


##########
spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java:
##########
@@ -466,6 +471,163 @@ public void testRewriteLargeManifestsPartitionedTable() 
throws IOException {
     assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
   }
 
+  @TestTemplate
+  public void 
testRewriteManifestsPartitionedTableWithInvalidSortColumnsThowsException()
+      throws IOException {
+    PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c1").bucket("c3", 10).build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    SparkActions actions = SparkActions.get();
+
+    //  c2 is not a partition column, cannot use for sorting
+    List<String> badSortKeys1 = ImmutableList.of("c1", "c2");
+    assertThatThrownBy(
+            () ->
+                actions
+                    .rewriteManifests(table)
+                    .rewriteIf(manifest -> true)
+                    .sort(badSortKeys1)
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .message()
+        .contains("Cannot use custom sort order");
+
+    // c3_bucket is the correct internal partition name to use, c3 is the 
untransformed colum name,
+    // sort() expects
+    // the hidden partition column names
+    List<String> badSortKeys2 = ImmutableList.of("c1", "c3");
+    assertThatThrownBy(
+            () ->
+                actions
+                    .rewriteManifests(table)
+                    .rewriteIf(manifest -> true)
+                    .sort(badSortKeys2)
+                    .execute())
+        .isInstanceOf(IllegalArgumentException.class)
+        .message()
+        .contains("Cannot use custom sort order");
+  }
+
+  @TestTemplate
+  public void testRewriteManifestsPartitionedTableWithCustomSorting() throws 
IOException {
+    Random random = new Random();
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 
3).bucket("c3", 10).build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      dataFiles.add(
+          newDataFile(
+              table,
+              TestHelpers.Row.of(
+                  new Object[] {
+                    fileOrdinal, String.valueOf(random.nextInt() * 100), 
random.nextInt(10)
+                  })));
+    }
+    ManifestFile appendManifest = writeManifest(table, dataFiles);
+    table.newFastAppend().appendManifest(appendManifest).commit();
+
+    List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
+    assertThat(manifests).as("Should have 1 manifests before 
rewrite").hasSize(1);
+
+    // Capture the c3 partition's lower and upper bounds - used for later test 
assertions
+    Integer c3PartitionMin =
+        Conversions.fromByteBuffer(
+            Types.IntegerType.get(), 
manifests.get(0).partitions().get(2).lowerBound());
+    Integer c3PartitionMax =
+        Conversions.fromByteBuffer(
+            Types.IntegerType.get(), 
manifests.get(0).partitions().get(2).upperBound());
+
+    // Set the target manifest size to a small value to force splitting 
records into multiple files
+    table
+        .updateProperties()
+        .set(
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            String.valueOf(manifests.get(0).length() / 2))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    List<String> manifestSortKeys = ImmutableList.of("c3_bucket", "c2_trunc", 
"c1");
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .sort(manifestSortKeys)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    table.refresh();
+    List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
+
+    assertThat(result.rewrittenManifests()).hasSize(1);
+    assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
+
+    assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
+
+    // Rewritten manifests are clustered by c3_bucket - each should contain 
only a subset of the
+    // lower and upper bounds
+    // of the partition 'c3'.
+    List<Pair<Integer, Integer>> c3Boundaries =
+        newManifests.stream()
+            .map(manifest -> manifest.partitions().get(2))
+            .sorted(
+                Comparator.comparing(
+                    ptn -> Conversions.fromByteBuffer(Types.IntegerType.get(), 
ptn.lowerBound())))
+            .map(
+                p ->
+                    Pair.of(
+                        (Integer)
+                            
Conversions.fromByteBuffer(Types.IntegerType.get(), p.lowerBound()),
+                        (Integer)
+                            
Conversions.fromByteBuffer(Types.IntegerType.get(), p.upperBound())))
+            .collect(Collectors.toList());
+
+    List<Integer> lowers = c3Boundaries.stream().map(t -> 
t.first()).collect(Collectors.toList());
+    List<Integer> uppers = c3Boundaries.stream().map(t -> 
t.second()).collect(Collectors.toList());
+
+    // With custom sorting, this looks like
+    // - manifest 1 -> [lower bound = 0, upper bound = 4]
+    // - manifest 2 -> [lower bound = 4, upper bound = 9]
+    // Without the custom sorting, each manifest tracks the full range of c3 
upper/lower bounds.
+    // AKA they look like
+    // - manifest 1 -> [lower bound = 0, upper bound = 9]
+    // - manifest 2 -> [lower bound = 0, upper bound = 9]
+    // So the upper bound of the partitions tracked by the first file should 
be LEQ the lower bounds
+    // of the second. Etc
+    assertThat(uppers.get(0))
+        .withFailMessage(
+            "Upper bound of first manifest partition should be LEQ lower bound 
of second")
+        .isLessThanOrEqualTo(lowers.get(1));
+
+    // Each file should contain less than the full c3 partition span
+    c3Boundaries.forEach(
+        boundary -> {
+          assertThat(boundary.second() - boundary.first())
+              .withFailMessage(

Review Comment:
   same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to