zachdisc commented on code in PR #9731: URL: https://github.com/apache/iceberg/pull/9731#discussion_r1501623001
########## spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java: ########## @@ -466,6 +474,309 @@ public void testRewriteLargeManifestsPartitionedTable() throws IOException { assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2); } + @TestTemplate + public void testRewriteManifestsPartitionedTableWithInvalidSortColumns() throws IOException { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").bucket("c3", 10).build(); + Map<String, String> options = Maps.newHashMap(); + options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)); + options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled); + Table table = TABLES.create(SCHEMA, spec, options, tableLocation); + + SparkActions actions = SparkActions.get(); + + // c2 is not a partition column, cannot use for sorting + List<String> badSortKeys1 = ImmutableList.of("c1", "c2"); + assertThatThrownBy( + () -> + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(badSortKeys1) + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot use custom sort order to rewrite manifests '[c1, c2]'. All partition columns " + + "must be defined in the current partition spec: 0. Choose from the available " + + "partitionable columns: [c3_bucket, c1]"); + + // c3_bucket is the correct internal partition name to use, c3 is the untransformed column name, + // sort() expects the hidden partition column names + List<String> badSortKeys2 = ImmutableList.of("c1", "c3"); + assertThatThrownBy( + () -> + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(badSortKeys2) + .execute()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Cannot use custom sort order to rewrite manifests '[c1, c3]'. All partition columns " + + "must be defined in the current partition spec: 0. Choose from the available " + + "partitionable columns: [c3_bucket, c1]"); + } + + @TestTemplate + public void testRewriteManifestsPartitionedTableWithCustomSorting() throws IOException { + Random random = new Random(); + + PartitionSpec spec = + PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 3).bucket("c3", 10).build(); + Table table = TABLES.create(SCHEMA, spec, tableLocation); + + List<DataFile> dataFiles = Lists.newArrayList(); + for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) { + dataFiles.add( + newDataFile( + table, + TestHelpers.Row.of( + new Object[] { + fileOrdinal, String.valueOf(random.nextInt() * 100), random.nextInt(10) + }))); + } + ManifestFile appendManifest = writeManifest(table, dataFiles); + table.newFastAppend().appendManifest(appendManifest).commit(); + + List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io()); + assertThat(manifests).as("Should have 1 manifests before rewrite").hasSize(1); + + // Capture the c3 partition's lower and upper bounds - used for later test assertions + Integer c3PartitionMin = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).lowerBound()); + Integer c3PartitionMax = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).upperBound()); + + // Set the target manifest size to a small value to force splitting records into multiple files + table + .updateProperties() + .set( + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + String.valueOf(manifests.get(0).length() / 2)) + .commit(); + + SparkActions actions = SparkActions.get(); + + List<String> manifestSortKeys = ImmutableList.of("c3_bucket", "c2_trunc", "c1"); + RewriteManifests.Result result = + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(manifestSortKeys) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + + table.refresh(); + List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io()); + + assertThat(result.rewrittenManifests()).hasSize(1); + assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2); + + assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2); + + // Rewritten manifests are clustered by c3_bucket - each should contain only a subset of the + // lower and upper bounds + // of the partition 'c3'. + List<Pair<Integer, Integer>> c3Boundaries = + newManifests.stream() + .map(manifest -> manifest.partitions().get(2)) + .sorted( + Comparator.comparing( + ptn -> Conversions.fromByteBuffer(Types.IntegerType.get(), ptn.lowerBound()))) + .map( + p -> + Pair.of( + (Integer) + Conversions.fromByteBuffer(Types.IntegerType.get(), p.lowerBound()), + (Integer) + Conversions.fromByteBuffer(Types.IntegerType.get(), p.upperBound()))) + .collect(Collectors.toList()); + + List<Integer> lowers = c3Boundaries.stream().map(t -> t.first()).collect(Collectors.toList()); + List<Integer> uppers = c3Boundaries.stream().map(t -> t.second()).collect(Collectors.toList()); + + // With custom sorting, this looks like + // - manifest 1 -> [lower bound = 0, upper bound = 4] + // - manifest 2 -> [lower bound = 4, upper bound = 9] + // Without the custom sorting, each manifest tracks the full range of c3 upper/lower bounds. + // AKA they look like + // - manifest 1 -> [lower bound = 0, upper bound = 9] + // - manifest 2 -> [lower bound = 0, upper bound = 9] + // So the upper bound of the partitions tracked by the first file should be LEQ the lower bounds + // of the second. Etc + assertThat(uppers.get(0)) + .as("Upper bound of first manifest partition should be LEQ lower bound of second") + .isLessThanOrEqualTo(lowers.get(1)); + + // Each file should contain less than the full c3 partition span + c3Boundaries.forEach( + boundary -> { + assertThat(boundary.second() - boundary.first()) + .as("Manifest should contain less than the full range of c3 bucket partitions") + .isLessThanOrEqualTo(c3PartitionMax - c3PartitionMin); + }); + + // c3's Bucket(10) partition means our true lower bound = 0 and true upper bound is 9. The first + // manifest should + // include the lower bound of 0, and the last should have the upper bound of 9 + assertThat(lowers.get(0)) + .withFailMessage("Lower bound of first manifest partition should be 0") + .isEqualTo(c3PartitionMin); + assertThat(uppers.get(uppers.size() - 1)) + .withFailMessage("Lower bound of first manifest partition should be 0") + .isEqualTo(c3PartitionMax); + } + + @TestTemplate + public void testRewriteManifestsPartitionedTableWithCustomSortFunction() throws IOException { + Random random = new Random(); + + PartitionSpec spec = + PartitionSpec.builderFor(SCHEMA).identity("c1").truncate("c2", 3).bucket("c3", 10).build(); + Table table = TABLES.create(SCHEMA, spec, tableLocation); + + List<DataFile> dataFiles = Lists.newArrayList(); + for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) { + dataFiles.add( + newDataFile( + table, + TestHelpers.Row.of( + new Object[] { + fileOrdinal, String.valueOf(random.nextInt() * 100), random.nextInt(10) + }))); + } + ManifestFile appendManifest = writeManifest(table, dataFiles); + table.newFastAppend().appendManifest(appendManifest).commit(); + + List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io()); + assertThat(manifests).as("Should have 1 manifests before rewrite").hasSize(1); + + // Capture the c3 partition's lower and upper bounds - used for later test assertions + Integer c3PartitionMin = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).lowerBound()); + Integer c3PartitionMax = + Conversions.fromByteBuffer( + Types.IntegerType.get(), manifests.get(0).partitions().get(2).upperBound()); + + // Set the target manifest size to a small value to force splitting records into multiple files + table + .updateProperties() + .set( + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + String.valueOf(manifests.get(0).length() / 3)) + .commit(); + + SparkActions actions = SparkActions.get(); + + // This is the main point of this test! + // Let's say I want to sort manifests into buckets 0-4 and 5 - 9 + // I know my Table and partition Spec, so can do that programmatically + Function<DataFile, String> test = + (Function<DataFile, String> & Serializable) + (dataFile) -> { + StructLike partition = dataFile.partition(); + // Find the ordinal index for the c3 partition column for this data file + int c3Index = + IntStream.range(0, spec.fields().size()) + .filter(i -> spec.fields().get(i).name().contains("c3")) + .findFirst() + .getAsInt(); + Object c3BucketValue = partition.get(c3Index, Object.class); + + // Return one string for the lower values, one for the upper. RewriteManifests + // will cluster datafiles together in manifests according to this value. + return (Integer) c3BucketValue < 5 ? "cluster=LT_5" : "cluster=GTE_5"; + }; + + RewriteManifests.Result result = + actions + .rewriteManifests(table) + .rewriteIf(manifest -> true) + .sort(test) + .option(RewriteManifestsSparkAction.USE_CACHING, useCaching) + .execute(); + + table.refresh(); + List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io()); + + assertThat(result.rewrittenManifests()).hasSize(1); + assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2); + + assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2); + + // Rewritten manifests are clustered by c3_bucket - each should contain only a subset of the + // lower and upper bounds of the partition 'c3'. + List<Pair<Integer, Integer>> c3Boundaries = + newManifests.stream() + .map(manifest -> manifest.partitions().get(2)) + .sorted( + Comparator.comparing( + ptn -> Conversions.fromByteBuffer(Types.IntegerType.get(), ptn.lowerBound()))) + .map( + p -> + Pair.of( + (Integer) + Conversions.fromByteBuffer(Types.IntegerType.get(), p.lowerBound()), + (Integer) + Conversions.fromByteBuffer(Types.IntegerType.get(), p.upperBound()))) + .collect(Collectors.toList()); + + List<Integer> lowers = c3Boundaries.stream().map(t -> t.first()).collect(Collectors.toList()); + List<Integer> uppers = c3Boundaries.stream().map(t -> t.second()).collect(Collectors.toList()); + + // The custom function sorts datafiles by having c3 bucket partitions in the range 0-4 and 5-9. + // Internally, that looks like + // +--------------------+---------------------+ + // |partition |__clustering_column__| + // +--------------------+---------------------+ + // |{0, -531806488, 0} |cluster=LT_5 | + // |{1, 385955472, 7} |cluster=GTE_5 | + // |{2, 604077840, 6} |cluster=GTE_5 | + // |{3, 1875302972, 4} |cluster=LT_5 | + // |{4, -1772544904, 0} |cluster=LT_5 | + // |{5, 172551248, 7} |cluster=GTE_5 | + // ... + // After rewriting, this looks like + // - manifest 1 -> [lower bound = 0, upper bound = 4] + // - manifest 2 -> [lower bound = 5, upper bound = 9] + // Without the custom sorting, each manifest tracks the full range of c3 upper/lower bounds. + // AKA they look like + // - manifest 1 -> [lower bound = 0, upper bound = 9] + // - manifest 2 -> [lower bound = 0, upper bound = 9] + // So the upper bound of the partitions tracked by the first file should be LEQ the lower bounds + // of the second. Etc + assertThat(uppers.get(0)) + .withFailMessage( + "Upper bound of first manifest partition should be LESS THAN the lower bound of second") + .isLessThan(lowers.get(1)); + + // Each file should contain less than the full c3 partition span + c3Boundaries.forEach( + boundary -> { + assertThat(boundary.second() - boundary.first()) + .withFailMessage( + "Manifest should contain less than the full range of c3 bucket partitions") + .isLessThan(c3PartitionMax - c3PartitionMin); + }); + + // c3's Bucket(10) partition means our true lower bound = 0 and true upper bound is 9. The first + // manifest should include the lower bound of 0, and the last should have the upper bound of 9 + assertThat(lowers.get(0)) + .withFailMessage("Lower bound of first manifest c3 bucket partition should be 0") + .isEqualTo(0); + assertThat(uppers.get(0)) + .withFailMessage("Upper bound of first manifest c3 bucket partition should be 4") + .isEqualTo(4); + assertThat(lowers.get(1)) + .withFailMessage("Lower bound of second manifest c3 bucket partition should be 5") + .isEqualTo(5); + assertThat(uppers.get(1)) + .withFailMessage("Upper bound of second manifest c3 bucket partition should be 9") Review Comment: Ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org