stevenzwu commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1625129655


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SketchUtil.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.datasketches.sampling.ReservoirItemsSketch;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.StructLike;
+
+class SketchUtil {
+  static final int COORDINATOR_MIN_RESERVOIR_SIZE = 10_000;
+  static final int COORDINATOR_MAX_RESERVOIR_SIZE = 1_000_000;
+  static final int COORDINATOR_TARGET_PARTITIONS_MULTIPLIER = 100;
+  static final int OPERATOR_OVER_SAMPLE_RATIO = 10;
+
+  // switch the statistics tracking from map to sketch if the cardinality of the sort key is over
+  // this threshold. It is hardcoded for now, we can revisit in the future if config is needed.
+  static final int OPERATOR_SKETCH_SWITCH_THRESHOLD = 10_000;
+  static final int COORDINATOR_SKETCH_SWITCH_THRESHOLD = 100_000;
+
+  private SketchUtil() {}
+
+  /**
+   * The larger the reservoir size, the more accurate for range bounds calculation and the more
+   * balanced range distribution.
+   *
+   * <p>Here are the heuristic rules
+   * <li>Target size: numPartitions x 100 to achieve good accuracy and is easier to calculate the
+   *     range bounds
+   * <li>Min is 10K to achieve good accuracy while memory footprint is still relatively small
+   * <li>Max is 1M to cap the memory footprint on coordinator
+   *
+   * @param numPartitions number of range partitions which equals to downstream operator parallelism
+   * @return reservoir size
+   */
+  static int determineCoordinatorReservoirSize(int numPartitions) {
+    int reservoirSize = numPartitions * COORDINATOR_TARGET_PARTITIONS_MULTIPLIER;
+
+    if (reservoirSize < COORDINATOR_MIN_RESERVOIR_SIZE) {
+      // adjust it up and still make reservoirSize divisible by numPartitions
+      int remainder = COORDINATOR_MIN_RESERVOIR_SIZE % numPartitions;
+      reservoirSize = COORDINATOR_MIN_RESERVOIR_SIZE + (numPartitions - remainder);
+    } else if (reservoirSize > COORDINATOR_MAX_RESERVOIR_SIZE) {
+      // adjust it down and still make reservoirSize divisible by numPartitions
+      int remainder = COORDINATOR_MAX_RESERVOIR_SIZE % numPartitions;
+      reservoirSize = COORDINATOR_MAX_RESERVOIR_SIZE - remainder;
+    }
+
+    return reservoirSize;
+  }
+
+  /**
+   * Determine the sampling reservoir size where operator subtasks collect data statistics.
+   *
+   * <p>Here are the heuristic rules
+   * <li>Target size is "coordinator reservoir size * over sampling ration (10) / operator
+   *     parallelism"
+   * <li>Min is 1K to achieve good accuracy while memory footprint is still relatively small
+   * <li>Max is 100K to cap the memory footprint on coordinator
+   *
+   * @param numPartitions number of range partitions which equals to downstream operator parallelism
+   * @param operatorParallelism data statistics operator parallelism
+   * @return reservoir size
+   */
+  static int determineOperatorReservoirSize(int operatorParallelism, int numPartitions) {

Review Comment:
   > Do we call the determineBounds with the new parallelism, and we have a slightly worse results compared to the next checkpoint when we have stats collected with the new parallelism? Or we will use the already calculated bounds, and use fewer writers to write?
   
   Range bounds are calculated when statistics are aggregated at the coordinator. When writer parallelism is scaled up, the existing range bounds were calculated using the old parallelism (number of partitions), so the newer subtasks will be idle. That is not optimal, but not fatal.
   
   Scale-down is the bigger problem. In this case, the range bounds can't be used, as they were calculated with the larger parallelism. We would get an `ArrayIndexOutOfBoundsException`.
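
To make the scale-down failure concrete, here is a minimal sketch. It assumes the range bounds are a sorted array and that a record's partition is found by binary search over them; `StaleBoundsDemo`, `partitionFor`, and the integer keys are hypothetical stand-ins, not the partitioner from this PR.

```java
import java.util.Arrays;

// Hypothetical illustration of the scale-down failure; not code from this PR.
class StaleBoundsDemo {

  // Maps a key to a partition index in [0, bounds.length] using sorted range bounds.
  static int partitionFor(int[] bounds, int key) {
    int pos = Arrays.binarySearch(bounds, key);
    // On a miss, binarySearch returns -(insertionPoint) - 1.
    return pos >= 0 ? pos : -pos - 1;
  }

  public static void main(String[] args) {
    int[] boundsForFourWriters = {10, 20, 30}; // 3 bounds => 4 partitions
    int newParallelism = 2; // writers left after scale-down
    Object[] writerChannels = new Object[newParallelism];

    int partition = partitionFor(boundsForFourWriters, 35);
    try {
      // The stale bounds still address partitions 2 and 3, which no longer exist.
      writerChannels[partition] = "record";
    } catch (ArrayIndexOutOfBoundsException e) {
      System.out.println(
          "partition " + partition + " does not exist with " + newParallelism + " writers");
    }
  }
}
```

With bounds built for four partitions, any key above the first bound maps to a partition index that the two remaining writers cannot serve, which is why stale bounds have to be discarded or recomputed after a scale-down.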
   
   1. We can simply discard the stats and operate non-optimally until statistics are learned again.
   2. We can't really recalculate the range bounds with the new parallelism, because the coordinator sends operator subtasks the range bounds computed for the writer parallelism (not the reservoir samples) to reduce the amount of data transferred from coordinator to operator subtasks.
   
   We can change the coordinator to checkpoint the reservoir samples. Upon a parallelism change, operator subtasks should discard their stats, and the coordinator can broadcast new range bounds recomputed with the new parallelism.
   
   ```
       /**
        * This is called when a subtask execution attempt of the Operator becomes ready to receive
        * events. The given {@code SubtaskGateway} can be used to send events to the execution attempt.
        *
        * <p>The given {@code SubtaskGateway} is bound to that specific execution attempt that became
        * ready. All events sent through the gateway target that execution attempt; if the attempt is
        * no longer running by the time the event is sent, then the events are failed.
        */
       void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);
   ```
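
The recomputation proposed above could look roughly like the sketch below. It assumes the coordinator has kept the reservoir samples (restored from a checkpoint) and picks evenly spaced samples as bounds. `RangeBoundsFromSamples`, `rangeBounds`, and the integer samples are hypothetical; the real sort keys and the bounds algorithm in the PR may differ.

```java
import java.util.Arrays;

// Hypothetical sketch of recomputing range bounds from retained samples; not code from this PR.
class RangeBoundsFromSamples {

  // Returns numPartitions - 1 bounds, each the last sample of an equally sized slice
  // of the sorted samples.
  static int[] rangeBounds(int[] samples, int numPartitions) {
    if (numPartitions < 1 || samples.length < numPartitions) {
      throw new IllegalArgumentException("need at least one sample per partition");
    }

    int[] sorted = samples.clone();
    Arrays.sort(sorted);

    int[] bounds = new int[numPartitions - 1];
    double step = (double) sorted.length / numPartitions;
    for (int i = 0; i < bounds.length; i++) {
      int idx = (int) Math.ceil(step * (i + 1)) - 1;
      bounds[i] = sorted[idx];
    }
    return bounds;
  }

  public static void main(String[] args) {
    int[] samples = {12, 3, 7, 1, 9, 5, 11, 2, 8, 4, 10, 6};

    // Bounds for the old writer parallelism of 4.
    System.out.println(Arrays.toString(rangeBounds(samples, 4)));
    // After scaling down to 2 writers, the same samples yield a single bound.
    System.out.println(Arrays.toString(rangeBounds(samples, 2)));
  }
}
```

Because the samples rather than the derived bounds are retained, the same data can serve any new parallelism without waiting for statistics to be learned again.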



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

