zachdisc commented on code in PR #9731: URL: https://github.com/apache/iceberg/pull/9731#discussion_r1501621548
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java: ########## @@ -466,6 +471,163 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2); } + @TestTemplate + public void testRewriteManifestsPartitionedTableWithInvalidSortColumnsThowsException() + throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").bucket("c3", 10).build(); + Map<String, String> options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + SparkActions actions = SparkActions.get(); + + // c2 is not a partition column, cannot use for sorting + List<String> badSortKeys1 = ImmutableList.of("c1", "c2"); + assertThatThrownBy( + () -> + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(badSortKeys1) + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .message() + .contains("Cannot use custom sort order"); + + // c3_bucket is the correct internal partition name to use, c3 is the untransformed colum name, + // sort() expects + // the hidden partition column names + List<String> badSortKeys2 = ImmutableList.of("c1", "c3"); + assertThatThrownBy( + () -> + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(badSortKeys2) + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .message() + .contains("Cannot use custom sort order"); + } + + @TestTemplate + public void testRewriteManifestsPartitionedTableWithCustomSorting() throws IOException { + Random random = new Random(); + + PartitionSpec spec = + PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 3).bucket("c3", 10).build(); + Map<String, String> options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + List<DataFile> dataFiles = Lists.newArrayList(); + for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) { + dataFiles.add( + newDataFile( + table, + TestHelpers.Row.of( + new Object[] { + fileOrdinal, String.valueOf(random.nextInt() * 100), random.nextInt(10) + }))); + } + ManifestFile appendManifest = writeManifest(table, dataFiles); + table.newFastAppend().appendManifest(appendManifest).commit(); + + List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io()); + assertThat(manifests).as("Should have 1 manifests before rewrite").hasSize(1); + + // Capture the c3 partition's lower and upper bounds - used for later test assertions + Integer c3PartitionMin = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).lowerBound()); + Integer c3PartitionMax = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).upperBound()); + + // Set the target manifest size to a small value to force splitting records into multiple files + table + .updateProperties() + .set( + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + String.valueOf(manifests.get(0).length() / 2)) + .commit(); + + SparkActions actions = SparkActions.get(); + + List<String> manifestSortKeys = ImmutableList.of("c3_bucket", "c2_trunc", "c1"); + RewriteManifests.Result result = + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(manifestSortKeys) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + + table.refresh(); + List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io()); + + assertThat(result.rewrittenManifests()).hasSize(1); + assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2); + + assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2); + + // Rewritten manifests are clustered by c3_bucket - each should contain only a subset of the + // lower and upper bounds + // of the partition 'c3'. Review Comment: Ugh. The spotlessApply build actions are great but I miss when they break things like this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org