stevenzwu commented on code in PR #9321:
URL: https://github.com/apache/iceberg/pull/9321#discussion_r1432236943


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner<RowData> {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator<StructLike> comparator;
+  private final Map<SortKey, Long> mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map<SortKey, KeyAssignment> assignment;
+  private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+      Schema schema,
+      SortOrder sortOrder,
+      MapDataStatistics dataStatistics,
+      double closeFileCostInWeightPercentage) {
+    this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+    this.sortKey = new SortKey(schema, sortOrder);
+    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.mapStatistics = dataStatistics.statistics();
+    this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+    // assignment table can only be built lazily when first referenced here,
+    // because number of partitions (downstream subtasks) is needed
+    Map<SortKey, KeyAssignment> assignmentMap = assignment(numPartitions);
+    // reuse the sortKey and rowDataWrapper
+    sortKey.wrap(rowDataWrapper.wrap(row));
+    KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+    if (keyAssignment == null) {
+      // haven't learned about the key before. fall back to random selection.
+      return ThreadLocalRandom.current().nextInt(numPartitions);
+    }
+
+    return keyAssignment.select();
+  }
+
+  @VisibleForTesting
+  Map<SortKey, KeyAssignment> assignment(int numPartitions) {
+    if (assignment == null) {
+      long totalWeight = mapStatistics.values().stream().mapToLong(l -> 
l).sum();
+      double targetWeightPerSubtask = ((double) totalWeight) / numPartitions;
+      long closeFileCostInWeight =
+          (long) Math.ceil(targetWeightPerSubtask * 
closeFileCostInWeightPercentage / 100);
+
+      // add one close file cost for each key even if a key with large weight 
may be assigned to
+      // multiple subtasks
+      this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
+      mapStatistics.forEach(
+          (k, v) -> {
+            int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
+            long estimatedCloseFileCost = closeFileCostInWeight * 
estimatedSplits;
+            sortedStatsWithCloseFileCost.put(k, v + estimatedCloseFileCost);
+          });
+
+      long totalWeightWithCloseFileCost =
+          sortedStatsWithCloseFileCost.values().stream().mapToLong(l -> 
l).sum();
+      long targetWeightPerSubtaskWithCloseFileCost =
+          (long) Math.ceil(((double) totalWeightWithCloseFileCost) / 
numPartitions);
+      this.assignment =
+          buildAssignment(
+              numPartitions, sortedStatsWithCloseFileCost, 
targetWeightPerSubtaskWithCloseFileCost);
+    }
+
+    return assignment;
+  }
+
+  @VisibleForTesting
+  NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost() {
+    return sortedStatsWithCloseFileCost;
+  }
+
+  /**
+   * @return assignment summary for every subtask. Key is subtaskId. Value 
pair is (weight assigned
+   *     to the subtask, number of keys assigned to the subtask)
+   */
+  Map<Integer, Pair<Long, Integer>> assignmentInfo() {
+    Map<Integer, Pair<Long, Integer>> assignmentInfo = Maps.newTreeMap();
+    assignment.forEach(
+        (key, keyAssignment) -> {
+          for (int i = 0; i < keyAssignment.assignedSubtasks.length; ++i) {
+            int subtaskId = keyAssignment.assignedSubtasks[i];
+            long subtaskWeight = keyAssignment.subtaskWeights[i];
+            Pair<Long, Integer> oldValue = 
assignmentInfo.getOrDefault(subtaskId, Pair.of(0L, 0));
+            assignmentInfo.put(
+                subtaskId, Pair.of(oldValue.first() + subtaskWeight, 
oldValue.second() + 1));
+          }
+        });
+
+    return assignmentInfo;
+  }
+
+  private Map<SortKey, KeyAssignment> buildAssignment(
+      int numPartitions,
+      NavigableMap<SortKey, Long> sortedStatistics,
+      long targetWeightPerSubtask) {
+    Map<SortKey, KeyAssignment> assignmentMap =
+        Maps.newHashMapWithExpectedSize(sortedStatistics.size());
+    Iterator<SortKey> mapKeyIterator = sortedStatistics.keySet().iterator();
+    int subtaskId = 0;
+    SortKey currentKey = null;
+    long keyRemainingWeight = 0L;
+    long subtaskRemainingWeight = targetWeightPerSubtask;
+    List<Integer> assignedSubtasks = Lists.newArrayList();
+    List<Long> subtaskWeights = Lists.newArrayList();
+    while (mapKeyIterator.hasNext() && subtaskId < numPartitions) {
+      if (currentKey == null) {
+        currentKey = mapKeyIterator.next();
+        keyRemainingWeight = sortedStatistics.get(currentKey);
+      }
+
+      assignedSubtasks.add(subtaskId);
+      // assign the remaining weight of key to the current subtask if it is 
the last subtask
+      // or if the subtask has more capacity than the remaining key weight
+      if (subtaskId == numPartitions - 1 || keyRemainingWeight < 
subtaskRemainingWeight) {
+        subtaskWeights.add(keyRemainingWeight);
+        subtaskRemainingWeight -= keyRemainingWeight;
+        keyRemainingWeight = 0;
+      } else {
+        // filled up the current subtask
+        subtaskWeights.add(subtaskRemainingWeight);
+        keyRemainingWeight -= subtaskRemainingWeight;
+        // move on to the next subtask
+        subtaskId += 1;
+        subtaskRemainingWeight = targetWeightPerSubtask;
+      }
+
+      if (keyRemainingWeight <= 0) {
+        // finishing up the assignment for the current key
+        Preconditions.checkState(
+            assignedSubtasks.size() == subtaskWeights.size(),
+            "List size mismatch: assigned subtasks = %s, subtask weights = %s",
+            assignedSubtasks,
+            subtaskWeights);
+        KeyAssignment keyAssignment =
+            new KeyAssignment(
+                ArrayUtil.toIntArray(assignedSubtasks), 
ArrayUtil.toLongArray(subtaskWeights));

Review Comment:
   I didn't quite follow the example and notation here. Can you elaborate?



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMapRangePartitioner {
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build();
+
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+
+  private static final RowDataWrapper KEY_0_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_1_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_1 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_2_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_2 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_3_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_3 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_4_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_4 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_5_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_5 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_6_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_6 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_7_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_7 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_8_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_8 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_9_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_9 = SORT_KEY.copy();
+
+  static {
+    SORT_KEY_0.wrap(
+        KEY_0_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k0"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_1.wrap(
+        KEY_1_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k1"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_2.wrap(
+        KEY_2_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k2"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_3.wrap(
+        KEY_3_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k3"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_4.wrap(
+        KEY_4_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k4"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_5.wrap(
+        KEY_5_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k5"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_6.wrap(
+        KEY_6_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k6"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_7.wrap(
+        KEY_7_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k7"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_8.wrap(
+        KEY_8_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k8"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_9.wrap(
+        KEY_9_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k9"), 1, 
StringData.fromString("2023-06-20"))));
+  }
+
+  // Total weight is 80
+  private final MapDataStatistics mapDataStatistics =
+      new MapDataStatistics(
+          ImmutableMap.of(
+              SORT_KEY_0,
+              35L,
+              SORT_KEY_1,
+              23L,
+              SORT_KEY_2,
+              12L,
+              SORT_KEY_3,
+              4L,
+              SORT_KEY_4,
+              1L,
+              SORT_KEY_5,
+              1L,
+              SORT_KEY_6,
+              1L,
+              SORT_KEY_7,
+              1L,
+              SORT_KEY_8,
+              1L,
+              SORT_KEY_9,
+              1L));
+
+  @Test
+  public void testEvenlyDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 8;
+
+    // each task should get targeted weight of 10 (=80/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {5L, 10L, 8L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{2L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{4L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(10L, 1),
+            1,
+            Pair.of(10L, 1),
+            2,
+            Pair.of(10L, 1),
+            3,
+            Pair.of(10L, 2),
+            4,
+            Pair.of(10L, 1),
+            5,
+            Pair.of(10L, 2),
+            6,
+            Pair.of(10L, 1),
+            7,
+            Pair.of(10L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testEvenlyDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 8;
+
+    // target subtask weight is 10 before close file cost factored in.
+    // close file cost is 2 = 20% * 10.
+    // key weights before and after close file cost factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target subtask weight per subtask is 14 (112/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),

Review Comment:
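   Here are the calculations:
   * SORT_KEY_0's adjusted weight is now 43 = 35 + 8, where 8 is the estimated
     close file cost (4 estimated splits * close file cost of 2)
   * the adjusted target weight per subtask is 14 = 112 / 8, where 112 is the
     sum of the adjusted weights
   * 43 split with a target weight of 14 gives 14, 14, 14, 1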
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner<RowData> {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator<StructLike> comparator;
+  private final Map<SortKey, Long> mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map<SortKey, KeyAssignment> assignment;
+  private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+      Schema schema,
+      SortOrder sortOrder,
+      MapDataStatistics dataStatistics,
+      double closeFileCostInWeightPercentage) {
+    this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+    this.sortKey = new SortKey(schema, sortOrder);
+    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.mapStatistics = dataStatistics.statistics();
+    this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+    // assignment table can only be built lazily when first referenced here,
+    // because number of partitions (downstream subtasks) is needed
+    Map<SortKey, KeyAssignment> assignmentMap = assignment(numPartitions);
+    // reuse the sortKey and rowDataWrapper
+    sortKey.wrap(rowDataWrapper.wrap(row));
+    KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+    if (keyAssignment == null) {
+      // haven't learned about the key before. fall back to random selection.
+      return ThreadLocalRandom.current().nextInt(numPartitions);
+    }
+
+    return keyAssignment.select();
+  }
+
+  @VisibleForTesting
+  Map<SortKey, KeyAssignment> assignment(int numPartitions) {
+    if (assignment == null) {
+      long totalWeight = mapStatistics.values().stream().mapToLong(l -> 
l).sum();
+      double targetWeightPerSubtask = ((double) totalWeight) / numPartitions;
+      long closeFileCostInWeight =
+          (long) Math.ceil(targetWeightPerSubtask * 
closeFileCostInWeightPercentage / 100);
+
+      // add one close file cost for each key even if a key with large weight 
may be assigned to
+      // multiple subtasks
+      this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
+      mapStatistics.forEach(
+          (k, v) -> {
+            int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
+            long estimatedCloseFileCost = closeFileCostInWeight * 
estimatedSplits;
+            sortedStatsWithCloseFileCost.put(k, v + estimatedCloseFileCost);
+          });
+
+      long totalWeightWithCloseFileCost =
+          sortedStatsWithCloseFileCost.values().stream().mapToLong(l -> 
l).sum();
+      long targetWeightPerSubtaskWithCloseFileCost =
+          (long) Math.ceil(((double) totalWeightWithCloseFileCost) / 
numPartitions);
+      this.assignment =
+          buildAssignment(
+              numPartitions, sortedStatsWithCloseFileCost, 
targetWeightPerSubtaskWithCloseFileCost);
+    }
+
+    return assignment;
+  }
+
+  @VisibleForTesting
+  NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost() {
+    return sortedStatsWithCloseFileCost;
+  }
+
+  /**
+   * @return assignment summary for every subtask. Key is subtaskId. Value 
pair is (weight assigned
+   *     to the subtask, number of keys assigned to the subtask)
+   */
+  Map<Integer, Pair<Long, Integer>> assignmentInfo() {
+    Map<Integer, Pair<Long, Integer>> assignmentInfo = Maps.newTreeMap();
+    assignment.forEach(
+        (key, keyAssignment) -> {
+          for (int i = 0; i < keyAssignment.assignedSubtasks.length; ++i) {
+            int subtaskId = keyAssignment.assignedSubtasks[i];
+            long subtaskWeight = keyAssignment.subtaskWeights[i];
+            Pair<Long, Integer> oldValue = 
assignmentInfo.getOrDefault(subtaskId, Pair.of(0L, 0));
+            assignmentInfo.put(
+                subtaskId, Pair.of(oldValue.first() + subtaskWeight, 
oldValue.second() + 1));
+          }
+        });
+
+    return assignmentInfo;
+  }
+
+  private Map<SortKey, KeyAssignment> buildAssignment(
+      int numPartitions,
+      NavigableMap<SortKey, Long> sortedStatistics,
+      long targetWeightPerSubtask) {
+    Map<SortKey, KeyAssignment> assignmentMap =
+        Maps.newHashMapWithExpectedSize(sortedStatistics.size());
+    Iterator<SortKey> mapKeyIterator = sortedStatistics.keySet().iterator();
+    int subtaskId = 0;
+    SortKey currentKey = null;
+    long keyRemainingWeight = 0L;
+    long subtaskRemainingWeight = targetWeightPerSubtask;
+    List<Integer> assignedSubtasks = Lists.newArrayList();
+    List<Long> subtaskWeights = Lists.newArrayList();
+    while (mapKeyIterator.hasNext() && subtaskId < numPartitions) {
+      if (currentKey == null) {
+        currentKey = mapKeyIterator.next();
+        keyRemainingWeight = sortedStatistics.get(currentKey);
+      }
+
+      assignedSubtasks.add(subtaskId);
+      // assign the remaining weight of key to the current subtask if it is 
the last subtask
+      // or if the subtask has more capacity than the remaining key weight
+      if (subtaskId == numPartitions - 1 || keyRemainingWeight < 
subtaskRemainingWeight) {

Review Comment:
   > If I understand correctly keyRemainingWeight < subtaskRemainingWeight 
should always be true for subtaskId == numPartitions - 1.
   
   Not sure I fully understand the comment. This is an `or` condition: the
   remaining key weight is fully assigned to the current subtask
   
   1. if it is the last subtask, or
   2. if the remaining key weight is less than the subtask's remaining capacity



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which 
is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate 
counters, it can't be used
+ * for high-cardinality use cases. Otherwise, the memory footprint is too high.
+ */
+class MapRangePartitioner implements Partitioner<RowData> {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator<StructLike> comparator;
+  private final Map<SortKey, Long> mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed due to the need of numPartitions
+  private Map<SortKey, KeyAssignment> assignment;
+  private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+      Schema schema,
+      SortOrder sortOrder,
+      MapDataStatistics dataStatistics,
+      double closeFileCostInWeightPercentage) {
+    this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), 
schema.asStruct());
+    this.sortKey = new SortKey(schema, sortOrder);
+    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.mapStatistics = dataStatistics.statistics();
+    this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+    // assignment table can only be built lazily when first referenced here,
+    // because number of partitions (downstream subtasks) is needed
+    Map<SortKey, KeyAssignment> assignmentMap = assignment(numPartitions);
+    // reuse the sortKey and rowDataWrapper
+    sortKey.wrap(rowDataWrapper.wrap(row));
+    KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+    if (keyAssignment == null) {
+      // haven't learned about the key before. fall back to random selection.
+      return ThreadLocalRandom.current().nextInt(numPartitions);

Review Comment:
   Great point and question.
   
   I see multiple options here, and we have to choose the least bad one.
   
   1. random, as currently implemented, which is prone to OOM as you pointed out
   2. `sortKey.hashCode() % numPartitions`, which is probably what you meant by
      hash. This can also lead to unbalanced traffic distribution (see below)
   3. direct forward: subtask x -> subtask x
   
   I agree that hash is better than random at avoiding OOM, even though hash is
   prone to unbalanced traffic distribution: all traffic for the new key goes to
   a single subtask, which can lead to back pressure. Back pressure is probably
   better than OOM.
   
   I am wondering if #3 (direct forwarding) is better than #2 (hash). We will
   have more files for the new key, but the benefit is that the traffic for the
   new key is evenly spread across downstream subtasks.
   
   



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMapRangePartitioner {
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build();
+
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+
+  private static final RowDataWrapper KEY_0_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_1_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());

Review Comment:
   good suggestion. will do



##########
jmh.gradle:
##########
@@ -21,10 +21,15 @@ if (jdkVersion != '8' && jdkVersion != '11' && jdkVersion 
!= '17') {
   throw new GradleException("The JMH benchamrks must be run with JDK 8 or JDK 
11 or JDK 17")
 }
 
+def flinkVersions = (System.getProperty("flinkVersions") != null ? 
System.getProperty("flinkVersions") : 
System.getProperty("defaultFlinkVersions")).split(",")
 def sparkVersions = (System.getProperty("sparkVersions") != null ? 
System.getProperty("sparkVersions") : 
System.getProperty("defaultSparkVersions")).split(",")
 def scalaVersion = System.getProperty("scalaVersion") != null ? 
System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
 def jmhProjects = [project(":iceberg-core")]
 
+if (flinkVersions.contains("1.17")) {

Review Comment:
   I think we can continue the initial PR on 1.17 and port it to 1.16 and 1.18 
afterwards. I don't see much difference in which version we work on first; this 
is an incomplete feature anyway.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMapRangePartitioner {
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build();
+
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+
+  private static final RowDataWrapper KEY_0_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_1_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_1 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_2_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_2 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_3_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_3 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_4_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_4 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_5_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_5 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_6_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_6 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_7_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_7 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_8_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_8 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_9_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_9 = SORT_KEY.copy();
+
+  static {
+    SORT_KEY_0.wrap(
+        KEY_0_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k0"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_1.wrap(
+        KEY_1_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k1"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_2.wrap(
+        KEY_2_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k2"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_3.wrap(
+        KEY_3_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k3"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_4.wrap(
+        KEY_4_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k4"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_5.wrap(
+        KEY_5_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k5"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_6.wrap(
+        KEY_6_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k6"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_7.wrap(
+        KEY_7_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k7"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_8.wrap(
+        KEY_8_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k8"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_9.wrap(
+        KEY_9_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k9"), 1, 
StringData.fromString("2023-06-20"))));
+  }
+
+  // Total weight is 80
+  private final MapDataStatistics mapDataStatistics =
+      new MapDataStatistics(
+          ImmutableMap.of(
+              SORT_KEY_0,
+              35L,
+              SORT_KEY_1,
+              23L,
+              SORT_KEY_2,
+              12L,
+              SORT_KEY_3,
+              4L,
+              SORT_KEY_4,
+              1L,
+              SORT_KEY_5,
+              1L,
+              SORT_KEY_6,
+              1L,
+              SORT_KEY_7,
+              1L,
+              SORT_KEY_8,
+              1L,
+              SORT_KEY_9,
+              1L));
+
+  @Test
+  public void testEvenlyDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 8;
+
+    // each task should get targeted weight of 10 (=80/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {5L, 10L, 8L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{2L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{4L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(10L, 1),
+            1,
+            Pair.of(10L, 1),
+            2,
+            Pair.of(10L, 1),
+            3,
+            Pair.of(10L, 2),
+            4,
+            Pair.of(10L, 1),
+            5,
+            Pair.of(10L, 2),
+            6,
+            Pair.of(10L, 1),
+            7,
+            Pair.of(10L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testEvenlyDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 8;
+
+    // target subtask weight is 10 before close file cost factored in.
+    // close file cost is 2 = 20% * 10.
+    // key weights before and after close file cost factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target subtask weight per subtask is 14 (112/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {13L, 14L, 2L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{12L, 4L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{6L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{3L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{1L, 2L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the 
number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(14L, 1),
+            1,
+            Pair.of(14L, 1),
+            2,
+            Pair.of(14L, 1),
+            3,
+            Pair.of(14L, 2),
+            4,
+            Pair.of(14L, 1),
+            5,
+            Pair.of(14L, 2),
+            6,
+            Pair.of(14L, 4),
+            7,
+            Pair.of(14L, 5));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 9;
+
+    // each task should get targeted weight of 9 = ceiling(80/9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {9L, 9L, 9L, 8L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {3, 4, 5, 6}, new long[] {1L, 9L, 9L, 4L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{5L, 7L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] 
{2L, 2L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(9L, 1),
+            1,
+            Pair.of(9L, 1),
+            2,
+            Pair.of(9L, 1),
+            3,
+            Pair.of(9L, 2),
+            4,
+            Pair.of(9L, 1),
+            5,
+            Pair.of(9L, 1),
+            6,
+            Pair.of(9L, 2),
+            7,
+            Pair.of(9L, 2),
+            8,
+            Pair.of(8L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 9;
+
+    // target subtask weight is 9 before close file cost factored in.
+    // close file cost is 2 (= 20% * 9) per file.
+    // key weights before and after close file cost factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target subtask weight per subtask is 13 (112/9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {13L, 13L, 13L, 4L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {9L, 13L, 7L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{6L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{3L, 3L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] 
{1L, 2L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{3L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the 
number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(13L, 1),
+            1,
+            Pair.of(13L, 1),
+            2,
+            Pair.of(13L, 1),
+            3,
+            Pair.of(13L, 2),
+            4,
+            Pair.of(13L, 1),
+            5,
+            Pair.of(13L, 2),
+            6,
+            Pair.of(13L, 2),
+            7,
+            Pair.of(13L, 5),
+            8,
+            Pair.of(8L, 3));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  private static Map<Integer, Pair<AtomicLong, Set<RowData>>> runPartitioner(
+      MapRangePartitioner partitioner, int numPartitions) {
+    // The Map key is the subtaskId.
+    // For the map value pair, the first element is the count of assigned and
+    // the second element of Set<String> is for the set of assigned keys.
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults = 
Maps.newHashMap();
+    partitioner
+        .sortedStatsWithCloseFileCost()
+        .forEach(
+            (sortKey, weight) -> {
+              String key = sortKey.get(0, String.class);
+              // run 100x times of the weight
+              long iterations = weight * 100;
+              for (int i = 0; i < iterations; ++i) {
+                RowData rowData =
+                    GenericRowData.of(
+                        StringData.fromString(key), 1, 
StringData.fromString("2023-06-20"));
+                // TODO
+                int subtaskId = partitioner.partition(rowData, numPartitions);
+                partitionResults.computeIfAbsent(
+                    subtaskId, k -> Pair.of(new AtomicLong(0), 
Sets.newHashSet()));
+                Pair<AtomicLong, Set<RowData>> pair = 
partitionResults.get(subtaskId);
+                pair.first().incrementAndGet();
+                // TODO
+                pair.second().add(rowData);
+              }
+            });
+    return partitionResults;
+  }
+
+  private void validatePartitionResults(
+      Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo,
+      Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults) {
+
+    
Assertions.assertThat(partitionResults.size()).isEqualTo(expectedAssignmentInfo.size());
+
+    List<Integer> expectedAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Integer> actualAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+    List<Double> expectedNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Double> actualNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+
+    long expectedTotalWeight =
+        expectedAssignmentInfo.values().stream().mapToLong(Pair::first).sum();
+    expectedAssignmentInfo.forEach(
+        (subtaskId, pair) -> {
+          expectedAssignedKeyCounts.add(pair.second());
+          expectedNormalizedWeights.add(pair.first().doubleValue() / 
expectedTotalWeight);
+        });
+
+    long actualTotalWeight =
+        partitionResults.values().stream().mapToLong(pair -> 
pair.first().longValue()).sum();
+    partitionResults.forEach(
+        (subtaskId, pair) -> {
+          actualAssignedKeyCounts.add(pair.second().size());
+          actualNormalizedWeights.add(pair.first().doubleValue() / 
actualTotalWeight);
+        });
+
+    // number of assigned keys should match exactly
+    Assertions.assertThat(actualAssignedKeyCounts)
+        .as("the number of assigned keys should match for every subtask")
+        .isEqualTo(expectedAssignedKeyCounts);
+
+    System.out.println("------------------------------------");
+    // weight for every subtask shouldn't differ for more than 10% relative to 
the expected weight
+    for (int subtaskId = 0; subtaskId < expectedNormalizedWeights.size(); 
++subtaskId) {
+      double expectedWeight = expectedNormalizedWeights.get(subtaskId);
+      double maxDriftPercentage = 10.0;
+      double min = expectedWeight * (1 - maxDriftPercentage / 100);
+      double max = expectedWeight * (1 + maxDriftPercentage / 100);
+      System.out.println(

Review Comment:
   this was used during debugging. will remove



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMapRangePartitioner {
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build();
+
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+
+  private static final RowDataWrapper KEY_0_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_1_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_1 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_2_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_2 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_3_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_3 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_4_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_4 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_5_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_5 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_6_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_6 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_7_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_7 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_8_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_8 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_9_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_9 = SORT_KEY.copy();
+
+  static {
+    SORT_KEY_0.wrap(
+        KEY_0_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k0"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_1.wrap(
+        KEY_1_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k1"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_2.wrap(
+        KEY_2_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k2"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_3.wrap(
+        KEY_3_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k3"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_4.wrap(
+        KEY_4_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k4"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_5.wrap(
+        KEY_5_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k5"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_6.wrap(
+        KEY_6_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k6"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_7.wrap(
+        KEY_7_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k7"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_8.wrap(
+        KEY_8_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k8"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_9.wrap(
+        KEY_9_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k9"), 1, 
StringData.fromString("2023-06-20"))));
+  }
+
+  // Total weight is 80
+  private final MapDataStatistics mapDataStatistics =
+      new MapDataStatistics(
+          ImmutableMap.of(
+              SORT_KEY_0,
+              35L,
+              SORT_KEY_1,
+              23L,
+              SORT_KEY_2,
+              12L,
+              SORT_KEY_3,
+              4L,
+              SORT_KEY_4,
+              1L,
+              SORT_KEY_5,
+              1L,
+              SORT_KEY_6,
+              1L,
+              SORT_KEY_7,
+              1L,
+              SORT_KEY_8,
+              1L,
+              SORT_KEY_9,
+              1L));
+
+  @Test
+  public void testEvenlyDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 8;
+
+    // each task should get targeted weight of 10 (=80/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {5L, 10L, 8L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{2L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{4L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(10L, 1),
+            1,
+            Pair.of(10L, 1),
+            2,
+            Pair.of(10L, 1),
+            3,
+            Pair.of(10L, 2),
+            4,
+            Pair.of(10L, 1),
+            5,
+            Pair.of(10L, 2),
+            6,
+            Pair.of(10L, 1),
+            7,
+            Pair.of(10L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testEvenlyDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 8;
+
+    // target subtask weight is 10 before close file cost factored in.
+    // close file cost is 2 = 20% * 10.
+    // key weights before and after close file cost factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target subtask weight per subtask is 14 (112/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {13L, 14L, 2L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{12L, 4L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{6L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{3L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{1L, 2L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the 
number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(14L, 1),
+            1,
+            Pair.of(14L, 1),
+            2,
+            Pair.of(14L, 1),
+            3,
+            Pair.of(14L, 2),
+            4,
+            Pair.of(14L, 1),
+            5,
+            Pair.of(14L, 2),
+            6,
+            Pair.of(14L, 4),
+            7,
+            Pair.of(14L, 5));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 9;
+
+    // each task should get targeted weight of 9 = ceiling(80/9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {9L, 9L, 9L, 8L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {3, 4, 5, 6}, new long[] {1L, 9L, 9L, 4L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{5L, 7L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] 
{2L, 2L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(9L, 1),
+            1,
+            Pair.of(9L, 1),
+            2,
+            Pair.of(9L, 1),
+            3,
+            Pair.of(9L, 2),
+            4,
+            Pair.of(9L, 1),
+            5,
+            Pair.of(9L, 1),
+            6,
+            Pair.of(9L, 2),
+            7,
+            Pair.of(9L, 2),
+            8,
+            Pair.of(8L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 9;
+
+    // target subtask weight is 9 (= ceiling(80/9)) before close file cost is factored in.
+    // close file cost is 2 (= 20% * 9 = 1.8, rounded up) per file.
+    // key weights before and after close file cost is factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target weight per subtask is 13 (= ceiling(112/9))
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {13L, 13L, 13L, 4L}),

Review Comment:
   here the weights are adjusted with closing file cost



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java:
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.Pair;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestMapRangePartitioner {
+  private static final SortOrder SORT_ORDER =
+      SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build();
+
+  private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, 
SORT_ORDER);
+  private static final RowType ROW_TYPE = 
FlinkSchemaUtil.convert(TestFixtures.SCHEMA);
+
+  private static final RowDataWrapper KEY_0_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_1_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_1 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_2_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_2 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_3_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_3 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_4_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_4 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_5_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_5 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_6_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_6 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_7_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_7 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_8_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_8 = SORT_KEY.copy();
+  private static final RowDataWrapper KEY_9_WRAPPER =
+      new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
+  private static final SortKey SORT_KEY_9 = SORT_KEY.copy();
+
+  static {
+    SORT_KEY_0.wrap(
+        KEY_0_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k0"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_1.wrap(
+        KEY_1_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k1"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_2.wrap(
+        KEY_2_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k2"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_3.wrap(
+        KEY_3_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k3"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_4.wrap(
+        KEY_4_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k4"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_5.wrap(
+        KEY_5_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k5"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_6.wrap(
+        KEY_6_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k6"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_7.wrap(
+        KEY_7_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k7"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_8.wrap(
+        KEY_8_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k8"), 1, 
StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_9.wrap(
+        KEY_9_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k9"), 1, 
StringData.fromString("2023-06-20"))));
+  }
+
+  // Total weight is 80
+  private final MapDataStatistics mapDataStatistics =
+      new MapDataStatistics(
+          ImmutableMap.of(
+              SORT_KEY_0,
+              35L,
+              SORT_KEY_1,
+              23L,
+              SORT_KEY_2,
+              12L,
+              SORT_KEY_3,
+              4L,
+              SORT_KEY_4,
+              1L,
+              SORT_KEY_5,
+              1L,
+              SORT_KEY_6,
+              1L,
+              SORT_KEY_7,
+              1L,
+              SORT_KEY_8,
+              1L,
+              SORT_KEY_9,
+              1L));
+
+  @Test
+  public void testEvenlyDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 8;
+
+    // each task should get targeted weight of 10 (=80/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {5L, 10L, 8L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{2L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{4L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(10L, 1),
+            1,
+            Pair.of(10L, 1),
+            2,
+            Pair.of(10L, 1),
+            3,
+            Pair.of(10L, 2),
+            4,
+            Pair.of(10L, 1),
+            5,
+            Pair.of(10L, 2),
+            6,
+            Pair.of(10L, 1),
+            7,
+            Pair.of(10L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testEvenlyDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 8;
+
+    // target subtask weight is 10 before close file cost factored in.
+    // close file cost is 2 = 20% * 10.
+    // key weights before and after close file cost factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target subtask weight per subtask is 14 (112/8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {13L, 14L, 2L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{12L, 4L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{6L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] 
{3L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{1L, 2L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the 
number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(14L, 1),
+            1,
+            Pair.of(14L, 1),
+            2,
+            Pair.of(14L, 1),
+            3,
+            Pair.of(14L, 2),
+            4,
+            Pair.of(14L, 1),
+            5,
+            Pair.of(14L, 2),
+            6,
+            Pair.of(14L, 4),
+            7,
+            Pair.of(14L, 5));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 0.0);
+    int numPartitions = 9;
+
+    // each task should get targeted weight of 9 = ceiling(80/9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {9L, 9L, 9L, 8L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {3, 4, 5, 6}, new long[] {1L, 9L, 9L, 4L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{5L, 7L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] 
{2L, 2L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of 
assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(9L, 1),
+            1,
+            Pair.of(9L, 1),
+            2,
+            Pair.of(9L, 1),
+            3,
+            Pair.of(9L, 2),
+            4,
+            Pair.of(9L, 1),
+            5,
+            Pair.of(9L, 1),
+            6,
+            Pair.of(9L, 2),
+            7,
+            Pair.of(9L, 2),
+            8,
+            Pair.of(8L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, 
mapDataStatistics, 20.0);
+    int numPartitions = 9;
+
+    // target subtask weight is 9 (= ceiling(80/9)) before close file cost is factored in.
+    // close file cost is 2 (= 20% * 9 = 1.8, rounded up) per file.
+    // key weights before and after close file cost is factored in
+    // before:     35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost:  8,  6,  4, 2, 2, 2, 2, 2, 2, 2
+    // after:      43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target weight per subtask is 13 (= ceiling(112/9))
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+            new MapRangePartitioner.KeyAssignment(
+                new int[] {0, 1, 2, 3}, new long[] {13L, 13L, 13L, 4L}),
+            SORT_KEY_1,
+            new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new 
long[] {9L, 13L, 7L}),
+            SORT_KEY_2,
+            new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] 
{6L, 10L}),
+            SORT_KEY_3,
+            new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] 
{3L, 3L}),
+            SORT_KEY_4,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_5,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_6,
+            new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] 
{3L}),
+            SORT_KEY_7,
+            new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] 
{1L, 2L}),
+            SORT_KEY_8,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{3L}),
+            SORT_KEY_9,
+            new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] 
{3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the 
number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0,
+            Pair.of(13L, 1),
+            1,
+            Pair.of(13L, 1),
+            2,
+            Pair.of(13L, 1),
+            3,
+            Pair.of(13L, 2),
+            4,
+            Pair.of(13L, 1),
+            5,
+            Pair.of(13L, 2),
+            6,
+            Pair.of(13L, 2),
+            7,
+            Pair.of(13L, 5),
+            8,
+            Pair.of(8L, 3));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = 
partitioner.assignmentInfo();
+    
Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  /**
+   * Replays every key from {@code partitioner.sortedStatsWithCloseFileCost()} through {@code
+   * partitioner.partition(...)} and tallies where the rows land.
+   *
+   * <p>Each key is submitted {@code weight * 100} times so that the observed per-subtask record
+   * counts can be compared against the expected weights.
+   *
+   * @param partitioner the partitioner under test
+   * @param numPartitions the number of partitions passed to every {@code partition} call
+   * @return a map keyed by subtask id; for each value pair, the first element is the number of
+   *     rows routed to that subtask and the second element is the set of distinct rows ({@code
+   *     Set<RowData>}) routed to it
+   */
+  private static Map<Integer, Pair<AtomicLong, Set<RowData>>> runPartitioner(
+      MapRangePartitioner partitioner, int numPartitions) {
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults = Maps.newHashMap();
+    partitioner
+        .sortedStatsWithCloseFileCost()
+        .forEach(
+            (sortKey, weight) -> {
+              String key = sortKey.get(0, String.class);
+              // run 100x times of the weight
+              long iterations = weight * 100;
+              for (int i = 0; i < iterations; ++i) {
+                RowData rowData =
+                    GenericRowData.of(
+                        StringData.fromString(key), 1, StringData.fromString("2023-06-20"));
+                int subtaskId = partitioner.partition(rowData, numPartitions);
+                // computeIfAbsent returns the existing or newly created entry, so no
+                // second lookup is needed
+                Pair<AtomicLong, Set<RowData>> pair =
+                    partitionResults.computeIfAbsent(
+                        subtaskId, k -> Pair.of(new AtomicLong(0), Sets.newHashSet()));
+                pair.first().incrementAndGet();
+                pair.second().add(rowData);
+              }
+            });
+    return partitionResults;
+  }
+
+  private void validatePartitionResults(
+      Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo,
+      Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults) {
+
+    
Assertions.assertThat(partitionResults.size()).isEqualTo(expectedAssignmentInfo.size());
+
+    List<Integer> expectedAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Integer> actualAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+    List<Double> expectedNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Double> actualNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+
+    long expectedTotalWeight =
+        expectedAssignmentInfo.values().stream().mapToLong(Pair::first).sum();
+    expectedAssignmentInfo.forEach(
+        (subtaskId, pair) -> {
+          expectedAssignedKeyCounts.add(pair.second());
+          expectedNormalizedWeights.add(pair.first().doubleValue() / 
expectedTotalWeight);
+        });
+
+    long actualTotalWeight =
+        partitionResults.values().stream().mapToLong(pair -> 
pair.first().longValue()).sum();
+    partitionResults.forEach(
+        (subtaskId, pair) -> {
+          actualAssignedKeyCounts.add(pair.second().size());
+          actualNormalizedWeights.add(pair.first().doubleValue() / 
actualTotalWeight);
+        });
+
+    // number of assigned keys should match exactly
+    Assertions.assertThat(actualAssignedKeyCounts)
+        .as("the number of assigned keys should match for every subtask")
+        .isEqualTo(expectedAssignedKeyCounts);
+
+    System.out.println("------------------------------------");

Review Comment:
   nope. will remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to