pvary commented on code in PR #10859: URL: https://github.com/apache/iceberg/pull/10859#discussion_r1719362536
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java: ########## @@ -233,15 +239,66 @@ public Builder flinkConf(ReadableConfig config) { * @return {@link Builder} to connect the iceberg table. */ public Builder distributionMode(DistributionMode mode) { - Preconditions.checkArgument( - !DistributionMode.RANGE.equals(mode), - "Flink does not support 'range' write distribution mode now."); if (mode != null) { writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName()); } return this; } + /** + * Range distribution needs to collect statistics about data distribution to properly shuffle + * the records in relatively balanced way. In general, low cardinality should use {@link + * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch} Refer to + * {@link StatisticsType} Javadoc for more details. + * + * <p>Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if + * cardinality is higher than the threshold (currently 10K) as defined in {@code + * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to + * the sketch reservoir sampling. + * + * <p>Explicit set the statistics type if the default behavior doesn't work. + * + * @param type to specify the statistics type for range distribution. + * @return {@link Builder} to connect the iceberg table. + */ + public Builder rangeDistributionStatisticsType(StatisticsType type) { + if (type != null) { + writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name()); + } + return this; + } + + /** + * If sort order contains partition columns, each sort key would map to one partition and data + * file. This relative weight can avoid placing too many small files for sort keys with low + * traffic. It is a double value that defines the minimal weight for each sort key. `0.02` means + * each key has a base weight of `2%` of the targeted traffic weight per writer task. + * + * <p>E.g. the sink Iceberg table is partitioned daily by event time. Assume the data stream + * contain events from now up to 180 days ago. With even time, traffic weight distribution Review Comment: Nit: contains? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org