stevenzwu commented on code in PR #12071:
URL: https://github.com/apache/iceberg/pull/12071#discussion_r2098767251


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -375,22 +381,43 @@ public Builder flinkConf(ReadableConfig config) {
 
     /**
     * Configure the write {@link DistributionMode} that the IcebergSink will use. Currently, Flink
-     * support {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+     * supports {@link DistributionMode#NONE}, {@link DistributionMode#HASH}, and {@link
+     * DistributionMode#RANGE}.
      *
      * @param mode to specify the write distribution mode.
      * @return {@link IcebergSink.Builder} to connect the iceberg table.
      */
     @Override
     public Builder distributionMode(DistributionMode mode) {
-      Preconditions.checkArgument(
-          !DistributionMode.RANGE.equals(mode),
-          "Flink does not support 'range' write distribution mode now.");
       if (mode != null) {
        writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
       }
       return this;
     }
 
+    /**
+     * Range distribution needs to collect statistics about data distribution to properly shuffle
+     * the records in a relatively balanced way. In general, low cardinality should use {@link
+     * StatisticsType#Map} and high cardinality should use {@link StatisticsType#Sketch}. Refer to
+     * the {@link StatisticsType} Javadoc for more details.
+     *
+     * <p>Default is {@link StatisticsType#Auto}, where initially Map statistics is used. But if
+     * cardinality is higher than the threshold (currently 10K) as defined in {@code
+     * SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD}, statistics collection automatically switches to
+     * the sketch reservoir sampling.
+     *
+     * <p>Explicitly set the statistics type if the default behavior doesn't work.
+     *
+     * @param type to specify the statistics type for range distribution.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder rangeDistributionStatisticsType(StatisticsType type) {
+      if (type != null) {
+        writeOptions.put(FlinkWriteOptions.RANGE_DISTRIBUTION_STATISTICS_TYPE.key(), type.name());
+      }
+      return this;
+    }
+

Review Comment:
   it seems that we missed this method
   ```
   public Builder rangeDistributionSortKeyBaseWeight(double weight) 
   ```
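   
   A minimal sketch of that setter, assuming it mirrors `rangeDistributionStatisticsType` above and that the `FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT` key matches what `FlinkSink` uses:
   ```java
   // Hedged sketch: mirrors the statistics-type setter from the diff above.
   // The RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT option key is assumed to
   // exist in FlinkWriteOptions, matching FlinkSink's setter.
   public Builder rangeDistributionSortKeyBaseWeight(double weight) {
     writeOptions.put(
         FlinkWriteOptions.RANGE_DISTRIBUTION_SORT_KEY_BASE_WEIGHT.key(), Double.toString(weight));
     return this;
   }
   ```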



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -645,70 +676,121 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
     DistributionMode mode = flinkWriteConf.distributionMode();
     Schema schema = table.schema();
     PartitionSpec spec = table.spec();
+    SortOrder sortOrder = table.sortOrder();
+
     LOG.info("Write distribution mode is '{}'", mode.modeName());
     switch (mode) {
       case NONE:
-        if (equalityFieldIds.isEmpty()) {
-          return input;
-        } else {
-          LOG.info("Distribute rows by equality fields, because there are 
equality fields set");
-          return input.keyBy(new EqualityFieldKeySelector(schema, 
flinkRowType, equalityFieldIds));
-        }
-
+        return distributeDataStreamByNoneDistributionMode(input, schema);
       case HASH:
-        if (equalityFieldIds.isEmpty()) {
-          if (table.spec().isUnpartitioned()) {
-            LOG.warn(
-                "Fallback to use 'none' distribution mode, because there are 
no equality fields set "
-                    + "and table is unpartitioned");
-            return input;
-          } else {
-            if (BucketPartitionerUtil.hasOneBucketField(spec)) {
-              return input.partitionCustom(
-                  new BucketPartitioner(spec),
-                  new BucketPartitionKeySelector(spec, schema, flinkRowType));
-            } else {
-              return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
-            }
-          }
-        } else {
-          if (spec.isUnpartitioned()) {
-            LOG.info(
-                "Distribute rows by equality fields, because there are 
equality fields set "
-                    + "and table is unpartitioned");
-            return input.keyBy(
-                new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
-          } else {
-            for (PartitionField partitionField : spec.fields()) {
-              Preconditions.checkState(
-                  equalityFieldIds.contains(partitionField.sourceId()),
-                  "In 'hash' distribution mode with equality fields set, 
partition field '%s' "
-                      + "should be included in equality fields: '%s'",
-                  partitionField,
-                  equalityFieldColumns);
-            }
-            return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
-          }
-        }
-
+        return distributeDataStreamByHashDistributionMode(input, schema, spec);
       case RANGE:
-        if (equalityFieldIds.isEmpty()) {
-          LOG.warn(
-              "Fallback to use 'none' distribution mode, because there are no 
equality fields set "
-                  + "and {}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input;
+        return distributeDataStreamByRangeDistributionMode(input, schema, spec, sortOrder);
+      default:
+        throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode);
+    }
+  }
+
+  private DataStream<RowData> distributeDataStreamByNoneDistributionMode(
+      DataStream<RowData> input, Schema schema) {
+    if (equalityFieldIds.isEmpty()) {
+      return input;
+    } else {
+      LOG.info("Distribute rows by equality fields, because there are equality 
fields set");
+      return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
+    }
+  }
+
+  private DataStream<RowData> distributeDataStreamByHashDistributionMode(
+      DataStream<RowData> input, Schema schema, PartitionSpec spec) {
+    if (equalityFieldIds.isEmpty()) {
+      if (table.spec().isUnpartitioned()) {
+        LOG.warn(
+            "Fallback to use 'none' distribution mode, because there are no 
equality fields set "
+                + "and table is unpartitioned");
+        return input;
+      } else {
+        if (BucketPartitionerUtil.hasOneBucketField(spec)) {

Review Comment:
   can you check with `FlinkSink` code again? this code has been removed/reverted there.
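   
   For reference, a hedged sketch of what the branch above presumably reduces to after that revert, assuming `FlinkSink` dropped the `BucketPartitioner` special case and always keys by the partition key (an assumption about the current `FlinkSink`, not a quote from it):
   ```java
   // Assumed simplified hash branch after the revert: no BucketPartitioner
   // special case; distribute purely by the table's partition key.
   return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
   ```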



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -645,70 +676,121 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
     DistributionMode mode = flinkWriteConf.distributionMode();
     Schema schema = table.schema();
     PartitionSpec spec = table.spec();
+    SortOrder sortOrder = table.sortOrder();
+
     LOG.info("Write distribution mode is '{}'", mode.modeName());
     switch (mode) {
       case NONE:
-        if (equalityFieldIds.isEmpty()) {
-          return input;
-        } else {
-          LOG.info("Distribute rows by equality fields, because there are 
equality fields set");
-          return input.keyBy(new EqualityFieldKeySelector(schema, 
flinkRowType, equalityFieldIds));
-        }
-
+        return distributeDataStreamByNoneDistributionMode(input, schema);
       case HASH:
-        if (equalityFieldIds.isEmpty()) {
-          if (table.spec().isUnpartitioned()) {
-            LOG.warn(
-                "Fallback to use 'none' distribution mode, because there are 
no equality fields set "
-                    + "and table is unpartitioned");
-            return input;
-          } else {
-            if (BucketPartitionerUtil.hasOneBucketField(spec)) {
-              return input.partitionCustom(
-                  new BucketPartitioner(spec),
-                  new BucketPartitionKeySelector(spec, schema, flinkRowType));
-            } else {
-              return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
-            }
-          }
-        } else {
-          if (spec.isUnpartitioned()) {
-            LOG.info(
-                "Distribute rows by equality fields, because there are 
equality fields set "
-                    + "and table is unpartitioned");
-            return input.keyBy(
-                new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
-          } else {
-            for (PartitionField partitionField : spec.fields()) {
-              Preconditions.checkState(
-                  equalityFieldIds.contains(partitionField.sourceId()),
-                  "In 'hash' distribution mode with equality fields set, 
partition field '%s' "
-                      + "should be included in equality fields: '%s'",
-                  partitionField,
-                  equalityFieldColumns);
-            }
-            return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
-          }
-        }
-
+        return distributeDataStreamByHashDistributionMode(input, schema, spec);
       case RANGE:
-        if (equalityFieldIds.isEmpty()) {
-          LOG.warn(
-              "Fallback to use 'none' distribution mode, because there are no 
equality fields set "
-                  + "and {}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input;
+        return distributeDataStreamByRangeDistributionMode(input, schema, spec, sortOrder);
+      default:
+        throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + mode);
+    }
+  }
+
+  private DataStream<RowData> distributeDataStreamByNoneDistributionMode(
+      DataStream<RowData> input, Schema schema) {
+    if (equalityFieldIds.isEmpty()) {
+      return input;
+    } else {
+      LOG.info("Distribute rows by equality fields, because there are equality 
fields set");
+      return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
+    }
+  }
+
+  private DataStream<RowData> distributeDataStreamByHashDistributionMode(
+      DataStream<RowData> input, Schema schema, PartitionSpec spec) {
+    if (equalityFieldIds.isEmpty()) {
+      if (table.spec().isUnpartitioned()) {
+        LOG.warn(
+            "Fallback to use 'none' distribution mode, because there are no 
equality fields set "
+                + "and table is unpartitioned");
+        return input;
+      } else {
+        if (BucketPartitionerUtil.hasOneBucketField(spec)) {
+          return input.partitionCustom(
+              new BucketPartitioner(spec),
+              new BucketPartitionKeySelector(spec, schema, flinkRowType));
         } else {
-          LOG.info(
-              "Distribute rows by equality fields, because there are equality 
fields set "
-                  + "and{}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input.keyBy(new EqualityFieldKeySelector(schema, 
flinkRowType, equalityFieldIds));
+          return input.keyBy(new PartitionKeySelector(spec, schema, 
flinkRowType));
+        }
+      }
+    } else {
+      if (spec.isUnpartitioned()) {
+        LOG.info(
+            "Distribute rows by equality fields, because there are equality 
fields set "
+                + "and table is unpartitioned");
+        return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
+      } else {
+        for (PartitionField partitionField : spec.fields()) {
+          Preconditions.checkState(
+              equalityFieldIds.contains(partitionField.sourceId()),
+              "In 'hash' distribution mode with equality fields set, partition 
field '%s' "
+                  + "should be included in equality fields: '%s'",
+              partitionField,
+              equalityFieldColumns);
         }
+        return input.keyBy(new PartitionKeySelector(spec, schema, flinkRowType));
+      }
+    }
+  }
 
-      default:
-        throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + 
": " + mode);
+  private DataStream<RowData> distributeDataStreamByRangeDistributionMode(
+      DataStream<RowData> input, Schema schema, PartitionSpec spec, SortOrder 
sortOrderParam) {
+
+    int writerParallelism =
+        flinkWriteConf.writeParallelism() == null
+            ? input.getParallelism()
+            : flinkWriteConf.writeParallelism();
+
+    // Ideally, an exception should be thrown for the combination of range distribution and
+    // equality fields. The primary key case should use hash distribution mode.
+    // Keep the current behavior of falling back to keyBy for backward compatibility.
+    if (!equalityFieldIds.isEmpty()) {
+      LOG.warn(
+          "Hash distribute rows by equality fields, even though {}=range is 
set. "
+              + "Range distribution for primary keys are not always safe in "
+              + "Flink streaming writer.",
+          WRITE_DISTRIBUTION_MODE);
+      return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, 
equalityFieldIds));
+    }
+
+    SortOrder sortOrder = sortOrderParam;
+    // range distribute by partition key or sort key if table has a SortOrder
+    Preconditions.checkState(
+        sortOrder.isSorted() || spec.isPartitioned(),
+        "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+    if (sortOrder.isUnsorted()) {
+      sortOrder = Partitioning.sortOrderFor(spec);
+      LOG.info("Construct sort order from partition spec");
     }
+
+    LOG.info("Range distribute rows by sort order: {}", sortOrder);
+    StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+    SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+        input
+            .transform(
+                operatorName("range-shuffle"),
+                TypeInformation.of(StatisticsOrRecord.class),
+                new DataStatisticsOperatorFactory(
+                    schema,
+                    sortOrder,
+                    writerParallelism,
+                    statisticsType,
+                    flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+            // Set the parallelism same as input operator to encourage chaining
+            .setParallelism(input.getParallelism());
+    if (uidSuffix != null) {
+      shuffleStream = shuffleStream.uid("shuffle-" + uidSuffix);
+    }
+
+    return shuffleStream
+        .partitionCustom(new RangePartitioner(schema, sortOrder), r -> r)
+        .filter(StatisticsOrRecord::hasRecord)

Review Comment:
   there have been changes since this PR was initially created. please re-sync with `FlinkSink`
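   
   For context, a hedged usage sketch of the range-distribution options touched by this hunk (the `forRowData`/`tableLoader`/`append` wiring is assumed from the existing builder API; only `distributionMode` and `rangeDistributionStatisticsType` come from this diff):
   ```java
   // Hypothetical caller: range-distribute writes to a sorted or partitioned
   // table, pinning the statistics type instead of relying on Auto.
   IcebergSink.forRowData(rowDataStream)
       .tableLoader(tableLoader)
       .distributionMode(DistributionMode.RANGE)
       .rangeDistributionStatisticsType(StatisticsType.Sketch)
       .writeParallelism(4)
       .append();
   ```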



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
