pvary commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1705286597


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -548,21 +599,46 @@ private DataStream<RowData> distributeDataStream(
           }
 
         case RANGE:
-          if (equalityFieldIds.isEmpty()) {
+          // Ideally, exception should be thrown in the combination of range distribution and
+          // equality fields. Primary key case should use hash distribution mode.
+          // Keep the current behavior of falling back to keyBy for backward compatibility.
+          if (!equalityFieldIds.isEmpty()) {
             LOG.warn(
-                "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                    + "and {}=range is not supported yet in flink",
-                WRITE_DISTRIBUTION_MODE);
-            return input;
-          } else {
-            LOG.info(
-                "Distribute rows by equality fields, because there are equality fields set "
-                    + "and{}=range is not supported yet in flink",
+                "Hash distribute rows by equality fields, even though {}=range is set. "
+                    + "Range distribution for primary keys are not always safe in "
+                    + "Flink streaming writer.",
                 WRITE_DISTRIBUTION_MODE);
             return input.keyBy(
                 new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
           }
 
+          // range distribute by partition key or sort key if table has an SortOrder
+          Preconditions.checkState(
+              sortOrder.isSorted() || partitionSpec.isPartitioned(),
+              "Invalid write distribution mode: range. Need to define sort order and partition spec.");
+          if (sortOrder.isUnsorted()) {
+            sortOrder = Partitioning.sortOrderFor(partitionSpec);
+            LOG.info("Construct sort order from partition spec");
+          }
+
+          LOG.info("Range distribute rows by sort order: {}", sortOrder);
+          StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+          return input
+              .transform(
+                  operatorName("range-shuffle"),

Review Comment:
   Do we need to add a uid to the operator?
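   For context, the decision flow of the new RANGE branch in the hunk above can be modeled roughly as below. This is a hypothetical, simplified sketch, not Iceberg API: `SortOrder` and `PartitionSpec` are replaced by plain lists of field names, and deriving the sort order from the partition spec (what `Partitioning.sortOrderFor` does in the diff) is approximated as "sort by the partition fields", ignoring partition transforms. The class, enum, and method names are made up for illustration.

```java
import java.util.List;

/**
 * Hypothetical model of the RANGE branch in the diff. Field-name lists stand in for
 * Iceberg's SortOrder and PartitionSpec; none of these names are Iceberg or Flink API.
 */
public class RangeBranchSketch {

  enum Shuffle { KEY_BY_EQUALITY_FIELDS, RANGE_BY_SORT_ORDER }

  static final class Decision {
    final Shuffle shuffle;
    final List<String> sortFields; // fields the range shuffle would sort by (empty for keyBy)

    Decision(Shuffle shuffle, List<String> sortFields) {
      this.shuffle = shuffle;
      this.sortFields = sortFields;
    }
  }

  static Decision decideRange(
      List<Integer> equalityFieldIds, List<String> sortFields, List<String> partitionFields) {
    // Backward-compatible fallback: equality (primary key) fields are keyBy-distributed
    // even though range distribution was requested.
    if (!equalityFieldIds.isEmpty()) {
      return new Decision(Shuffle.KEY_BY_EQUALITY_FIELDS, List.of());
    }
    // Mirrors Preconditions.checkState(sorted || partitioned, ...) in the diff.
    if (sortFields.isEmpty() && partitionFields.isEmpty()) {
      throw new IllegalStateException(
          "Invalid write distribution mode: range. Need to define sort order and partition spec.");
    }
    // Simplified stand-in for Partitioning.sortOrderFor(partitionSpec).
    List<String> effective = sortFields.isEmpty() ? partitionFields : sortFields;
    return new Decision(Shuffle.RANGE_BY_SORT_ORDER, effective);
  }

  public static void main(String[] args) {
    Decision pk = decideRange(List.of(1), List.of("ts"), List.of());
    System.out.println(pk.shuffle); // KEY_BY_EQUALITY_FIELDS
    Decision fromSpec = decideRange(List.of(), List.of(), List.of("event_date"));
    System.out.println(fromSpec.shuffle + " " + fromSpec.sortFields); // RANGE_BY_SORT_ORDER [event_date]
  }
}
```

   The point of the model is that the range-shuffle operator is only reached when there are no equality fields and the table is sorted or partitioned, which is the path where the uid question above applies.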



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

