stevenzwu commented on code in PR #10859: URL: https://github.com/apache/iceberg/pull/10859#discussion_r1706418907
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -548,21 +599,46 @@ private DataStream<RowData> distributeDataStream(
         }
       case RANGE:
-        if (equalityFieldIds.isEmpty()) {
+        // Ideally, exception should be thrown in the combination of range distribution and
+        // equality fields. Primary key case should use hash distribution mode.
+        // Keep the current behavior of falling back to keyBy for backward compatibility.
+        if (!equalityFieldIds.isEmpty()) {
           LOG.warn(
-              "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                  + "and {}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input;
-        } else {
-          LOG.info(
-              "Distribute rows by equality fields, because there are equality fields set "
-                  + "and{}=range is not supported yet in flink",
+              "Hash distribute rows by equality fields, even though {}=range is set. "
+                  + "Range distribution for primary keys are not always safe in "
+                  + "Flink streaming writer.",
               WRITE_DISTRIBUTION_MODE);
           return input.keyBy(
               new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
         }
+        // range distribute by partition key or sort key if table has an SortOrder
+        Preconditions.checkState(
+            sortOrder.isSorted() || partitionSpec.isPartitioned(),
+            "Invalid write distribution mode: range. Need to define sort order and partition spec.");
+        if (sortOrder.isUnsorted()) {
+          sortOrder = Partitioning.sortOrderFor(partitionSpec);
+          LOG.info("Construct sort order from partition spec");
+        }
+
+        LOG.info("Range distribute rows by sort order: {}", sortOrder);
+        StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+        return input
+            .transform(
+                operatorName("range-shuffle"),

Review Comment:
Thanks for catching the bug. It is important to add a uid, as it is a stateful operator.