pvary commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1719362536


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -233,15 +239,66 @@ public Builder flinkConf(ReadableConfig config) {
      * @return {@link Builder} to connect the iceberg table.
      */
     public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
       }
       return this;
     }
 
+    /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in a relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch}. Refer to
+     * {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if
+     * cardinality is higher than the threshold (currently 10K) as defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * the sketch reservoir sampling.
+     *
+     * <p>Explicitly set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
+
+    /**
+     * If sort order contains partition columns, each sort key would map to one partition and data
+     * file. This relative weight can avoid placing too many small files for sort keys with low
+     * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means
+     * each key has a base weight of `2%` of the targeted traffic weight per writer task.
+     *
+     * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream
+     * contain events from now up to 180 days ago. With event time, traffic weight distribution

Review Comment:
   Nit: contains?
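For context on the `Auto` behavior the quoted Javadoc describes (exact per-key Map statistics until cardinality crosses a threshold, then a switch to Sketch reservoir sampling), here is a self-contained toy sketch of that idea. This is NOT Iceberg's actual implementation: the class and method names are made up for illustration, the real threshold lives in `SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD` (10K, not the tiny value used here), and the real sketch keeps weighted samples rather than this simplified reservoir.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Toy illustration (not Iceberg's code) of Auto statistics switching:
// start with exact per-key counts (Map mode); once the number of distinct
// keys exceeds a threshold, fall back to reservoir sampling (Sketch mode).
class AutoKeyStatistics {
  private final int switchThreshold; // stand-in for OPERATOR_SKETCH_SWITCH_THRESHOLD
  private final int reservoirSize;
  private final Map<String, Long> exactCounts = new HashMap<>();
  private final List<String> reservoir = new ArrayList<>();
  private final Random random = new Random(42);
  private boolean usingSketch = false;
  private long seen = 0;

  AutoKeyStatistics(int switchThreshold, int reservoirSize) {
    this.switchThreshold = switchThreshold;
    this.reservoirSize = reservoirSize;
  }

  void add(String key) {
    seen++;
    if (!usingSketch) {
      exactCounts.merge(key, 1L, Long::sum);
      if (exactCounts.size() > switchThreshold) {
        switchToSketch();
      }
      return;
    }
    // Algorithm R reservoir sampling: every record seen so far is kept
    // with equal probability reservoirSize / seen.
    if (reservoir.size() < reservoirSize) {
      reservoir.add(key);
    } else {
      long j = Math.floorMod(random.nextLong(), seen);
      if (j < reservoirSize) {
        reservoir.set((int) j, key);
      }
    }
  }

  private void switchToSketch() {
    usingSketch = true;
    // Seed the reservoir from keys counted so far (simplified: ignores counts).
    for (String key : exactCounts.keySet()) {
      if (reservoir.size() >= reservoirSize) {
        break;
      }
      reservoir.add(key);
    }
    exactCounts.clear();
  }

  boolean usingSketch() {
    return usingSketch;
  }

  int trackedKeys() {
    return usingSketch ? reservoir.size() : exactCounts.size();
  }
}
```

The point of the `Auto` default is exactly this trade-off: exact counts are cheap and accurate at low cardinality, while a bounded-size sample keeps memory constant when the key space turns out to be large.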



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

