pvary commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1705279951


##########
docs/docs/flink-writes.md:
##########
@@ -262,6 +262,91 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 
 Check out all the options here: [write-options](flink-configuration.md#write-options)
 
+## Distribution mode
+
+Flink streaming writer supports both `HASH` and `RANGE` distribution modes.
+You can select the mode via `FlinkSink#Builder#distributionMode(DistributionMode)`
+or via [write-options](flink-configuration.md#write-options).
+
+### Hash distribution
+
+HASH distribution shuffles data by partition key (partitioned table) or
+equality fields (non-partitioned table). It simply leverages Flink's
+`DataStream#keyBy` to distribute the data.
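A simplified sketch of how key-based routing behaves follows. It assumes a plain `hashCode()`-modulo mapping from key to writer subtask; Flink's actual `keyBy` first hashes keys into key groups and then assigns key groups to subtasks, so real subtask indices will differ. `HashRoutingSketch`, `route`, and `rowsPerWriter` are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of HASH distribution (an assumption, not Flink's key-group logic):
// every row is routed to a writer subtask by hashing its key, so all rows that
// share a key land on the same subtask.
class HashRoutingSketch {

    // Hypothetical routing function: same key -> same writer subtask.
    static int route(String key, int writerParallelism) {
        return Math.floorMod(key.hashCode(), writerParallelism);
    }

    // Counts how many rows each writer subtask receives.
    static Map<Integer, Integer> rowsPerWriter(List<String> keys, int writerParallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(route(key, writerParallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Partition values of five rows; one hourly partition is "hot".
        List<String> keys = List.of("2024-08-01-10", "2024-08-01-10", "2024-08-01-10",
                "2024-08-01-10", "2024-08-01-11");
        System.out.println(rowsPerWriter(keys, 4));
    }
}
```

Because every row of the hot partition hashes to the same subtask, that one writer receives most of the traffic regardless of how many writers exist.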
+
+HASH distribution has a few limitations:
+<ul>
+<li>It doesn't handle skewed data well, e.g. when some partitions have a lot more data than others.
+<li>It can result in unbalanced traffic distribution if the cardinality of the partition key or
+equality fields is low, as demonstrated by [PR 4228](https://github.com/apache/iceberg/pull/4228).
+<li>Writer parallelism is limited to the cardinality of the hash key.
+If the cardinality is 10, at most 10 writer tasks would get traffic.
+Having higher writer parallelism (even if the traffic volume requires it) won't help.
+</ul>
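The cardinality limit in the last item can be checked with a small simulation. It again assumes a simplified `hashCode()`-modulo routing rather than Flink's key-group assignment, and the partition values and the `HashCardinalityLimit` class are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// With a hash key of cardinality K, at most K writer subtasks ever receive rows,
// no matter how high the writer parallelism is. Routing is a simplified
// hashCode-modulo model (an assumption, not Flink's exact key-group logic).
class HashCardinalityLimit {

    // Returns how many distinct writer subtasks receive at least one key.
    static int busyWriters(int distinctKeys, int writerParallelism) {
        Set<Integer> busy = new HashSet<>();
        for (int i = 0; i < distinctKeys; i++) {
            String key = "partition-" + i; // hypothetical partition values
            busy.add(Math.floorMod(key.hashCode(), writerParallelism));
        }
        return busy.size();
    }

    public static void main(String[] args) {
        // 10 distinct partition values, 32 writer subtasks: at most 10 are busy,
        // and hash collisions can make it even fewer.
        System.out.println(busyWriters(10, 32));
    }
}
```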
+
+### Range distribution (experimental)
+
+RANGE distribution shuffles data by partition key or sort order via a custom range partitioner.
+Range distribution collects traffic statistics to guide the range partitioner to
+evenly distribute traffic to writer tasks.
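Routing by range can be sketched as a binary search over sorted range bounds. This is a minimal sketch assuming sort keys reduced to `long` values and hard-coded, hypothetical bounds standing in for what the collected statistics would produce; `RangeRoutingSketch` is not an Iceberg class.

```java
import java.util.Arrays;

// Minimal sketch of range partitioning: N-1 sorted bounds define N writer ranges,
// and each row goes to the subtask whose range contains its sort key.
class RangeRoutingSketch {

    // bounds[i] is the inclusive upper bound of subtask i; keys above the
    // last bound go to the last subtask. Bounds are assumed distinct and sorted.
    static int route(long sortKey, long[] bounds) {
        int pos = Arrays.binarySearch(bounds, sortKey);
        // binarySearch returns the index if found, otherwise (-(insertionPoint) - 1).
        return pos >= 0 ? pos : -pos - 1;
    }

    public static void main(String[] args) {
        long[] bounds = {100, 200, 300}; // 3 bounds -> 4 writer subtasks
        for (long key : new long[] {5, 100, 150, 250, 999}) {
            System.out.println(key + " -> subtask " + route(key, bounds));
        }
    }
}
```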
+
+RANGE distribution can be applied to an Iceberg table that is either partitioned or
+has a SortOrder defined. For a partitioned table without a SortOrder, the partition columns
+are used as sort columns. If a SortOrder is defined for the table, it is used by
+the range partitioner.
+
+Range distribution can handle skewed data, e.g.:
+<ul>
+<li>Table is partitioned by event time. Typically, recent hours have more data,
+while the long-tail hours have less and less data.
+<li>Table is partitioned by country code, where some countries (like US) have
+a lot more traffic and smaller countries have a lot less data.
+<li>Table is partitioned by event type, where some types have a lot more data than others.
+</ul>
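One way statistics can balance skewed keys is sketched below: keys are walked in sort order and packed into writer subtasks of roughly equal weight, so a heavy key such as `US` is split across several subtasks and its rows are round-robined among them. This greedy packing is an illustrative assumption, not necessarily the exact algorithm Iceberg's range partitioner uses; the row counts, `SkewAwareAssignment`, and the `rowIndex` parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative greedy assignment (an assumption; not necessarily Iceberg's exact
// algorithm): keys are walked in sort order and packed into writer subtasks of
// roughly equal weight, so a heavy key can be split across several subtasks.
class SkewAwareAssignment {

    // Maps each key (iterated in sort order) to the writer subtasks sharing its traffic.
    static Map<String, List<Integer>> assign(LinkedHashMap<String, Long> sortedKeyWeights, int writers) {
        long total = 0;
        for (long weight : sortedKeyWeights.values()) {
            total += weight;
        }
        long target = (total + writers - 1) / writers; // per-subtask weight, rounded up
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int subtask = 0;
        long filled = 0; // weight already placed on the current subtask
        for (Map.Entry<String, Long> entry : sortedKeyWeights.entrySet()) {
            long remaining = entry.getValue();
            List<Integer> subtasks = new ArrayList<>();
            do {
                if (filled >= target && subtask < writers - 1) {
                    subtask++; // current subtask is full, move to the next one
                    filled = 0;
                }
                long take = Math.min(remaining, target - filled);
                subtasks.add(subtask);
                filled += take;
                remaining -= take;
            } while (remaining > 0);
            assignment.put(entry.getKey(), subtasks);
        }
        return assignment;
    }

    // Round-robins a key over its assigned subtasks; keys without statistics
    // (hypothetical fallback) are round-robined over all writers.
    static int route(Map<String, List<Integer>> assignment, String key, long rowIndex, int writers) {
        List<Integer> subtasks = assignment.get(key);
        if (subtasks == null) {
            return (int) (rowIndex % writers);
        }
        return subtasks.get((int) (rowIndex % subtasks.size()));
    }

    public static void main(String[] args) {
        // Hypothetical row counts per country code, in sort order; "US" is skewed.
        LinkedHashMap<String, Long> weights = new LinkedHashMap<>();
        weights.put("CA", 100L);
        weights.put("DE", 100L);
        weights.put("US", 600L);
        weights.put("ZA", 200L);
        System.out.println(assign(weights, 4));
    }
}
```

With these counts every one of the four writers ends up with a weight of 250, even though `US` alone carries 60% of the traffic.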
+
+Range distribution can also cluster data on non-partition columns.
+E.g., a table is partitioned hourly on ingestion time, and queries often include
+a predicate on a non-partition column like `device_id` or `country_code`.
+Range distribution can improve query performance by clustering on the non-partition column.
+
+Range distribution only shuffles the data via the range partitioner. Rows are *not* sorted within
+a data file, which the Flink streaming writer doesn't support yet.
+
+Statistics are collected by every shuffle operator subtask and aggregated by the coordinator
+for every checkpoint cycle. Aggregated statistics are broadcast to all subtasks and
+applied to the range partitioner in the next checkpoint. So it may take up to two checkpoint
+cycles to detect a traffic distribution change and apply the new statistics to the range partitioner.
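The aggregation step can be sketched as merging per-subtask key counts into global statistics. This is a minimal sketch assuming statistics are plain per-key row counts; `StatisticsAggregationSketch` and `aggregate` are hypothetical, and the broadcast and checkpoint plumbing is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Each shuffle subtask counts the sort keys it saw during a checkpoint cycle; a
// coordinator merges the local counts into global statistics, which the range
// partitioner would start using from the next checkpoint.
class StatisticsAggregationSketch {

    // Merges local per-subtask key counts into one global map.
    static Map<String, Long> aggregate(List<Map<String, Long>> localStats) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStats) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    public static void main(String[] args) {
        // Local statistics reported by two shuffle subtasks for checkpoint N.
        Map<String, Long> subtask0 = Map.of("US", 500L, "CA", 40L);
        Map<String, Long> subtask1 = Map.of("US", 450L, "DE", 60L);
        // The aggregated result would be broadcast and applied in checkpoint N + 1.
        System.out.println(aggregate(List.of(subtask0, subtask1)));
    }
}
```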
+
+Range distribution can work with low cardinality (like `country_code`)
+or high cardinality (like `device_id`) scenarios.
+<ul>
+<li>For low cardinality scenarios (like hundreds or thousands of distinct values),
+a HashMap is used to track the traffic distribution for every key.
+If a new sort key value shows up, the range partitioner just
+round-robins it to the writer tasks until the traffic distribution
+for the new key has been learned.
+<li>For high cardinality scenarios (like millions or billions of distinct values),
+uniform random sampling (reservoir sampling) is used to compute range bounds
+that split the sort key space evenly.
+This keeps the memory footprint and network exchange low.
Review Comment:
   Do we want to add a paragraph about the associated cost? Like the CPU cost of collecting and distributing the statistics?
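The high-cardinality item in the hunk above (reservoir sampling to compute range bounds) can be illustrated with a minimal sketch. It uses the textbook Algorithm R and picks evenly spaced keys from the sorted sample; sort keys reduced to `long`, the sample size, and the `ReservoirRangeBounds` class are assumptions for illustration, not Iceberg's implementation.

```java
import java.util.Arrays;
import java.util.Random;

// A fixed-size uniform random sample of sort keys is kept with reservoir sampling
// (Algorithm R), and evenly spaced keys from the sorted sample become the range
// bounds. Names and sample size are illustrative.
class ReservoirRangeBounds {

    private final long[] reservoir;
    private final Random random;
    private long seen = 0; // number of keys offered so far

    ReservoirRangeBounds(int sampleSize, long seed) {
        this.reservoir = new long[sampleSize];
        this.random = new Random(seed);
    }

    // Algorithm R: after n keys, each key is in the sample with probability k/n.
    void add(long sortKey) {
        if (seen < reservoir.length) {
            reservoir[(int) seen] = sortKey;
        } else {
            long j = (long) (random.nextDouble() * (seen + 1)); // uniform in [0, seen]
            if (j < reservoir.length) {
                reservoir[(int) j] = sortKey;
            }
        }
        seen++;
    }

    // Returns (writers - 1) bounds at evenly spaced positions of the sorted sample.
    long[] rangeBounds(int writers) {
        int size = (int) Math.min(seen, reservoir.length);
        if (size == 0) {
            throw new IllegalStateException("no keys sampled yet");
        }
        long[] sorted = Arrays.copyOf(reservoir, size);
        Arrays.sort(sorted);
        long[] bounds = new long[writers - 1];
        for (int i = 1; i < writers; i++) {
            bounds[i - 1] = sorted[(int) ((long) i * size / writers)];
        }
        return bounds;
    }

    public static void main(String[] args) {
        ReservoirRangeBounds sampler = new ReservoirRangeBounds(1_000, 42L);
        // Stand-in for a stream of high-cardinality sort keys such as device_id.
        for (long key = 0; key < 100_000; key++) {
            sampler.add(key);
        }
        System.out.println(Arrays.toString(sampler.rangeBounds(4)));
    }
}
```

Memory stays bounded by the sample size regardless of how many distinct keys flow through, which is why sampling suits the high-cardinality case better than a per-key map.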


