stevenzwu opened a new issue, #6303:
URL: https://github.com/apache/iceberg/issues/6303

   ### Feature Request / Improvement
   
   Today, the Flink Iceberg sink only supports a simple keyBy hash distribution on partition columns. In practice, keyBy shuffling on partition values doesn't work very well.
   
   We can make the following shuffling enhancements in the Flink streaming writer. More details can be found in the [design doc](https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo). This is an uber issue for tracking purposes. Here are the rough phases.
   
   1. [hash distribution] custom partitioner on bucket values. [PR 4228](https://github.com/apache/iceberg/pull/4228) demonstrated that keyBy on low-cardinality partitioning buckets resulted in skewed traffic distribution. The Flink sink can add a custom partitioner that directly maps the bucket value (an integer) to the downstream writer tasks (also integers) in round-robin fashion (mod). This is a relatively simple case.
   
   This is the case when `write.distribution-mode=hash` and there is a bucketing partition column. Other partition columns (like an hourly partition) will be ignored for shuffling; the assumption is that the bucketing column is where we want to distribute/cluster the rows.
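   A minimal sketch of the partitioner described in (1), assuming the bucket value has already been extracted from the row (the class name `BucketPartitioner` is hypothetical):

   ```java
   import org.apache.flink.api.common.functions.Partitioner;

   // Hypothetical sketch: map the bucket value directly to a downstream writer
   // subtask in round-robin (mod) fashion, instead of letting keyBy run the
   // bucket through Flink's hash function, which can leave some subtasks idle
   // when the bucket cardinality is low.
   public class BucketPartitioner implements Partitioner<Integer> {
     @Override
     public int partition(Integer bucketValue, int numPartitions) {
       return bucketValue % numPartitions;
     }
   }
   ```

   It would be wired in via `DataStream#partitionCustom`, e.g. `stream.partitionCustom(new BucketPartitioner(), row -> extractBucketValue(row))`, where `extractBucketValue` stands in for evaluating the bucket transform on the row.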
   
   2. [hash distribution] bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). It requires calculating traffic distribution statistics across the partition columns and using the statistics to guide shuffling decisions.
   
   This is the case when `write.distribution-mode=hash` and there is NO bucketing partition column.
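   To make the idea concrete, here is an illustrative greedy bin-packing sketch (not the actual implementation; `TrafficBinPacker` and `pack` are hypothetical names). Given per-value traffic counts, it assigns the heaviest values first, each to the currently least-loaded writer subtask:

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.PriorityQueue;

   // Hypothetical sketch: greedy bin packing of partition-value traffic counts
   // onto writer subtasks so that hot partition values don't pile up on one task.
   public class TrafficBinPacker {
     /** Returns a mapping of partition value -> writer subtask index. */
     public static Map<String, Integer> pack(Map<String, Long> trafficStats, int numSubtasks) {
       // Min-heap of (current load, subtask index).
       PriorityQueue<long[]> subtasks =
           new PriorityQueue<>(Comparator.comparingLong((long[] a) -> a[0]));
       for (int i = 0; i < numSubtasks; i++) {
         subtasks.add(new long[] {0L, i});
       }

       // Assign the heaviest partition values first (classic greedy bin packing).
       List<Map.Entry<String, Long>> byTraffic = new ArrayList<>(trafficStats.entrySet());
       byTraffic.sort(Map.Entry.<String, Long>comparingByValue().reversed());

       Map<String, Integer> assignment = new HashMap<>();
       for (Map.Entry<String, Long> entry : byTraffic) {
         long[] leastLoaded = subtasks.poll();
         assignment.put(entry.getKey(), (int) leastLoaded[1]);
         leastLoaded[0] += entry.getValue();
         subtasks.add(leastLoaded);
       }
       return assignment;
     }
   }
   ```

   In practice the statistics would be aggregated and refreshed periodically, and a single very hot value might need to be split across multiple subtasks; the sketch only shows the core assignment step.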
   
   3. [range distribution] range partition based on traffic distribution statistics. This is a variant of 2 above. It works well for "sorting" on non-partition columns (e.g. country code, event type) and can improve data clustering by creating data files with narrow value ranges. Note that the Flink streaming writer probably won't sort rows within a file, as that would be very expensive (though not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning; we just can't get the additional benefit of row-group-level skipping (for Parquet) that sorting rows within a file would provide.
   
   This is the case when `write.distribution-mode=range` and a `SortOrder` is defined on non-partition columns. Partition columns will be ignored for range shuffling, as the assumption is that the non-partition sort columns are what matter here.
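   As an illustration (again a hypothetical class, not the actual implementation), range boundaries can be derived from the same traffic statistics so that each writer subtask receives roughly equal traffic over the sort column:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.SortedMap;

   // Hypothetical sketch: walk the sort-key space in order and cut a boundary
   // every time the cumulative traffic crosses an equal share, producing
   // numSubtasks contiguous key ranges with roughly equal traffic.
   public class RangeBoundaryCalculator {
     /** Returns numSubtasks - 1 upper boundaries over the sorted key space. */
     public static List<String> boundaries(SortedMap<String, Long> trafficStats, int numSubtasks) {
       long total = trafficStats.values().stream().mapToLong(Long::longValue).sum();
       List<String> cuts = new ArrayList<>();
       long cumulative = 0;
       for (Map.Entry<String, Long> entry : trafficStats.entrySet()) {
         cumulative += entry.getValue();
         // Cut when the running total crosses the next equal share of traffic.
         if (cuts.size() < numSubtasks - 1
             && cumulative >= (total / numSubtasks) * (cuts.size() + 1)) {
           cuts.add(entry.getKey());
         }
       }
       return cuts;
     }
   }
   ```

   Rows with keys at or below the first boundary would go to subtask 0, keys between the first and second boundary to subtask 1, and so on, so each data file covers a narrow, contiguous value range.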
   
   4. [high cardinality columns] 2 and 3 above are mostly for low-cardinality columns (e.g. hundreds of unique values), where a simple dictionary of counts per value can be used to track the traffic distribution statistics. For high-cardinality columns (like device or user id), we would need a probabilistic data sketch algorithm to calculate the traffic distribution.
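   One applicable technique is reservoir sampling, shown here as a self-contained illustration (in practice a sketching library such as Apache DataSketches would likely be used): memory stays bounded at the reservoir capacity no matter how many distinct keys flow through, and the retained sample approximates the traffic distribution well enough to feed the boundary calculation in 3.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Random;

   // Illustrative sketch (Algorithm R reservoir sampling): keep a bounded,
   // uniformly random sample of observed sort keys in O(capacity) memory.
   public class ReservoirSample {
     private final List<String> reservoir = new ArrayList<>();
     private final int capacity;
     private final Random random = new Random();
     private long seen = 0;

     public ReservoirSample(int capacity) {
       this.capacity = capacity;
     }

     public void add(String sortKey) {
       seen++;
       if (reservoir.size() < capacity) {
         reservoir.add(sortKey);
       } else {
         // Replace a random slot with probability capacity / seen, which keeps
         // every key observed so far equally likely to be in the sample.
         long slot = (long) (random.nextDouble() * seen);
         if (slot < capacity) {
           reservoir.set((int) slot, sortKey);
         }
       }
     }

     /** Sorted, the samples approximate quantiles of the key distribution. */
     public List<String> samples() {
       return reservoir;
     }
   }
   ```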
   
   ### Query engine
   
   Flink

