stevenzwu commented on code in PR #7161: URL: https://github.com/apache/iceberg/pull/7161#discussion_r1186975564
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); Review Comment: Nit: error msg could be `Invalid bucket ID: %s. Must be non-negative.`. note that `Preconditions` supports arg format. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionKeySelector.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
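On the error-message nit for `BucketPartitioner#partition` above: a minimal sketch of the `Preconditions` message-template form the comment refers to. The class and method names are illustrative only, and the message wording follows the reviewer's suggestions (the second check mirrors the wording proposed in a later comment on the same method).

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class BucketIdChecks {
  // Message templates: Preconditions substitutes each %s with the matching argument,
  // so no manual string concatenation is needed at the call site.
  static void validate(int bucketId, int maxNumBuckets) {
    Preconditions.checkArgument(
        bucketId >= 0, "Invalid bucket ID: %s. Must be non-negative.", bucketId);
    Preconditions.checkArgument(
        bucketId < maxNumBuckets,
        "Invalid bucket ID: %s. Must be less than bucket limit: %s",
        bucketId,
        maxNumBuckets);
  }
}
```

A side benefit of the template form is that the message string is only built when the check actually fails.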
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.stream.IntStream; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.RowDataWrapper; + +/** + * A {@link KeySelector} that extracts the bucketId from a data row's bucket partition as the key. + * To be used with the {@link BucketPartitioner}. + */ +class BucketPartitionKeySelector implements KeySelector<RowData, Integer> { + + private final Schema schema; + private final PartitionKey partitionKey; + private final RowType flinkSchema; + private final int bucketFieldPosition; + + private transient RowDataWrapper rowDataWrapper; + + BucketPartitionKeySelector(PartitionSpec partitionSpec, Schema schema, RowType flinkSchema) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + int bucketFieldId = bucketFieldInfo.f0; + this.schema = schema; + this.partitionKey = new PartitionKey(partitionSpec, schema); + this.flinkSchema = flinkSchema; + this.bucketFieldPosition = + IntStream.range(0, partitionSpec.fields().size()) + .filter(i -> partitionSpec.fields().get(i).fieldId() == bucketFieldId) + .toArray()[0]; + } + + /** Review Comment: nit: this comment is probably not necessary. also tries to avoid use words like `we` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtils { + + enum TableSchemaType { + ONE_BUCKET, + IDENTITY_AND_BUCKET, + TWO_BUCKETS; + } + + private TestBucketPartitionerUtils() {} + + static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) { + PartitionSpec partitionSpec = null; + + switch (tableSchemaType) { + case ONE_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + break; + case IDENTITY_AND_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + break; + case TWO_BUCKETS: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + break; + } + + Preconditions.checkNotNull( + partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType); + return partitionSpec; + } + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { + List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket); + // For some of our tests, this order of the generated rows matters + for (int i = 0; i < numRowsPerBucket; i++) { + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + String value = generateValueForBucketId(bucketId, numBuckets); + rows.add(Row.of(1, value)); + } + } + return rows; + } + + /** + * Utility method to generate a UUID string that will "hash" to a desired bucketId + * + * @param bucketId the desired bucketId + * @return the string data that "hashes" to the desired bucketId + */ + private static String generateValueForBucketId(int bucketId, int numBuckets) { + String value = ""; + while (true) { + String uuid = UUID.randomUUID().toString(); + if (computeBucketId(numBuckets, uuid) == bucketId) { + value = uuid; + break; + } + } + return value; + } + + /** + * Utility that performs the same hashing/bucketing mechanism used by Bucket.java Review Comment: looking at Bucket.java. we also need to add test coverage for null bucket id. ``` public Integer apply(T value) { if (value == null) { return null; } return (hash(value) & Integer.MAX_VALUE) % numBuckets; } ``` ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
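On the null-bucket-id coverage requested above: a hypothetical JUnit 5 test shape for that case. How the partitioner should handle a null bucket ID (and therefore what exactly to assert) is an open design question in this PR, so the exception check below is only an assumption used to illustrate the missing coverage.

```java
import org.apache.iceberg.PartitionSpec;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class TestBucketPartitionerNullBucketId {
  @Test
  void testNullBucketId() {
    PartitionSpec spec =
        TestBucketPartitionerUtils.getPartitionSpec(
            TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, 4);
    BucketPartitioner partitioner = new BucketPartitioner(spec);

    // With the current code a null Integer would fail on unboxing; whether the partitioner
    // should instead reject nulls with a dedicated message is the open question here.
    Assertions.assertThatThrownBy(() -> partitioner.partition(null, 4))
        .isInstanceOf(RuntimeException.class);
  }
}
```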
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); + Preconditions.checkArgument( + bucketId < maxNumBuckets, + BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX Review Comment: nite: error msg could be `Invalid bucket ID: %s. Must be less than bucket limit: %s` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(expectedIdx); + if (++bucketId == NUM_BUCKETS) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismEqualLessThanBuckets( Review Comment: similarly, test dividable and non-dividable ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
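On the request above to cover both the divisible and non-divisible cases for `testPartitioningParallelismEqualLessThanBuckets`: a sketch of one way to parameterize it. The parallelism values are illustrative, chosen so that one divides `NUM_BUCKETS = 60` evenly and the other does not.

```java
import org.apache.iceberg.PartitionSpec;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TestBucketPartitionerParallelismSketch {
  static final int NUM_BUCKETS = 60;

  // 30 divides 60 evenly, 27 does not; in both cases the partitioner should map
  // bucketId -> bucketId % numPartitions because numPartitions <= maxNumBuckets.
  @ParameterizedTest
  @ValueSource(ints = {30, 27})
  void testParallelismEqualLessThanBuckets(int numPartitions) {
    PartitionSpec partitionSpec =
        TestBucketPartitionerUtils.getPartitionSpec(
            TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS);
    BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec);

    for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) {
      Assertions.assertThat(bucketPartitioner.partition(bucketId, numPartitions))
          .isEqualTo(bucketId % numPartitions);
    }
  }
}
```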
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(expectedIdx); + if (++bucketId == NUM_BUCKETS) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismEqualLessThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 30; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions); + } + } + + @ParameterizedTest Review Comment: parameterized test seems unnecessary here ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
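On the comment above that the parameterized test seems unnecessary: the body of that test is not visible in this hunk, but purely as an illustration, a plain `@Test` covering the two out-of-range preconditions might look like the sketch below (assuming that is what the test exercises, and that the current message prefix is kept).

```java
import org.apache.iceberg.PartitionSpec;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class TestBucketPartitionerOutOfRangeSketch {
  static final int NUM_BUCKETS = 60;

  @Test
  void testBucketIdOutOfRange() {
    PartitionSpec spec =
        TestBucketPartitionerUtils.getPartitionSpec(
            TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS);
    BucketPartitioner partitioner = new BucketPartitioner(spec);

    // A negative bucket id and a bucket id equal to maxNumBuckets both violate the checks.
    Assertions.assertThatThrownBy(() -> partitioner.partition(-1, 10))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageStartingWith(BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX);
    Assertions.assertThatThrownBy(() -> partitioner.partition(NUM_BUCKETS, 10))
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageStartingWith(BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX);
  }
}
```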
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; +import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + table = getTable(tableSchemaType); + env = + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + 
.setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private Table getTable(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets); + + return catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + } + + private List<RowData> convertToRowData(List<Row> rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + private BoundedTestSource<Row> createBoundedSource(List<Row> rows) { + return new BoundedTestSource<>(rows.toArray(new Row[0])); + } + + private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException { + int totalRecordCount = 0; + Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>> + Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles> + Map<Integer, Long> recordsPerFile = new TreeMap<>(); // <WriterId, NumRecords> + + try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { + for (FileScanTask scanTask : fileScanTasks) { + long recordCountInFile = scanTask.file().recordCount(); + + String[] splitFilePath = scanTask.file().path().toString().split("/"); + String filename = splitFilePath[splitFilePath.length - 1]; + int writerId = Integer.parseInt(filename.split("-")[0]); + + totalRecordCount += recordCountInFile; + int bucketId = + scanTask + .file() + .partition() + .get(tableSchemaType == TableSchemaType.ONE_BUCKET ? 0 : 1, Integer.class); Review Comment: > tableSchemaType == TableSchemaType.ONE_BUCKET ? 0 : 1 this code is not intuitive. you can move it inside the enum class. maybe expose a method like `bucketPartitionColumnPosition()` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
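On the `tableSchemaType == TableSchemaType.ONE_BUCKET ? 0 : 1` comment above: a sketch of the suggested enum method. The method name follows the review comment, and the position values simply mirror the existing ternary; the enum itself already lives in `TestBucketPartitionerUtils`.

```java
enum TableSchemaType {
  ONE_BUCKET,
  IDENTITY_AND_BUCKET,
  TWO_BUCKETS;

  // Position of the bucket partition column within the partition tuple.
  int bucketPartitionColumnPosition() {
    // ONE_BUCKET has the bucket transform as its only partition field; the other two
    // layouts place the bucket over "data" in the second position.
    return this == ONE_BUCKET ? 0 : 1;
  }
}
```

Call sites would then read `scanTask.file().partition().get(tableSchemaType.bucketPartitionColumnPosition(), Integer.class)`.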
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; +import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + table = getTable(tableSchemaType); + env = + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + 
.setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private Table getTable(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets); + + return catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + } + + private List<RowData> convertToRowData(List<Row> rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + private BoundedTestSource<Row> createBoundedSource(List<Row> rows) { + return new BoundedTestSource<>(rows.toArray(new Row[0])); + } + + private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException { Review Comment: move the util method after the `@Test` methods ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/HadoopCatalogExtension.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class HadoopCatalogExtension + implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback { + protected final String database; + protected final String tableName; + + protected Path temporaryFolder; Review Comment: this can be replaced with `@TempDir private File tempDir;`. then we can remove the beforeAll and afterAll callbacks ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
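On the `@TempDir` suggestion for `HadoopCatalogExtension` above: a minimal sketch of the Jupiter-managed temporary-directory pattern the comment refers to, shown here in a plain test class. How (or whether) the injected directory gets wired into the extension itself is left to the PR.

```java
import java.io.File;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TempDirSketch {
  // JUnit Jupiter creates this directory before the test and deletes it afterwards,
  // which is what makes the manual folder setup/cleanup callbacks unnecessary.
  @TempDir File tempDir;

  @Test
  void testUsesManagedTempDir() {
    File warehouse = new File(tempDir, "warehouse");
    Assertions.assertThat(warehouse.mkdirs()).isTrue();
    // e.g. "file:" + warehouse.getAbsolutePath() as a HadoopCatalog warehouse location
  }
}
```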
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); + Preconditions.checkArgument( + bucketId < maxNumBuckets, + BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + + bucketId + + " (must be >= 0), maxNumBuckets: " + + maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple + * writers per bucket as evenly as possible, and will round-robin the requests across them, in this + * case each writer will target only one bucket at all times (many writers -> one bucket). 
Example: + * Configuration: numPartitions (writers) = 5, maxBuckets = 2 + * Expected behavior: + * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4 + * - Records for Bucket 1 will always use Writer 1 and 3 + * Notes: + * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId + * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter). + * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition index (writer) to use for each request Review Comment: nit: `@return the destination partition index (writer subtask id)`. `to use for each request` is a bit inaccurate ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtils { + + enum TableSchemaType { + ONE_BUCKET, + IDENTITY_AND_BUCKET, + TWO_BUCKETS; + } + + private TestBucketPartitionerUtils() {} + + static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) { Review Comment: it is probably bit cleaner to move it into the enum class ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); + Preconditions.checkArgument( + bucketId < maxNumBuckets, + BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + + bucketId + + " (must be >= 0), maxNumBuckets: " + + maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple + * writers per bucket as evenly as possible, and will round-robin the requests across them, in this + * case each writer will target only one bucket at all times (many writers -> one bucket). Example: + * Configuration: numPartitions (writers) = 5, maxBuckets = 2 + * Expected behavior: + * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4 + * - Records for Bucket 1 will always use Writer 1 and 3 + * Notes: + * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId + * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter). 
+ * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0. + * + * @param bucketId the bucketId for each request Review Comment: it seems to me that the two `@param` are not useful. maybe remove them. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. 
+ * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); + Preconditions.checkArgument( + bucketId < maxNumBuckets, + BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + + bucketId + + " (must be >= 0), maxNumBuckets: " + + maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple Review Comment: very nice explanation here ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtils { + + enum TableSchemaType { + ONE_BUCKET, + IDENTITY_AND_BUCKET, + TWO_BUCKETS; + } + + private TestBucketPartitionerUtils() {} + + static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) { + PartitionSpec partitionSpec = null; + + switch (tableSchemaType) { + case ONE_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + break; + case IDENTITY_AND_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + break; + case TWO_BUCKETS: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + break; + } + + Preconditions.checkNotNull( + partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType); + return partitionSpec; + } + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { Review Comment: it should return `RowData` right? You can use `GenericRowData`. Then we don't need `CONVERTER ` ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. 
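On the comment above about returning `RowData` and dropping the converter: a sketch of what `generateRowsForBucketIdRange` could look like built on `GenericRowData`. The `valueForBucket` parameter stands in for the existing `generateValueForBucketId(bucketId, numBuckets)` helper, and the `(int, String)` layout mirrors `SimpleDataUtil.SCHEMA`.

```java
import java.util.List;
import java.util.function.IntFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

final class RowDataGenerationSketch {
  private RowDataGenerationSketch() {}

  // Builds RowData directly instead of Row, so DataFormatConverters is no longer needed.
  static List<RowData> generateRowsForBucketIdRange(
      int numRowsPerBucket, int numBuckets, IntFunction<String> valueForBucket) {
    List<RowData> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket);
    for (int i = 0; i < numRowsPerBucket; i++) {
      for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
        rows.add(GenericRowData.of(1, StringData.fromString(valueForBucket.apply(bucketId))));
      }
    }
    return rows;
  }
}
```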
It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket + private final int[] currentBucketWriterOffset; + + BucketPartitioner(PartitionSpec partitionSpec) { + Tuple2<Integer, Integer> bucketFieldInfo = + BucketPartitionerUtils.getBucketFieldInfo(partitionSpec); + + this.maxNumBuckets = bucketFieldInfo.f1; + this.currentBucketWriterOffset = new int[this.maxNumBuckets]; + } + + /** + * Determine the partition id based on the following criteria: If the number of writers <= the + * number of buckets, an evenly distributed number of buckets will be assigned to each writer (one + * writer -> many buckets). Conversely, if the number of writers > the number of buckets the logic + * is handled by the {@link #getPartitionWritersGreaterThanBuckets + * getPartitionWritersGreaterThanBuckets} method. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition id (writer) to use for each request + */ + @Override + public int partition(Integer bucketId, int numPartitions) { + Preconditions.checkArgument( + bucketId >= 0, BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + bucketId + " (must be >= 0)"); + Preconditions.checkArgument( + bucketId < maxNumBuckets, + BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX + + bucketId + + " (must be >= 0), maxNumBuckets: " + + maxNumBuckets); + + if (numPartitions <= maxNumBuckets) { + return bucketId % numPartitions; + } else { + return getPartitionWritersGreaterThanBuckets(bucketId, numPartitions); + } + } + + /*- + * If the number of writers > the number of buckets each partitioner will keep a state of multiple + * writers per bucket as evenly as possible, and will round-robin the requests across them, in this + * case each writer will target only one bucket at all times (many writers -> one bucket). Example: + * Configuration: numPartitions (writers) = 5, maxBuckets = 2 + * Expected behavior: + * - Records for Bucket 0 will be "round robin" between Writers 0, 2 and 4 + * - Records for Bucket 1 will always use Writer 1 and 3 + * Notes: + * - maxNumWritersPerBucket determines when to reset the currentBucketWriterOffset to 0 for this bucketId + * - When numPartitions is not evenly divisible by maxBuckets, some buckets will have one more writer (extraWriter). + * In this example Bucket 0 has an "extra writer" to consider before resetting its offset to 0. + * + * @param bucketId the bucketId for each request + * @param numPartitions the total number of partitions + * @return the partition index (writer) to use for each request + */ + private int getPartitionWritersGreaterThanBuckets(int bucketId, int numPartitions) { Review Comment: nit: I would call this `partitionWithMoreWritersThanBuckets` ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitioner.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * This partitioner will redirect records to writers deterministically based on the Bucket partition + * spec. It'll attempt to optimize the file size written depending on whether numPartitions is + * greater, less or equal than the maxNumBuckets. Note: The current implementation only supports ONE + * bucket in the partition spec. + */ +class BucketPartitioner implements Partitioner<Integer> { + + static final String BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX = "bucketId out of range: "; + + private final int maxNumBuckets; + + // To hold the OFFSET of the next writer to use for any bucket Review Comment: nit: add to the comment that this is only used for the case where the number of buckets is less than the writer parallelism ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtils { + + enum TableSchemaType { + ONE_BUCKET, + IDENTITY_AND_BUCKET, + TWO_BUCKETS; + } + + private TestBucketPartitionerUtils() {} + + static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) { + PartitionSpec partitionSpec = null; + + switch (tableSchemaType) { + case ONE_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + break; + case IDENTITY_AND_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + break; + case TWO_BUCKETS: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + break; + } + + Preconditions.checkNotNull( + partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType); + return partitionSpec; + } + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { + List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket); + // For some of our tests, this order of the generated rows matters + for (int i = 0; i < numRowsPerBucket; i++) { + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + String value = generateValueForBucketId(bucketId, numBuckets); + rows.add(Row.of(1, value)); + } + } + return rows; + } + + /** + * Utility method to generate a UUID string that will "hash" to a desired bucketId + * + * @param bucketId the desired bucketId + * @return the string data that "hashes" to the desired bucketId + */ + private static String generateValueForBucketId(int bucketId, int numBuckets) { + String value = ""; + while (true) { + String uuid = UUID.randomUUID().toString(); + if (computeBucketId(numBuckets, uuid) == bucketId) { + value = uuid; + break; Review Comment: nit: just return the uuid? avoided the need of value variable. ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerUtils.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
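On the "just return the uuid" nit above: a sketch of the simplified helper, meant to slot into `TestBucketPartitionerUtils` and reuse its existing `computeBucketId` method.

```java
private static String generateValueForBucketId(int bucketId, int numBuckets) {
  // Keep generating random UUIDs until one hashes into the requested bucket,
  // then return it directly; no intermediate variable or break is needed.
  while (true) {
    String uuid = UUID.randomUUID().toString();
    if (computeBucketId(numBuckets, uuid) == bucketId) {
      return uuid;
    }
  }
}
```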
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.util.List; +import java.util.UUID; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BucketUtil; + +final class TestBucketPartitionerUtils { + + enum TableSchemaType { + ONE_BUCKET, + IDENTITY_AND_BUCKET, + TWO_BUCKETS; + } + + private TestBucketPartitionerUtils() {} + + static final DataFormatConverters.RowConverter CONVERTER = + new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes()); + + static PartitionSpec getPartitionSpec(TableSchemaType tableSchemaType, int numBuckets) { + PartitionSpec partitionSpec = null; + + switch (tableSchemaType) { + case ONE_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).bucket("data", numBuckets).build(); + break; + case IDENTITY_AND_BUCKET: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .identity("id") + .bucket("data", numBuckets) + .build(); + break; + case TWO_BUCKETS: + partitionSpec = + PartitionSpec.builderFor(SimpleDataUtil.SCHEMA) + .bucket("id", numBuckets) + .bucket("data", numBuckets) + .build(); + break; + } + + Preconditions.checkNotNull( + partitionSpec, "Invalid tableSchemaType provided: " + tableSchemaType); + return partitionSpec; + } + + /** + * Utility method to generate rows whose values will "hash" to a range of bucketIds (from 0 to + * numBuckets - 1) + * + * @param numRowsPerBucket how many different rows should be generated per bucket + * @param numBuckets max number of buckets to consider + * @return the list of rows whose data "hashes" to the desired bucketId + */ + static List<Row> generateRowsForBucketIdRange(int numRowsPerBucket, int numBuckets) { + List<Row> rows = Lists.newArrayListWithCapacity(numBuckets * numRowsPerBucket); + // For some of our tests, this order of the generated rows matters + for (int i = 0; i < numRowsPerBucket; i++) { + for (int bucketId = 0; bucketId < numBuckets; bucketId++) { + String value = generateValueForBucketId(bucketId, numBuckets); + rows.add(Row.of(1, value)); + } + } + return rows; + } + + /** + * Utility method to generate a UUID string that will "hash" to a desired bucketId + * + * @param bucketId the desired bucketId + * @return the string data that "hashes" to the desired bucketId + */ + private static String generateValueForBucketId(int bucketId, int numBuckets) { + String value = ""; + while (true) { + String uuid = UUID.randomUUID().toString(); + if (computeBucketId(numBuckets, uuid) == bucketId) { + value = uuid; + break; + } + } + return value; Review Comment: nit: Iceberg style adds an empty line after a control block `}` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { Review Comment: move `bucketId` initialization before the for loop. would make it easier to see the for loop ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
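For the two `generateValueForBucketId` nits above, returning the uuid directly also removes the trailing `return value;` that needed the blank line, so the method could shrink to something like this (sketch, assuming `computeBucketId` keeps its current signature):

    private static String generateValueForBucketId(int bucketId, int numBuckets) {
      while (true) {
        String uuid = UUID.randomUUID().toString();
        if (computeBucketId(numBuckets, uuid) == bucketId) {
          return uuid;
        }
      }
    }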
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( Review Comment: I feel we need to test two separate scenarios: dividable or non-dividable ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + Review Comment: nit: empty lines in line 40 and 43 seems unnecessary ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); Review Comment: unless very obvious, Iceberg style doesn't use acronym. so maybe change `actualIdx` to `actualPartitionIndex`. ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
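On the dividable / non-dividable scenarios mentioned above, assuming that means a parallelism that is and is not evenly divisible by the bucket count, one hypothetical way to cover both with the existing enum parameter is a `@CsvSource` (numbers are illustrative only; 480 % 60 == 0 while 500 % 60 != 0):

    @ParameterizedTest
    @CsvSource({
      "ONE_BUCKET, 480",
      "ONE_BUCKET, 500",
      "IDENTITY_AND_BUCKET, 480",
      "IDENTITY_AND_BUCKET, 500"
    })
    public void testPartitioningParallelismGreaterThanBuckets(
        TestBucketPartitionerUtils.TableSchemaType tableSchemaType, int numPartitions) {
      // same body as today, with numPartitions supplied by the parameter instead of a local constant
    }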
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(expectedIdx); + if (++bucketId == NUM_BUCKETS) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismEqualLessThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 30; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions); + } + } + + @ParameterizedTest + @EnumSource(value = TestBucketPartitionerUtils.TableSchemaType.class, names = "TWO_BUCKETS") + public void testPartitionerMultipleBucketsFail( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> new BucketPartitioner(partitionSpec)) + .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE); + } + + @ParameterizedTest Review Comment: parameterized test seems unnecessary here ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
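If that comment refers to the single-value `TWO_BUCKETS` case, a plain `@Test` would be enough (sketch):

    @Test
    public void testPartitionerMultipleBucketsFail() {
      PartitionSpec partitionSpec =
          TestBucketPartitionerUtils.getPartitionSpec(
              TestBucketPartitionerUtils.TableSchemaType.TWO_BUCKETS, NUM_BUCKETS);

      Assertions.assertThatExceptionOfType(RuntimeException.class)
          .isThrownBy(() -> new BucketPartitioner(partitionSpec))
          .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE);
    }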
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(expectedIdx); + if (++bucketId == NUM_BUCKETS) { + bucketId = 0; + } + } + } + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismEqualLessThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 30; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int bucketId = 0; bucketId < NUM_BUCKETS; bucketId++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(bucketId % numPartitions); + } + } + + @ParameterizedTest + @EnumSource(value = TestBucketPartitionerUtils.TableSchemaType.class, names = "TWO_BUCKETS") + public void testPartitionerMultipleBucketsFail( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + Assertions.assertThatExceptionOfType(RuntimeException.class) + .isThrownBy(() -> new BucketPartitioner(partitionSpec)) + .withMessageContaining(BucketPartitionerUtils.BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE); + } + + @ParameterizedTest + @ValueSource(ints = {-1, NUM_BUCKETS}) + public void testPartitionerBucketIdOutOfRangeFail(int bucketId) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec( + TestBucketPartitionerUtils.TableSchemaType.ONE_BUCKET, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> bucketPartitioner.partition(bucketId, 1)) + .withMessageContaining(BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX); Review Comment: nit: I haven't seen Iceberg code use constant string for unit test error message. maybe also assert the exact error msg. ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitioner.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
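For asserting the message literally instead of only checking the shared constant, something along these lines (sketch; assumes the message keeps its current `bucketId out of range: ` prefix):

    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
        .isThrownBy(() -> bucketPartitioner.partition(-1, 1))
        .withMessageStartingWith("bucketId out of range: -1");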
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.sink.BucketPartitioner.BUCKET_OUT_OF_RANGE_MESSAGE_PREFIX; + +import org.apache.iceberg.PartitionSpec; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestBucketPartitioner { + + static final int NUM_BUCKETS = 60; + + @ParameterizedTest + @EnumSource( + value = TestBucketPartitionerUtils.TableSchemaType.class, + names = {"ONE_BUCKET", "IDENTITY_AND_BUCKET"}) + public void testPartitioningParallelismGreaterThanBuckets( + TestBucketPartitionerUtils.TableSchemaType tableSchemaType) { + final int numPartitions = 500; + + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, NUM_BUCKETS); + + BucketPartitioner bucketPartitioner = new BucketPartitioner(partitionSpec); + + for (int expectedIdx = 0, bucketId = 0; expectedIdx < numPartitions; expectedIdx++) { + int actualIdx = bucketPartitioner.partition(bucketId, numPartitions); + Assertions.assertThat(actualIdx).isEqualTo(expectedIdx); + if (++bucketId == NUM_BUCKETS) { Review Comment: Iceberg style doesn't use `++` with side effect. you can use the ternary `? : ` operator here ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
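Putting the loop-related nits together (initialize `bucketId` before the loop, avoid the abbreviation, drop the `++` side effect), the loop could end up roughly like this (sketch):

    int bucketId = 0;
    for (int expectedIndex = 0; expectedIndex < numPartitions; expectedIndex++) {
      int actualPartitionIndex = bucketPartitioner.partition(bucketId, numPartitions);
      Assertions.assertThat(actualPartitionIndex).isEqualTo(expectedIndex);
      // wrap around after the last bucket instead of mutating inside the condition
      bucketId = (bucketId == NUM_BUCKETS - 1) ? 0 : bucketId + 1;
    }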
+ */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; +import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + table = getTable(tableSchemaType); + env = + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + 
.setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private Table getTable(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets); + + return catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + } + + private List<RowData> convertToRowData(List<Row> rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + private BoundedTestSource<Row> createBoundedSource(List<Row> rows) { Review Comment: this util method seems not necessary. it is better to stay with `RowData` ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; +import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + table = getTable(tableSchemaType); + env = + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private Table getTable(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets); + + return catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + } + + private List<RowData> convertToRowData(List<Row> rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + private BoundedTestSource<Row> createBoundedSource(List<Row> rows) { + return new BoundedTestSource<>(rows.toArray(new Row[0])); + } + + private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException { + int totalRecordCount = 0; + Map<Integer, List<Integer>> writersPerBucket = Maps.newHashMap(); // <BucketId, List<WriterId>> + Map<Integer, Integer> filesPerBucket = Maps.newHashMap(); // <BucketId, NumFiles> + Map<Integer, Long> recordsPerFile = new TreeMap<>(); // <WriterId, NumRecords> + + try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { + for (FileScanTask scanTask : fileScanTasks) { + long recordCountInFile = scanTask.file().recordCount(); + + String[] splitFilePath = scanTask.file().path().toString().split("/"); + String filename = splitFilePath[splitFilePath.length - 1]; + int writerId = Integer.parseInt(filename.split("-")[0]); Review Comment: We can also disable checkpoint so that there is only one flush/commit in the test. case 1: number of buckets (like 2) is less than the number of writers (like 5). 
with keyBy, there will be two files committed. with bucket partitioner, there should be 5 files. case 2: number of buckets (like 4) is greater than the number of writers (like 2). both keyBy and the bucket partitioner will write 4 files. this is the case where we need the extra complexity of parsing the writer id. hopefully/likely, the keyBy will result in unbalanced assignment. e.g. one writer handles 3 buckets while the other handles 1 bucket. that will demonstrate the benefit and expected behavior of bucket partitioner ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestBucketPartitionerFlinkIcebergSink.java: ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import static org.apache.iceberg.flink.MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; +import static org.apache.iceberg.flink.TestFixtures.TABLE_IDENTIFIER; +import static org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.CONVERTER; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.sink.TestBucketPartitionerUtils.TableSchemaType; +import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import 
org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class TestBucketPartitionerFlinkIcebergSink { + + private static final int NUMBER_TASK_MANAGERS = 1; + private static final int SLOTS_PER_TASK_MANAGER = 8; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + private static final HadoopCatalogExtension catalogExtension = + new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE); + + private static final TypeInformation<Row> ROW_TYPE_INFO = + new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes()); + + // Parallelism = 8 (parallelism > numBuckets) throughout the test suite + private final int parallelism = NUMBER_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; + private final FileFormat format = FileFormat.PARQUET; + private final int numBuckets = 4; + + private Table table; + private StreamExecutionEnvironment env; + private TableLoader tableLoader; + + private void setupEnvironment(TableSchemaType tableSchemaType) { + table = getTable(tableSchemaType); + env = + StreamExecutionEnvironment.getExecutionEnvironment(DISABLE_CLASSLOADER_CHECK_CONFIG) + .enableCheckpointing(100) + .setParallelism(parallelism) + .setMaxParallelism(parallelism * 2); + tableLoader = catalogExtension.tableLoader(); + } + + private Table getTable(TableSchemaType tableSchemaType) { + PartitionSpec partitionSpec = + TestBucketPartitionerUtils.getPartitionSpec(tableSchemaType, numBuckets); + + return catalogExtension + .catalog() + .createTable( + TABLE_IDENTIFIER, + SimpleDataUtil.SCHEMA, + partitionSpec, + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + } + + private List<RowData> convertToRowData(List<Row> rows) { + return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList()); + } + + private BoundedTestSource<Row> createBoundedSource(List<Row> rows) { + return new BoundedTestSource<>(rows.toArray(new Row[0])); + } + + private TableTestStats extractTableTestStats(TableSchemaType tableSchemaType) throws IOException { Review Comment: also the method name is not intuitive. it seems like `PartitionResult` to me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
