mxm commented on code in PR #13260:
URL: https://github.com/apache/iceberg/pull/13260#discussion_r2138136779


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -771,16 +769,23 @@ private DataStream<RowData> distributeDataStreamByHashDistributionMode(
     }
   }
 
+  private int resolveParallelismFor(DataStream<RowData> input) {
+    // if the writeParallelism is not specified, we set the default to the input parallelism to
+    // encourage chaining.
+    if (flinkWriteConf.writeParallelism() == null) {
+      return input.getParallelism();
+    } else {
+      return flinkWriteConf.writeParallelism();
+    }

Review Comment:
   NIT 
   ```suggestion
       return Optional.ofNullable(flinkWriteConf.writeParallelism())
           .orElse(input.getParallelism());
   ```
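
A minimal sketch of why the suggestion is behavior-preserving, assuming `writeParallelism()` returns a nullable `Integer`. The class and method names below are hypothetical stand-ins, not part of `IcebergSink`; the two arguments take the place of `flinkWriteConf.writeParallelism()` and `input.getParallelism()`.

```java
import java.util.Optional;

// Hypothetical stand-in class; not part of the Iceberg code base.
final class ResolveParallelismSketch {

  // Mirrors the if/else form from the diff.
  static int resolveIfElse(Integer writeParallelism, int inputParallelism) {
    if (writeParallelism == null) {
      return inputParallelism;
    } else {
      return writeParallelism;
    }
  }

  // Mirrors the suggested Optional form: fall back to the input
  // parallelism only when the configured value is null.
  static int resolveOptional(Integer writeParallelism, int inputParallelism) {
    return Optional.ofNullable(writeParallelism).orElse(inputParallelism);
  }

  public static void main(String[] args) {
    // Unset write parallelism: both forms fall back to the input parallelism.
    System.out.println(resolveIfElse(null, 4));   // prints 4
    System.out.println(resolveOptional(null, 4)); // prints 4
    // Explicit write parallelism: both forms return the configured value.
    System.out.println(resolveIfElse(2, 4));      // prints 2
    System.out.println(resolveOptional(2, 4));    // prints 2
  }
}
```

`orElse` evaluates its argument even when the value is present, which should be harmless here because the fallback is a cheap getter.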



##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -388,6 +388,28 @@ void testErrorOnNullForRequiredField() throws Exception {
     assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);
   }
 
+  @TestTemplate
+  void testDefaultWriteParallelism() throws Exception {
+    // since source dataStream parallelism used in the test is 1, we focus on parallelism > 1 so the
+    // assertion makes sense
+    assumeThat(parallelism).isGreaterThan(1);

Review Comment:
   Why don't we test the `parallelism == 1` case? It certainly doesn't hurt.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

