stevenzwu commented on code in PR #13900: URL: https://github.com/apache/iceberg/pull/13900#discussion_r2298591187
########## flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestRangePartitionerSkew.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.shuffle; + +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.SortOrderComparators; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestRangePartitionerSkew { + private static final Logger LOG = LoggerFactory.getLogger(TestRangePartitionerSkew.class); + + // change the iterations to a larger number (like 100) to see the statistics of max skew. + // like min, max, avg, stddev of max skew. + private static final int ITERATIONS = 1; + + /** + * @param parallelism number of partitions + * @param maxSkewUpperBound the upper bound of max skew. maxSkewUpperBound is set to a loose bound + * (~5x of the max value) to avoid flakiness. + * <p> + * <li>parallelism 8: max skew statistics over 100 iterations: mean = 0.0130, stddev = 0.0041, + * min = 0.0058, max = 0.0236 + * <li>parallelism 32: max skew statistics over 100 iterations: mean = 0.0188, stddev = + * 0.0046, min = 0.0106, max = 0.0312 + */ + @ParameterizedTest + @CsvSource({"8, 100_000, 0.1", "32, 400_000, 0.15"}) + public void testMapStatisticsSkewWithLongTailDistribution( + int parallelism, int sampleSize, double maxSkewUpperBound) { + Schema schema = + new Schema(Types.NestedField.optional(1, "event_hour", Types.IntegerType.get())); + SortOrder sortOrder = SortOrder.builderFor(schema).asc("event_hour").build(); + Comparator<StructLike> comparator = SortOrderComparators.forSchema(schema, sortOrder); + SortKey sortKey = new SortKey(schema, sortOrder); + + NavigableMap<Integer, Long> weights = + DataDistributionUtil.longTailDistribution(100_000, 24, 240, 100, 2.0, 0.7); + Map<SortKey, Long> mapStatistics = + DataDistributionUtil.mapStatisticsWithLongTailDistribution(weights, sortKey); + MapAssignment mapAssignment = + MapAssignment.fromKeyFrequency(parallelism, mapStatistics, 0.0, comparator); + MapRangePartitioner partitioner = new MapRangePartitioner(schema, sortOrder, mapAssignment); + + List<Integer> keys = Lists.newArrayList(weights.keySet().iterator()); + long[] weightsCDF = DataDistributionUtil.computeCumulativeWeights(keys, weights); + long totalWeight = weightsCDF[weightsCDF.length - 1]; + + // change the iterations to a larger number (like 100) to see the statistics of max skew. + // like min, max, avg, stddev of max skew. + double[] maxSkews = new double[ITERATIONS]; + for (int iteration = 0; iteration < ITERATIONS; ++iteration) { + int[] recordsPerTask = new int[parallelism]; + for (int i = 0; i < sampleSize; ++i) { + // randomly pick a key according to the weight distribution + long weight = ThreadLocalRandom.current().nextLong(totalWeight); + int index = DataDistributionUtil.binarySearchIndex(weightsCDF, weight); + RowData row = GenericRowData.of(keys.get(index)); + int subtaskId = partitioner.partition(row, parallelism); + recordsPerTask[subtaskId] += 1; + } + + org.apache.flink.calcite.shaded.com.google.common.math.Stats recordsPerTaskStats = Review Comment: Without full path, the checkstyle will complain that shaded class should be used. But the Stats class is not included in the Iceberg shaded guava jar. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
