stevenzwu commented on code in PR #12071:
URL: https://github.com/apache/iceberg/pull/12071#discussion_r2098767251
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java: ##########
@@ -375,22 +381,43 @@ public Builder flinkConf(ReadableConfig config) {

     /**
      * Configure the write {@link DistributionMode} that the IcebergSink will use. Currently, flink
-     * support {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+     * support {@link DistributionMode#NONE} and {@link DistributionMode#HASH} and {@link
+     * DistributionMode#RANGE}.
      *
      * @param mode to specify the write distribution mode.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
     @Override
     public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
       if (mode != null) {
         writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
       }
       return this;
     }

+    /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in a relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch}. Refer to
+     * {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto} where initially Map statistics is used. But if
+     * cardinality is higher than the threshold (currently 10K) as defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * the sketch reservoir sampling.
+     *
+     * <p>Explicitly set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
+
Review Comment:
   it seems that we missed this method
   ```
   public Builder rangeDistributionSortKeyBaseWeight(double weight)
   ```
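A minimal sketch of how that missing builder method could look, mirroring `rangeDistributionStatisticsType` above; the `RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT` option key is an assumption inferred from the later call to `flinkWriteConf.rangeDistributionSortKeyBaseWeight()` in the hunk below:

```java
// Sketch only: store the sort-key base weight as a write option, in the same
// style as rangeDistributionStatisticsType(); the exact option name is assumed.
public IcebergSink.Builder rangeDistributionSortKeyBaseWeight(double weight) {
  writeOptions.put(
      FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
  return this;
}
```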
is unpartitioned"); + return input; + } else { + if (BucketPartitionerUtil.hasOneBucketField(spec)) { Review Comment: can you check with `FlinkSink` code again? this code has been removed/reverted there. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java: ########## @@ -645,70 +676,121 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) { DistributionMode mode = flinkWriteConf.distributionMode(); Schema schema = table.schema(); PartitionSpec spec = table.spec(); + SortOrder sortOrder = table.sortOrder(); + LOG.info("Write distribution mode is '{}'", mode.modeName()); switch (mode) { case NONE: - if (equalityFieldIds.isEmpty()) { - return input; - } else { - LOG.info("Distribute rows by equality fields, because there are equality fields set"); - return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds)); - } - + return distributeDataStreamByNoneDistributionMode(input, schema); case HASH: - if (equalityFieldIds.isEmpty()) { - if (table.spec().isUnpartitioned()) { - LOG.warn( - "Fallback to use 'none' distribution mode, because there are no equality fields set " - + "and table is unpartitioned"); - return input; - } else { - if (BucketPartitionerUtil.hasOneBucketField(spec)) { - return input.partitionCustom( - new BucketPartitioner(spec), - new BucketPartitionKeySelector(spec, schema, flinkRowType)); - } else { - return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType)); - } - } - } else { - if (spec.isUnpartitioned()) { - LOG.info( - "Distribute rows by equality fields, because there are equality fields set " - + "and table is unpartitioned"); - return input.keyBy( - new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds)); - } else { - for (PartitionField partitionField : spec.fields()) { - Preconditions.checkState( - equalityFieldIds.contains(partitionField.sourceId()), - "In 'hash' distribution mode with equality fields set, partition field '%s' " - + "should be included in equality fields: '%s'", - partitionField, - equalityFieldColumns); - } - return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType)); - } - } - + return distributeDataStreamByHashDistributionMode(input, schema, spec); case RANGE: - if (equalityFieldIds.isEmpty()) { - LOG.warn( - "Fallback to use 'none' distribution mode, because there are no equality fields set " - + "and {}=range is not supported yet in flink", - WRITE_DISTRIBUTION_MODE); - return input; + return distributeDataStreamByRangeDistributionMode(input, schema, spec, sortOrder); + default: + throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode); + } + } + + private DataStream<RowData> distributeDataStreamByNoneDistributionMode( + DataStream<RowData> input, Schema schema) { + if (equalityFieldIds.isEmpty()) { + return input; + } else { + LOG.info("Distribute rows by equality fields, because there are equality fields set"); + return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds)); + } + } + + private DataStream<RowData> distributeDataStreamByHashDistributionMode( + DataStream<RowData> input, Schema schema, PartitionSpec spec) { + if (equalityFieldIds.isEmpty()) { + if (table.spec().isUnpartitioned()) { + LOG.warn( + "Fallback to use 'none' distribution mode, because there are no equality fields set " + + "and table is unpartitioned"); + return input; + } else { + if (BucketPartitionerUtil.hasOneBucketField(spec)) { + return input.partitionCustom( + new 
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java: ##########
@@ -645,70 +676,121 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
+          return input.partitionCustom(
+              new BucketPartitioner(spec),
+              new BucketPartitionKeySelector(spec, schema, flinkRowType));
+        } else {
-        LOG.info(
-            "Distribute rows by equality fields, because there are equality fields set "
-                + "and{}=range is not supported yet in flink",
-            WRITE_DISTRIBUTION_MODE);
-        return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds));
+          return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
+        }
+      }
+    } else {
+      if (spec.isUnpartitioned()) {
+        LOG.info(
+            "Distribute rows by equality fields, because there are equality fields set "
+                + "and table is unpartitioned");
+        return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds));
+      } else {
+        for (PartitionField partitionField : spec.fields()) {
+          Preconditions.checkState(
+              equalityFieldIds.contains(partitionField.sourceId()),
+              "In 'hash' distribution mode with equality fields set, partition field '%s' "
+                  + "should be included in equality fields: '%s'",
+              partitionField,
+              equalityFieldColumns);
+        }
+        return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
+      }
+    }
+  }

-      default:
-        throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode);
+  private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
+      DataStream<RowData> input, Schema schema, PartitionSpec spec, SortOrder sortOrderParam) {
+
+    int writerParallelism =
+        flinkWriteConf.writeParallelism() == null
+            ? input.getParallelism()
+            : flinkWriteConf.writeParallelism();
+
+    // Ideally, exception should be thrown in the combination of range distribution and
+    // equality fields. Primary key case should use hash distribution mode.
+    // Keep the current behavior of falling back to keyBy for backward compatibility.
+    if (!equalityFieldIds.isEmpty()) {
+      LOG.warn(
+          "Hash distribute rows by equality fields, even though {}=range is set. "
+              + "Range distribution for primary keys are not always safe in "
+              + "Flink streaming writer.",
+          WRITE_DISTRIBUTION_MODE);
+      return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds));
+    }
+
+    SortOrder sortOrder = sortOrderParam;
+    // range distribute by partition key or sort key if table has an SortOrder
+    Preconditions.checkState(
+        sortOrder.isSorted() || spec.isPartitioned(),
+        "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+    if (sortOrder.isUnsorted()) {
+      sortOrder = Partitioning.sortOrderFor(spec);
+      LOG.info("Construct sort order from partition spec");
+    }
+
+    LOG.info("Range distribute rows by sort order: {}", sortOrder);
+    StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+    SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+        input
+            .transform(
+                operatorName("range-shuffle"),
+                TypeInformation.of(StatisticsOrRecord.class),
+                new DataStatisticsOperatorFactory(
+                    schema,
+                    sortOrder,
+                    writerParallelism,
+                    statisticsType,
+                    flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+            // Set the parallelism same as input operator to encourage chaining
+            .setParallelism(input.getParallelism());
+    if (uidSuffix != null) {
+      shuffleStream = shuffleStream.uid("shuffle-" + uidSuffix);
+    }
+
+    return shuffleStream
+        .partitionCustom(new RangePartitioner(schema, sortOrder), r -> r)
+        .filter(StatisticsOrRecord::hasRecord)

Review Comment:
   There have been changes since this PR was initially created. Please re-sync with `FlinkSink`.
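For end-to-end context, a hedged sketch of how a job might opt into range distribution through this builder; `IcebergSink.forRowData`, `table`, `tableLoader`, and `append` are assumed to follow the existing `FlinkSink`-style API:

```java
// Sketch only: configure range distribution with an explicit statistics type.
// The two range-distribution builder methods are the ones discussed in this PR;
// the surrounding wiring (stream, table, loader) is assumed.
IcebergSink.forRowData(rowDataStream)
    .table(table)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.RANGE)
    .rangeDistributionStatisticsType(StatisticsType.Sketch)
    .append();
```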